diff options
author | George Hazan <ghazan@miranda.im> | 2022-11-30 17:48:47 +0300 |
---|---|---|
committer | George Hazan <ghazan@miranda.im> | 2022-11-30 17:48:47 +0300 |
commit | 0ece30dc7c0e34b4c5911969b8fa99c33c6d023c (patch) | |
tree | 671325d3fec09b999411e4e3ab84ef8259261818 /protocols/Telegram/tdlib/td/tddb | |
parent | 46c53ffc6809c67e4607e99951a2846c382b63b2 (diff) |
Telegram: update for TDLIB
Diffstat (limited to 'protocols/Telegram/tdlib/td/tddb')
35 files changed, 2080 insertions, 715 deletions
diff --git a/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt b/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt index 531dcc5c02..036f0ca61c 100644 --- a/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt +++ b/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt @@ -1,4 +1,10 @@ -cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR) +if ((CMAKE_MAJOR_VERSION LESS 3) OR (CMAKE_VERSION VERSION_LESS "3.0.2")) + message(FATAL_ERROR "CMake >= 3.0.2 is required") +endif() + +if (NOT DEFINED CMAKE_INSTALL_LIBDIR) + set(CMAKE_INSTALL_LIBDIR "lib") +endif() #SOURCE SETS set(TDDB_SOURCE @@ -8,16 +14,19 @@ set(TDDB_SOURCE td/db/binlog/detail/BinlogEventsBuffer.cpp td/db/binlog/detail/BinlogEventsProcessor.cpp + td/db/SqliteConnectionSafe.cpp td/db/SqliteDb.cpp - td/db/SqliteStatement.cpp + td/db/SqliteKeyValue.cpp td/db/SqliteKeyValueAsync.cpp + td/db/SqliteStatement.cpp + td/db/TQueue.cpp td/db/detail/RawSqliteDb.cpp td/db/binlog/Binlog.h - td/db/binlog/BinlogInterface.h td/db/binlog/BinlogEvent.h td/db/binlog/BinlogHelper.h + td/db/binlog/BinlogInterface.h td/db/binlog/ConcurrentBinlog.h td/db/binlog/detail/BinlogEventsBuffer.h td/db/binlog/detail/BinlogEventsProcessor.h @@ -25,7 +34,6 @@ set(TDDB_SOURCE td/db/BinlogKeyValue.h td/db/DbKey.h td/db/KeyValueSyncInterface.h - td/db/Pmc.h td/db/SeqKeyValue.h td/db/SqliteConnectionSafe.h td/db/SqliteDb.h @@ -33,6 +41,7 @@ set(TDDB_SOURCE td/db/SqliteKeyValueAsync.h td/db/SqliteKeyValueSafe.h td/db/SqliteStatement.h + td/db/TQueue.h td/db/TsSeqKeyValue.h td/db/detail/RawSqliteDb.h @@ -48,8 +57,6 @@ if (NOT CMAKE_CROSSCOMPILING) endif() install(TARGETS tddb EXPORT TdTargets - LIBRARY DESTINATION lib - ARCHIVE DESTINATION lib - RUNTIME DESTINATION bin - INCLUDES DESTINATION include + LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" + ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" ) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h index 04df413d53..9ed4d92240 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h @@ -1,40 +1,42 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once -#include "td/actor/PromiseFuture.h" - #include "td/db/binlog/Binlog.h" #include "td/db/binlog/BinlogEvent.h" -#include "td/db/binlog/ConcurrentBinlog.h" +#include "td/db/DbKey.h" #include "td/db/KeyValueSyncInterface.h" +#include "td/utils/algorithm.h" #include "td/utils/buffer.h" #include "td/utils/common.h" +#include "td/utils/HashTableUtils.h" #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/port/RwMutex.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" +#include "td/utils/StorerBase.h" #include "td/utils/tl_parsers.h" #include "td/utils/tl_storers.h" -#include <map> #include <memory> #include <unordered_map> #include <utility> namespace td { + template <class BinlogT> -class BinlogKeyValue : public KeyValueSyncInterface { +class BinlogKeyValue final : public KeyValueSyncInterface { public: - static constexpr int32 magic = 0x2a280000; + static constexpr int32 MAGIC = 0x2a280000; - struct Event : public Storer { + struct Event final : public Storer { Event() = default; Event(Slice key, Slice value) : key(key), value(value) { } @@ -52,16 +54,15 @@ class BinlogKeyValue : public KeyValueSyncInterface { value = parser.template fetch_string<Slice>(); } - size_t size() const override { + size_t size() const final { TlStorerCalcLength storer; store(storer); return storer.get_length(); } - size_t store(uint8 *ptr_x) const override { - auto ptr = reinterpret_cast<char *>(ptr_x); + size_t store(uint8 *ptr) const final { TlStorerUnsafe storer(ptr); store(storer); - return storer.get_buf() - ptr; + return static_cast<size_t>(storer.get_buf() - ptr); } }; @@ -76,15 +77,15 @@ class BinlogKeyValue : public KeyValueSyncInterface { magic_ = override_magic; } - binlog_ = std::make_unique<BinlogT>(); - TRY_STATUS(binlog_->init(name, - [&](const BinlogEvent &binlog_event) { - Event event; - event.parse(TlParser(binlog_event.data_)); - map_.emplace(std::make_pair(event.key.str(), - std::make_pair(event.value.str(), binlog_event.id_))); - }, - std::move(db_key), DbKey::empty(), scheduler_id)); + binlog_ = std::make_shared<BinlogT>(); + TRY_STATUS(binlog_->init( + name, + [&](const BinlogEvent &binlog_event) { + Event event; + event.parse(TlParser(binlog_event.data_)); + map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_)); + }, + std::move(db_key), DbKey::empty(), scheduler_id)); return Status::OK(); } @@ -103,7 +104,7 @@ class BinlogKeyValue : public KeyValueSyncInterface { void external_init_handle(const BinlogEvent &binlog_event) { Event event; event.parse(TlParser(binlog_event.data_)); - map_.emplace(std::make_pair(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_))); + map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_)); } void external_init_finish(std::shared_ptr<BinlogT> binlog) { @@ -113,17 +114,24 @@ class BinlogKeyValue : public KeyValueSyncInterface { void close() { *this = BinlogKeyValue(); } + void close(Promise<> promise) final { + binlog_->close(std::move(promise)); + } - SeqNo set(string key, string value) override { + SeqNo set(string key, string value) final { auto lock = rw_mutex_.lock_write().move_as_ok(); uint64 old_id = 0; - auto it_ok = map_.insert({key, {value, 0}}); + auto it_ok = map_.emplace(key, std::make_pair(value, 0)); if (!it_ok.second) { if (it_ok.first->second.first == value) { return 0; } + VLOG(binlog) << "Change value of key " << key << " from " << hex_encode(it_ok.first->second.first) << " to " + << hex_encode(value); old_id = it_ok.first->second.second; it_ok.first->second.first = value; + } else { + VLOG(binlog) << "Set value of key " << key << " to " << hex_encode(value); } bool rewrite = false; uint64 id; @@ -142,15 +150,15 @@ class BinlogKeyValue : public KeyValueSyncInterface { return seq_no; } - SeqNo erase(const string &key) override { + SeqNo erase(const string &key) final { auto lock = rw_mutex_.lock_write().move_as_ok(); auto it = map_.find(key); if (it == map_.end()) { return 0; } + VLOG(binlog) << "Remove value of key " << key << ", which is " << hex_encode(it->second.first); uint64 id = it->second.second; map_.erase(it); - // LOG(ERROR) << "ADD EVENT"; auto seq_no = binlog_->next_id(); lock.reset(); add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, @@ -159,62 +167,62 @@ class BinlogKeyValue : public KeyValueSyncInterface { } void add_event(uint64 seq_no, BufferSlice &&event) { - binlog_->add_raw_event(seq_no, std::move(event)); + binlog_->add_raw_event(BinlogDebugInfo{__FILE__, __LINE__}, seq_no, std::move(event)); } - bool isset(const string &key) override { + bool isset(const string &key) final { auto lock = rw_mutex_.lock_read().move_as_ok(); return map_.count(key) > 0; } - string get(const string &key) override { + string get(const string &key) final { auto lock = rw_mutex_.lock_read().move_as_ok(); auto it = map_.find(key); if (it == map_.end()) { return string(); } + VLOG(binlog) << "Get value of key " << key << ", which is " << hex_encode(it->second.first); return it->second.first; } - void force_sync(Promise<> &&promise) { + void force_sync(Promise<> &&promise) final { binlog_->force_sync(std::move(promise)); } + void lazy_sync(Promise<> &&promise) { binlog_->lazy_sync(std::move(promise)); } - std::unordered_map<string, string> prefix_get(Slice prefix) { - // TODO: optimize with std::map? + std::unordered_map<string, string, Hash<string>> prefix_get(Slice prefix) final { auto lock = rw_mutex_.lock_write().move_as_ok(); - std::unordered_map<string, string> res; + std::unordered_map<string, string, Hash<string>> res; for (const auto &kv : map_) { if (begins_with(kv.first, prefix)) { - res[kv.first] = kv.second.first; + res.emplace(kv.first.substr(prefix.size()), kv.second.first); } } return res; } - std::unordered_map<string, string> get_all() { + std::unordered_map<string, string, Hash<string>> get_all() final { auto lock = rw_mutex_.lock_write().move_as_ok(); - std::unordered_map<string, string> res; + std::unordered_map<string, string, Hash<string>> res; for (const auto &kv : map_) { - res[kv.first] = kv.second.first; + res.emplace(kv.first, kv.second.first); } return res; } - void erase_by_prefix(Slice prefix) { + void erase_by_prefix(Slice prefix) final { auto lock = rw_mutex_.lock_write().move_as_ok(); - std::vector<uint64> ids; - for (auto it = map_.begin(); it != map_.end();) { - if (begins_with(it->first, prefix)) { - ids.push_back(it->second.second); - it = map_.erase(it); - } else { - ++it; + vector<uint64> ids; + table_remove_if(map_, [&](const auto &it) { + if (begins_with(it.first, prefix)) { + ids.push_back(it.second.second); + return true; } - } + return false; + }); auto seq_no = binlog_->next_id(narrow_cast<int32>(ids.size())); lock.reset(); for (auto id : ids) { @@ -231,22 +239,26 @@ class BinlogKeyValue : public KeyValueSyncInterface { } private: - std::unordered_map<string, std::pair<string, uint64>> map_; + std::unordered_map<string, std::pair<string, uint64>, Hash<string>> map_; std::shared_ptr<BinlogT> binlog_; RwMutex rw_mutex_; - int32 magic_ = magic; + int32 magic_ = MAGIC; }; + template <> inline void BinlogKeyValue<Binlog>::add_event(uint64 seq_no, BufferSlice &&event) { - binlog_->add_raw_event(std::move(event)); + binlog_->add_raw_event(std::move(event), BinlogDebugInfo{__FILE__, __LINE__}); } + template <> inline void BinlogKeyValue<Binlog>::force_sync(Promise<> &&promise) { binlog_->sync(); promise.set_value(Unit()); } + template <> inline void BinlogKeyValue<Binlog>::lazy_sync(Promise<> &&promise) { force_sync(std::move(promise)); } + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h b/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h index b0edde2ae1..084f6283de 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h @@ -1,51 +1,61 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once + #include "td/utils/common.h" #include "td/utils/Slice.h" namespace td { + class DbKey { - public: - enum Type { Empty, RawKey, Password }; + enum class Type { Empty, RawKey, Password }; Type type() const { return type_; } + + public: bool is_empty() const { - return type_ == Empty; + return type_ == Type::Empty; } + bool is_raw_key() const { - return type_ == RawKey; + return type_ == Type::RawKey; } + bool is_password() const { - return type_ == Password; + return type_ == Type::Password; } + CSlice data() const { return data_; } + static DbKey raw_key(string raw_key) { DbKey res; - res.type_ = RawKey; + res.type_ = Type::RawKey; res.data_ = std::move(raw_key); return res; } + static DbKey password(string password) { DbKey res; - res.type_ = Password; + res.type_ = Type::Password; res.data_ = std::move(password); return res; } + static DbKey empty() { return DbKey(); } private: - Type type_{Empty}; + Type type_{Type::Empty}; string data_; }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h b/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h index 8d19d0e75c..4007d0bfd6 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -7,6 +7,11 @@ #pragma once #include "td/utils/common.h" +#include "td/utils/HashTableUtils.h" +#include "td/utils/Promise.h" +#include "td/utils/Slice.h" + +#include <unordered_map> namespace td { @@ -29,7 +34,17 @@ class KeyValueSyncInterface { virtual string get(const string &key) = 0; + virtual std::unordered_map<string, string, Hash<string>> prefix_get(Slice prefix) = 0; + + virtual std::unordered_map<string, string, Hash<string>> get_all() = 0; + virtual SeqNo erase(const string &key) = 0; + + virtual void erase_by_prefix(Slice prefix) = 0; + + virtual void force_sync(Promise<> &&promise) = 0; + + virtual void close(Promise<> promise) = 0; }; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/Pmc.h b/protocols/Telegram/tdlib/td/tddb/td/db/Pmc.h deleted file mode 100644 index dcf0e0c351..0000000000 --- a/protocols/Telegram/tdlib/td/tddb/td/db/Pmc.h +++ /dev/null @@ -1,27 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/db/binlog/ConcurrentBinlog.h" -#include "td/db/BinlogKeyValue.h" -#include "td/db/SqliteKeyValue.h" - -#include "td/utils/common.h" - -#include <memory> - -namespace td { - -using BinlogPmcBase = BinlogKeyValue<ConcurrentBinlog>; -using BinlogPmc = std::shared_ptr<BinlogPmcBase>; -using BinlogPmcPtr = BinlogPmcBase *; - -using BigPmcBase = SqliteKeyValue; -using BigPmc = std::unique_ptr<BigPmcBase>; -using BigPmcPtr = BigPmcBase *; - -}; // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h index ec6f2b99b6..0ec10b682a 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h @@ -1,16 +1,18 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once +#include "td/utils/HashTableUtils.h" #include "td/utils/Slice.h" #include <unordered_map> namespace td { + class SeqKeyValue { public: using SeqNo = uint64; @@ -22,7 +24,7 @@ class SeqKeyValue { ~SeqKeyValue() = default; SeqNo set(Slice key, Slice value) { - auto it_ok = map_.insert({key.str(), value.str()}); + auto it_ok = map_.emplace(key.str(), value.str()); if (!it_ok.second) { if (it_ok.first->second == value) { return 0; @@ -31,6 +33,7 @@ class SeqKeyValue { } return next_seq_no(); } + SeqNo erase(const string &key) { auto it = map_.find(key); if (it == map_.end()) { @@ -39,9 +42,11 @@ class SeqKeyValue { map_.erase(it); return next_seq_no(); } + SeqNo seq_no() const { return current_id_ + 1; } + string get(const string &key) const { auto it = map_.find(key); if (it == map_.end()) { @@ -50,29 +55,28 @@ class SeqKeyValue { return it->second; } - template <class F> - void foreach (const F &f) { - for (auto &it : map_) { - f(it.first, it.second); + bool isset(const string &key) const { + auto it = map_.find(key); + if (it == map_.end()) { + return false; } + return true; } size_t size() const { return map_.size(); } - void reset_seq_no() { - current_id_ = 0; - } - std::unordered_map<string, string> get_all() const { + std::unordered_map<string, string, Hash<string>> get_all() const { return map_; } private: - std::unordered_map<string, string> map_; + std::unordered_map<string, string, Hash<string>> map_; SeqNo current_id_ = 0; SeqNo next_seq_no() { return ++current_id_; } }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp new file mode 100644 index 0000000000..a3c13d7120 --- /dev/null +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp @@ -0,0 +1,51 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/SqliteConnectionSafe.h" + +#include "td/utils/common.h" +#include "td/utils/format.h" +#include "td/utils/logging.h" + +namespace td { + +SqliteConnectionSafe::SqliteConnectionSafe(string path, DbKey key, optional<int32> cipher_version) + : path_(std::move(path)) + , lsls_connection_([path = path_, close_state_ptr = &close_state_, key = std::move(key), + cipher_version = std::move(cipher_version)] { + auto r_db = SqliteDb::open_with_key(path, false, key, cipher_version.copy()); + if (r_db.is_error()) { + LOG(FATAL) << "Can't open database in state " << close_state_ptr->load() << ": " << r_db.error().message(); + } + auto db = r_db.move_as_ok(); + db.exec("PRAGMA journal_mode=WAL").ensure(); + db.exec("PRAGMA secure_delete=1").ensure(); + return db; + }) { +} + +void SqliteConnectionSafe::set(SqliteDb &&db) { + lsls_connection_.set(std::move(db)); +} + +SqliteDb &SqliteConnectionSafe::get() { + return lsls_connection_.get(); +} + +void SqliteConnectionSafe::close() { + LOG(INFO) << "Close SQLite database " << tag("path", path_); + close_state_++; + lsls_connection_.clear_values(); +} + +void SqliteConnectionSafe::close_and_destroy() { + close(); + LOG(INFO) << "Destroy SQLite database " << tag("path", path_); + close_state_ += 65536; + SqliteDb::destroy(path_).ignore(); +} + +} // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h index 6e45e79e63..a8bae2b3c2 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h @@ -1,53 +1,39 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once -#include "td/actor/SchedulerLocalStorage.h" - +#include "td/db/DbKey.h" #include "td/db/SqliteDb.h" +#include "td/actor/SchedulerLocalStorage.h" + #include "td/utils/common.h" -#include "td/utils/format.h" -#include "td/utils/logging.h" +#include "td/utils/optional.h" + +#include <atomic> namespace td { class SqliteConnectionSafe { public: SqliteConnectionSafe() = default; - explicit SqliteConnectionSafe(string name, DbKey key = DbKey::empty()) - : lsls_connection_([name = name, key = std::move(key)] { - auto db = SqliteDb::open_with_key(name, key).move_as_ok(); - db.exec("PRAGMA synchronous=NORMAL").ensure(); - db.exec("PRAGMA temp_store=MEMORY").ensure(); - db.exec("PRAGMA secure_delete=1").ensure(); - db.exec("PRAGMA recursive_triggers=1").ensure(); - return db; - }) - , name_(std::move(name)) { - } - - SqliteDb &get() { - return lsls_connection_.get(); - } - - void close() { - LOG(INFO) << "Close sqlite db " << tag("path", name_); - lsls_connection_.clear_values(); - } - void close_and_destroy() { - close(); - LOG(INFO) << "Destroy sqlite db " << tag("path", name_); - SqliteDb::destroy(name_).ignore(); - } + SqliteConnectionSafe(string path, DbKey key, optional<int32> cipher_version = {}); + + SqliteDb &get(); + void set(SqliteDb &&db); + + void close(); + + void close_and_destroy(); private: + string path_; + std::atomic<uint32> close_state_{0}; LazySchedulerLocalStorage<SqliteDb> lsls_connection_; - string name_; }; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp index 819818197d..2ae37b7f90 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp @@ -1,14 +1,17 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/db/SqliteDb.h" +#include "td/utils/common.h" #include "td/utils/format.h" +#include "td/utils/logging.h" #include "td/utils/port/path.h" #include "td/utils/port/Stat.h" +#include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" #include "td/utils/StringBuilder.h" #include "td/utils/Timer.h" @@ -18,12 +21,34 @@ namespace td { namespace { +string quote_string(Slice str) { + size_t cnt = 0; + for (auto &c : str) { + if (c == '\'') { + cnt++; + } + } + if (cnt == 0) { + return str.str(); + } + + string result; + result.reserve(str.size() + cnt); + for (auto &c : str) { + if (c == '\'') { + result += '\''; + } + result += c; + } + return result; +} + string db_key_to_sqlcipher_key(const DbKey &db_key) { if (db_key.is_empty()) { return "''"; } if (db_key.is_password()) { - return PSTRING() << "'" << db_key.data().str() << "'"; + return PSTRING() << "'" << quote_string(db_key.data()) << "'"; } CHECK(db_key.is_raw_key()); Slice raw_key = db_key.data(); @@ -46,27 +71,29 @@ string db_key_to_sqlcipher_key(const DbKey &db_key) { SqliteDb::~SqliteDb() = default; -Status SqliteDb::init(CSlice path, bool *was_created) { - // If database does not exist, delete all other files which may left - // from older database - bool is_db_exists = stat(path).is_ok(); - if (!is_db_exists) { - destroy(path).ignore(); +Status SqliteDb::init(CSlice path, bool allow_creation) { + // if database does not exist, delete all other files which could have been left from the old database + auto database_stat = stat(path); + if (database_stat.is_error()) { + if (!allow_creation) { + bool was_destroyed = detail::RawSqliteDb::was_any_database_destroyed(); + auto reason = was_destroyed ? Slice("was corrupted and deleted") : Slice("disappeared"); + return Status::Error(PSLICE() << "Database " << reason + << " during execution and can't be recreated: " << database_stat.error()); + } + TRY_STATUS(destroy(path)); } - if (was_created != nullptr) { - *was_created = !is_db_exists; - } - sqlite3 *db; - CHECK(sqlite3_threadsafe() != 0); - int rc = sqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE /*| SQLITE_OPEN_SHAREDCACHE*/, - nullptr); + tdsqlite3 *db; + CHECK(tdsqlite3_threadsafe() != 0); + int rc = + tdsqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | (allow_creation ? SQLITE_OPEN_CREATE : 0), nullptr); if (rc != SQLITE_OK) { - auto res = Status::Error(PSLICE() << "Failed to open db: " << detail::RawSqliteDb::last_error(db)); - sqlite3_close(db); + auto res = detail::RawSqliteDb::last_error(db, path); + tdsqlite3_close(db); return res; } - sqlite3_busy_timeout(db, 1000 * 5 /* 5 seconds */); + tdsqlite3_busy_timeout(db, 1000 * 5 /* 5 seconds */); raw_ = std::make_shared<detail::RawSqliteDb>(db, path.str()); return Status::OK(); } @@ -74,32 +101,41 @@ Status SqliteDb::init(CSlice path, bool *was_created) { static void trace_callback(void *ptr, const char *query) { LOG(ERROR) << query; } + static int trace_v2_callback(unsigned code, void *ctx, void *p_raw, void *x_raw) { CHECK(code == SQLITE_TRACE_STMT); auto x = static_cast<const char *>(x_raw); if (x[0] == '-' && x[1] == '-') { trace_callback(ctx, x); } else { - trace_callback(ctx, sqlite3_expanded_sql(static_cast<sqlite3_stmt *>(p_raw))); + trace_callback(ctx, tdsqlite3_expanded_sql(static_cast<tdsqlite3_stmt *>(p_raw))); } return 0; } + void SqliteDb::trace(bool flag) { - sqlite3_trace_v2(raw_->db(), SQLITE_TRACE_STMT, flag ? trace_v2_callback : nullptr, nullptr); + tdsqlite3_trace_v2(raw_->db(), SQLITE_TRACE_STMT, flag ? trace_v2_callback : nullptr, nullptr); } Status SqliteDb::exec(CSlice cmd) { CHECK(!empty()); char *msg; - VLOG(sqlite) << "Start exec " << tag("cmd", cmd) << tag("db", raw_->db()); - auto rc = sqlite3_exec(raw_->db(), cmd.c_str(), nullptr, nullptr, &msg); - VLOG(sqlite) << "Finish exec " << tag("cmd", cmd) << tag("db", raw_->db()); + if (enable_logging_) { + VLOG(sqlite) << "Start exec " << tag("query", cmd) << tag("database", raw_->db()); + } + auto rc = tdsqlite3_exec(raw_->db(), cmd.c_str(), nullptr, nullptr, &msg); if (rc != SQLITE_OK) { CHECK(msg != nullptr); - return Status::Error(PSLICE() << tag("query", cmd) << " failed: " << msg); + if (enable_logging_) { + VLOG(sqlite) << "Finish exec with error " << msg; + } + return Status::Error(PSLICE() << tag("query", cmd) << " to database \"" << raw_->path() << "\" failed: " << msg); } CHECK(msg == nullptr); + if (enable_logging_) { + VLOG(sqlite) << "Finish exec"; + } return Status::OK(); } @@ -111,6 +147,7 @@ Result<bool> SqliteDb::has_table(Slice table) { auto cnt = stmt.view_int32(0); return cnt == 1; } + Result<string> SqliteDb::get_pragma(Slice name) { TRY_RESULT(stmt, get_statement(PSLICE() << "PRAGMA " << name)); TRY_STATUS(stmt.step()); @@ -121,11 +158,21 @@ Result<string> SqliteDb::get_pragma(Slice name) { return std::move(res); } +Result<string> SqliteDb::get_pragma_string(Slice name) { + TRY_RESULT(stmt, get_statement(PSLICE() << "PRAGMA " << name)); + TRY_STATUS(stmt.step()); + CHECK(stmt.has_row()); + auto res = stmt.view_string(0).str(); + TRY_STATUS(stmt.step()); + CHECK(!stmt.can_step()); + return std::move(res); +} + Result<int32> SqliteDb::user_version() { TRY_RESULT(get_version_stmt, get_statement("PRAGMA user_version")); TRY_STATUS(get_version_stmt.step()); if (!get_version_stmt.has_row()) { - return Status::Error("PRAGMA user_version failed"); + return Status::Error(PSLICE() << "PRAGMA user_version failed for database \"" << raw_->path() << '"'); } return get_version_stmt.view_int32(0); } @@ -134,56 +181,103 @@ Status SqliteDb::set_user_version(int32 version) { return exec(PSLICE() << "PRAGMA user_version = " << version); } -Status SqliteDb::begin_transaction() { - return exec("BEGIN"); +Status SqliteDb::begin_read_transaction() { + if (raw_->on_begin()) { + return exec("BEGIN"); + } + return Status::OK(); +} + +Status SqliteDb::begin_write_transaction() { + if (raw_->on_begin()) { + return exec("BEGIN IMMEDIATE"); + } + return Status::OK(); } + Status SqliteDb::commit_transaction() { - return exec("COMMIT"); + TRY_RESULT(need_commit, raw_->on_commit()); + if (need_commit) { + return exec("COMMIT"); + } + return Status::OK(); } -bool SqliteDb::is_encrypted() { - return exec("SELECT count(*) FROM sqlite_master").is_error(); +Status SqliteDb::check_encryption() { + auto status = exec("SELECT count(*) FROM sqlite_master"); + if (status.is_ok()) { + enable_logging_ = true; + } + return status; } -Result<SqliteDb> SqliteDb::open_with_key(CSlice path, const DbKey &db_key) { +Result<SqliteDb> SqliteDb::open_with_key(CSlice path, bool allow_creation, const DbKey &db_key, + optional<int32> cipher_version) { + auto res = do_open_with_key(path, allow_creation, db_key, cipher_version ? cipher_version.value() : 0); + if (res.is_error() && !cipher_version && !db_key.is_empty()) { + return do_open_with_key(path, false, db_key, 3); + } + return res; +} + +Result<SqliteDb> SqliteDb::do_open_with_key(CSlice path, bool allow_creation, const DbKey &db_key, + int32 cipher_version) { SqliteDb db; - TRY_STATUS(db.init(path)); + TRY_STATUS(db.init(path, allow_creation)); if (!db_key.is_empty()) { - if (!db.is_encrypted()) { - return Status::Error("No key is needed"); + if (db.check_encryption().is_ok()) { + return Status::Error(PSLICE() << "No key is needed for database \"" << path << '"'); } auto key = db_key_to_sqlcipher_key(db_key); TRY_STATUS(db.exec(PSLICE() << "PRAGMA key = " << key)); + if (cipher_version != 0) { + LOG(INFO) << "Trying SQLCipher compatibility mode with version = " << cipher_version; + TRY_STATUS(db.exec(PSLICE() << "PRAGMA cipher_compatibility = " << cipher_version)); + } + db.set_cipher_version(cipher_version); } - if (db.is_encrypted()) { - return Status::Error("Wrong key"); - } + TRY_STATUS_PREFIX(db.check_encryption(), "Can't check database: "); return std::move(db); } -Status SqliteDb::change_key(CSlice path, const DbKey &new_db_key, const DbKey &old_db_key) { +void SqliteDb::set_cipher_version(int32 cipher_version) { + raw_->set_cipher_version(cipher_version); +} + +optional<int32> SqliteDb::get_cipher_version() const { + return raw_->get_cipher_version(); +} + +Result<SqliteDb> SqliteDb::change_key(CSlice path, bool allow_creation, const DbKey &new_db_key, + const DbKey &old_db_key) { // fast path { - auto r_db = open_with_key(path, new_db_key); + PerfWarningTimer perf("open database", 0.05); + auto r_db = open_with_key(path, allow_creation, new_db_key); if (r_db.is_ok()) { - return Status::OK(); + return r_db; } } - TRY_RESULT(db, open_with_key(path, old_db_key)); + PerfWarningTimer perf("change database key", 0.5); + auto create_database = [](CSlice tmp_path) -> Status { + TRY_STATUS(destroy(tmp_path)); + SqliteDb db; + return db.init(tmp_path, true); + }; + + TRY_RESULT(db, open_with_key(path, false, old_db_key)); TRY_RESULT(user_version, db.user_version()); auto new_key = db_key_to_sqlcipher_key(new_db_key); if (old_db_key.is_empty() && !new_db_key.is_empty()) { LOG(DEBUG) << "ENCRYPT"; - // Encrypt - PerfWarningTimer timer("Encrypt sqlite database", 0.1); - auto tmp_path = path.str() + ".ecnrypted"; - unlink(tmp_path).ignore(); + PerfWarningTimer timer("Encrypt SQLite database", 0.1); + auto tmp_path = path.str() + ".encrypted"; + TRY_STATUS(create_database(tmp_path)); - // make shure that database is not empty + // make sure that database is not empty TRY_STATUS(db.exec("CREATE TABLE IF NOT EXISTS encryption_dummy_table(id INT PRIMARY KEY)")); - //NB: not really safe - TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << tmp_path << "' AS encrypted KEY " << new_key)); + TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << quote_string(tmp_path) << "' AS encrypted KEY " << new_key)); TRY_STATUS(db.exec("SELECT sqlcipher_export('encrypted')")); TRY_STATUS(db.exec(PSLICE() << "PRAGMA encrypted.user_version = " << user_version)); TRY_STATUS(db.exec("DETACH DATABASE encrypted")); @@ -191,13 +285,11 @@ Status SqliteDb::change_key(CSlice path, const DbKey &new_db_key, const DbKey &o TRY_STATUS(rename(tmp_path, path)); } else if (!old_db_key.is_empty() && new_db_key.is_empty()) { LOG(DEBUG) << "DECRYPT"; - // Dectypt - PerfWarningTimer timer("Decrypt sqlite database", 0.1); - auto tmp_path = path.str() + ".ecnrypted"; - unlink(tmp_path).ignore(); + PerfWarningTimer timer("Decrypt SQLite database", 0.1); + auto tmp_path = path.str() + ".encrypted"; + TRY_STATUS(create_database(tmp_path)); - //NB: not really safe - TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << tmp_path << "' AS decrypted KEY ''")); + TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << quote_string(tmp_path) << "' AS decrypted KEY ''")); TRY_STATUS(db.exec("SELECT sqlcipher_export('decrypted')")); TRY_STATUS(db.exec(PSLICE() << "PRAGMA decrypted.user_version = " << user_version)); TRY_STATUS(db.exec("DETACH DATABASE decrypted")); @@ -205,24 +297,27 @@ Status SqliteDb::change_key(CSlice path, const DbKey &new_db_key, const DbKey &o TRY_STATUS(rename(tmp_path, path)); } else { LOG(DEBUG) << "REKEY"; - PerfWarningTimer timer("Rekey sqlite database", 0.1); + PerfWarningTimer timer("Rekey SQLite database", 0.1); TRY_STATUS(db.exec(PSLICE() << "PRAGMA rekey = " << new_key)); } - TRY_RESULT(new_db, open_with_key(path, new_db_key)); + TRY_RESULT(new_db, open_with_key(path, false, new_db_key)); CHECK(new_db.user_version().ok() == user_version); - return Status::OK(); + return std::move(new_db); } Status SqliteDb::destroy(Slice path) { return detail::RawSqliteDb::destroy(path); } Result<SqliteStatement> SqliteDb::get_statement(CSlice statement) { - sqlite3_stmt *stmt = nullptr; - auto rc = sqlite3_prepare_v2(get_native(), statement.c_str(), static_cast<int>(statement.size()) + 1, &stmt, nullptr); + tdsqlite3_stmt *stmt = nullptr; + auto rc = + tdsqlite3_prepare_v2(get_native(), statement.c_str(), static_cast<int>(statement.size()) + 1, &stmt, nullptr); if (rc != SQLITE_OK) { - return Status::Error(PSLICE() << "Failed to prepare sqlite " << tag("stmt", statement) << raw_->last_error()); + return Status::Error(PSLICE() << "Failed to prepare SQLite " << tag("statement", statement) << raw_->last_error()); } + LOG_CHECK(stmt != nullptr) << statement; return SqliteStatement(stmt, raw_); } + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h index 40137464ce..899b02e4a5 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -11,23 +11,19 @@ #include "td/db/detail/RawSqliteDb.h" -#include "td/utils/logging.h" +#include "td/utils/optional.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" #include <memory> -struct sqlite3; +struct tdsqlite3; namespace td { class SqliteDb { public: SqliteDb() = default; - explicit SqliteDb(CSlice path) { - auto status = init(path); - LOG_IF(FATAL, status.is_error()) << status; - } SqliteDb(SqliteDb &&) = default; SqliteDb &operator=(SqliteDb &&) = default; SqliteDb(const SqliteDb &) = delete; @@ -36,7 +32,7 @@ class SqliteDb { // dangerous SqliteDb clone() const { - return SqliteDb(raw_); + return SqliteDb(raw_, enable_logging_); } bool empty() const { @@ -46,26 +42,28 @@ class SqliteDb { *this = SqliteDb(); } - Status init(CSlice path, bool *was_created = nullptr) TD_WARN_UNUSED_RESULT; Status exec(CSlice cmd) TD_WARN_UNUSED_RESULT; Result<bool> has_table(Slice table); Result<string> get_pragma(Slice name); - Status begin_transaction(); - Status commit_transaction(); + Result<string> get_pragma_string(Slice name); + + Status begin_read_transaction() TD_WARN_UNUSED_RESULT; + Status begin_write_transaction() TD_WARN_UNUSED_RESULT; + Status commit_transaction() TD_WARN_UNUSED_RESULT; Result<int32> user_version(); - Status set_user_version(int32 version); + Status set_user_version(int32 version) TD_WARN_UNUSED_RESULT; void trace(bool flag); static Status destroy(Slice path) TD_WARN_UNUSED_RESULT; - // Anyway we can't change the key on the fly, so static functions is more than enough - static Result<SqliteDb> open_with_key(CSlice path, const DbKey &db_key); - static Status change_key(CSlice path, const DbKey &new_db_key, const DbKey &old_db_key); - - Status last_error(); + // we can't change the key on the fly, so static functions are more than enough + static Result<SqliteDb> open_with_key(CSlice path, bool allow_creation, const DbKey &db_key, + optional<int32> cipher_version = {}); + static Result<SqliteDb> change_key(CSlice path, bool allow_creation, const DbKey &new_db_key, + const DbKey &old_db_key); - sqlite3 *get_native() const { + tdsqlite3 *get_native() const { return raw_->db(); } @@ -76,11 +74,20 @@ class SqliteDb { detail::RawSqliteDb::with_db_path(main_path, f); } + optional<int32> get_cipher_version() const; + private: - explicit SqliteDb(std::shared_ptr<detail::RawSqliteDb> raw) : raw_(std::move(raw)) { + SqliteDb(std::shared_ptr<detail::RawSqliteDb> raw, bool enable_logging) + : raw_(std::move(raw)), enable_logging_(enable_logging) { } std::shared_ptr<detail::RawSqliteDb> raw_; + bool enable_logging_ = false; + + Status init(CSlice path, bool allow_creation) TD_WARN_UNUSED_RESULT; - bool is_encrypted(); + Status check_encryption(); + static Result<SqliteDb> do_open_with_key(CSlice path, bool allow_creation, const DbKey &db_key, int32 cipher_version); + void set_cipher_version(int32 cipher_version); }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp new file mode 100644 index 0000000000..97e2e6fe94 --- /dev/null +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp @@ -0,0 +1,124 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/SqliteKeyValue.h" + +#include "td/utils/base64.h" +#include "td/utils/logging.h" +#include "td/utils/ScopeGuard.h" + +namespace td { + +Status SqliteKeyValue::init_with_connection(SqliteDb connection, string table_name) { + auto init_guard = ScopeExit() + [&] { + close(); + }; + db_ = std::move(connection); + table_name_ = std::move(table_name); + TRY_STATUS(init(db_, table_name_)); + + TRY_RESULT_ASSIGN(set_stmt_, + db_.get_statement(PSLICE() << "REPLACE INTO " << table_name_ << " (k, v) VALUES (?1, ?2)")); + TRY_RESULT_ASSIGN(get_stmt_, db_.get_statement(PSLICE() << "SELECT v FROM " << table_name_ << " WHERE k = ?1")); + TRY_RESULT_ASSIGN(erase_stmt_, db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE k = ?1")); + TRY_RESULT_ASSIGN(get_all_stmt_, db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_)); + + TRY_RESULT_ASSIGN(erase_by_prefix_stmt_, + db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE ?1 <= k AND k < ?2")); + TRY_RESULT_ASSIGN(erase_by_prefix_rare_stmt_, + db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE ?1 <= k")); + + TRY_RESULT_ASSIGN(get_by_prefix_stmt_, + db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_ << " WHERE ?1 <= k AND k < ?2")); + TRY_RESULT_ASSIGN(get_by_prefix_rare_stmt_, + db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_ << " WHERE ?1 <= k")); + + init_guard.dismiss(); + return Status::OK(); +} + +Status SqliteKeyValue::drop() { + if (empty()) { + return Status::OK(); + } + + auto result = drop(db_, table_name_); + close(); + return result; +} + +void SqliteKeyValue::set(Slice key, Slice value) { + set_stmt_.bind_blob(1, key).ensure(); + set_stmt_.bind_blob(2, value).ensure(); + auto status = set_stmt_.step(); + if (status.is_error()) { + LOG(FATAL) << "Failed to set \"" << base64_encode(key) << "\": " << status; + } + set_stmt_.reset(); +} + +void SqliteKeyValue::set_all(const FlatHashMap<string, string> &key_values) { + begin_write_transaction().ensure(); + for (auto &key_value : key_values) { + set(key_value.first, key_value.second); + } + commit_transaction().ensure(); +} + +string SqliteKeyValue::get(Slice key) { + SCOPE_EXIT { + get_stmt_.reset(); + }; + get_stmt_.bind_blob(1, key).ensure(); + get_stmt_.step().ensure(); + if (!get_stmt_.has_row()) { + return string(); + } + auto data = get_stmt_.view_blob(0).str(); + get_stmt_.step().ignore(); + return data; +} + +void SqliteKeyValue::erase(Slice key) { + erase_stmt_.bind_blob(1, key).ensure(); + erase_stmt_.step().ensure(); + erase_stmt_.reset(); +} + +void SqliteKeyValue::erase_by_prefix(Slice prefix) { + auto next = next_prefix(prefix); + if (next.empty()) { + SCOPE_EXIT { + erase_by_prefix_rare_stmt_.reset(); + }; + erase_by_prefix_rare_stmt_.bind_blob(1, prefix).ensure(); + erase_by_prefix_rare_stmt_.step().ensure(); + } else { + SCOPE_EXIT { + erase_by_prefix_stmt_.reset(); + }; + erase_by_prefix_stmt_.bind_blob(1, prefix).ensure(); + erase_by_prefix_stmt_.bind_blob(2, next).ensure(); + erase_by_prefix_stmt_.step().ensure(); + } +} + +string SqliteKeyValue::next_prefix(Slice prefix) { + string next = prefix.str(); + size_t pos = next.size(); + while (pos) { + pos--; + auto value = static_cast<uint8>(next[pos]); + value++; + next[pos] = static_cast<char>(value); + if (value != 0) { + return next; + } + } + return string{}; +} + +} // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h index 6fe050e7f2..bc617ff05f 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -9,149 +9,65 @@ #include "td/db/SqliteDb.h" #include "td/db/SqliteStatement.h" -#include "td/utils/logging.h" -#include "td/utils/ScopeGuard.h" +#include "td/utils/common.h" +#include "td/utils/FlatHashMap.h" #include "td/utils/Slice.h" +#include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" -#include <unordered_map> - namespace td { + class SqliteKeyValue { public: - static Status drop(SqliteDb &connection, Slice kv_name) TD_WARN_UNUSED_RESULT { - return connection.exec(PSLICE() << "DROP TABLE IF EXISTS " << kv_name); + static Status drop(SqliteDb &connection, Slice table_name) TD_WARN_UNUSED_RESULT { + return connection.exec(PSLICE() << "DROP TABLE IF EXISTS " << table_name); } - static Status init(SqliteDb &connection, Slice kv_name) TD_WARN_UNUSED_RESULT { - return connection.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << kv_name << " (k BLOB PRIMARY KEY, v BLOB)"); + static Status init(SqliteDb &connection, Slice table_name) TD_WARN_UNUSED_RESULT { + return connection.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << table_name << " (k BLOB PRIMARY KEY, v BLOB)"); } - using SeqNo = uint64; - Result<bool> init(string name) TD_WARN_UNUSED_RESULT { - name_ = std::move(name); - bool is_created = false; - SqliteDb db; - TRY_STATUS(db.init(name, &is_created)); - TRY_STATUS(db.exec("PRAGMA encoding=\"UTF-8\"")); - TRY_STATUS(db.exec("PRAGMA synchronous=NORMAL")); - TRY_STATUS(db.exec("PRAGMA journal_mode=WAL")); - TRY_STATUS(db.exec("PRAGMA temp_store=MEMORY")); - TRY_STATUS(init_with_connection(std::move(db), "KV")); - return is_created; + bool empty() const { + return db_.empty(); } - Status init_with_connection(SqliteDb connection, string kv_name) { - db_ = std::move(connection); - kv_name_ = std::move(kv_name); - TRY_STATUS(init(db_, kv_name_)); - TRY_STATUS(db_.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << kv_name_ << " (k BLOB PRIMARY KEY, v BLOB)")); - - TRY_RESULT(set_stmt, db_.get_statement(PSLICE() << "REPLACE INTO " << kv_name_ << " (k, v) VALUES (?1, ?2)")); - set_stmt_ = std::move(set_stmt); - TRY_RESULT(get_stmt, db_.get_statement(PSLICE() << "SELECT v FROM " << kv_name_ << " WHERE k = ?1")); - get_stmt_ = std::move(get_stmt); - TRY_RESULT(erase_stmt, db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE k = ?1")); - erase_stmt_ = std::move(erase_stmt); - TRY_RESULT(get_all_stmt, db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << "")); - - TRY_RESULT(erase_by_prefix_stmt, - db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE ?1 <= k AND k < ?2")); - erase_by_prefix_stmt_ = std::move(erase_by_prefix_stmt); - - TRY_RESULT(erase_by_prefix_rare_stmt, - db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE ?1 <= k")); - erase_by_prefix_rare_stmt_ = std::move(erase_by_prefix_rare_stmt); - - TRY_RESULT(get_by_prefix_stmt, - db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << " WHERE ?1 <= k AND k < ?2")); - get_by_prefix_stmt_ = std::move(get_by_prefix_stmt); - - TRY_RESULT(get_by_prefix_rare_stmt, - db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << " WHERE ?1 <= k")); - get_by_prefix_rare_stmt_ = std::move(get_by_prefix_rare_stmt); - - get_all_stmt_ = std::move(get_all_stmt); - return Status::OK(); - } + Status init_with_connection(SqliteDb connection, string table_name) TD_WARN_UNUSED_RESULT; - Result<bool> try_regenerate_index() TD_WARN_UNUSED_RESULT { - return false; - } void close() { - clear(); - } - void close_silent() { - clear(); - } - static Status destroy(Slice name) { - return SqliteDb::destroy(name); - } - void close_and_destroy() { - db_.exec(PSLICE() << "DROP TABLE IF EXISTS " << kv_name_).ensure(); - auto name = std::move(name_); - clear(); - if (!name.empty()) { - SqliteDb::destroy(name).ignore(); - } + *this = SqliteKeyValue(); } - SeqNo set(Slice key, Slice value) { - set_stmt_.bind_blob(1, key).ensure(); - set_stmt_.bind_blob(2, value).ensure(); - set_stmt_.step().ensure(); - set_stmt_.reset(); - return 0; - } + Status drop(); - SeqNo erase(Slice key) { - erase_stmt_.bind_blob(1, key).ensure(); - erase_stmt_.step().ensure(); - erase_stmt_.reset(); - return 0; - } - string get(Slice key) { - SCOPE_EXIT { - get_stmt_.reset(); - }; - get_stmt_.bind_blob(1, key).ensure(); - get_stmt_.step().ensure(); - if (!get_stmt_.has_row()) { - return ""; - } - auto data = get_stmt_.view_blob(0).str(); - get_stmt_.step().ignore(); - return data; + void set(Slice key, Slice value); + + void set_all(const FlatHashMap<string, string> &key_values); + + string get(Slice key); + + void erase(Slice key); + + Status begin_read_transaction() TD_WARN_UNUSED_RESULT { + return db_.begin_read_transaction(); } - Status begin_transaction() { - return db_.begin_transaction(); + Status begin_write_transaction() TD_WARN_UNUSED_RESULT { + return db_.begin_write_transaction(); } - Status commit_transaction() { + + Status commit_transaction() TD_WARN_UNUSED_RESULT { return db_.commit_transaction(); } - void erase_by_prefix(Slice prefix) { - auto next = next_prefix(prefix); - if (next.empty()) { - SCOPE_EXIT { - erase_by_prefix_rare_stmt_.reset(); - }; - erase_by_prefix_rare_stmt_.bind_blob(1, prefix).ensure(); - erase_by_prefix_rare_stmt_.step().ensure(); - } else { - SCOPE_EXIT { - erase_by_prefix_stmt_.reset(); - }; - erase_by_prefix_stmt_.bind_blob(1, prefix).ensure(); - erase_by_prefix_stmt_.bind_blob(2, next).ensure(); - erase_by_prefix_stmt_.step().ensure(); - } - }; + void erase_by_prefix(Slice prefix); - std::unordered_map<string, string> get_all() { - std::unordered_map<string, string> res; - get_by_prefix("", [&](Slice key, Slice value) { res.emplace(key.str(), value.str()); }); + FlatHashMap<string, string> get_all() { + FlatHashMap<string, string> res; + get_by_prefix("", [&](Slice key, Slice value) { + CHECK(!key.empty()); + res.emplace(key.str(), value.str()); + return true; + }); return res; } @@ -163,6 +79,7 @@ class SqliteKeyValue { } get_by_range(prefix, next, callback); } + template <class CallbackT> void get_by_range(Slice from, Slice till, CallbackT &&callback) { SqliteStatement *stmt = nullptr; @@ -181,18 +98,15 @@ class SqliteKeyValue { auto guard = stmt->guard(); stmt->step().ensure(); while (stmt->has_row()) { - callback(stmt->view_blob(0), stmt->view_blob(1)); + if (!callback(stmt->view_blob(0), stmt->view_blob(1))) { + return; + } stmt->step().ensure(); } } - void clear() { - *this = SqliteKeyValue(); - } - private: - string name_; // deprecated - string kv_name_; + string table_name_; SqliteDb db_; SqliteStatement get_stmt_; SqliteStatement set_stmt_; @@ -203,19 +117,7 @@ class SqliteKeyValue { SqliteStatement get_by_prefix_stmt_; SqliteStatement get_by_prefix_rare_stmt_; - string next_prefix(Slice prefix) { - string next = prefix.str(); - size_t pos = next.size(); - while (pos) { - pos--; - auto value = static_cast<uint8>(next[pos]); - value++; - next[pos] = static_cast<char>(value); - if (value != 0) { - return next; - } - } - return string{}; - } + static string next_prefix(Slice prefix); }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp index e8575b62f1..a0a35a1e54 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp @@ -1,45 +1,57 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/db/SqliteKeyValueAsync.h" +#include "td/db/SqliteKeyValue.h" + +#include "td/actor/actor.h" + +#include "td/utils/common.h" #include "td/utils/optional.h" #include "td/utils/Time.h" -#include <unordered_map> - namespace td { -class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { + +class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface { public: explicit SqliteKeyValueAsync(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int32 scheduler_id = -1) { impl_ = create_actor_on_scheduler<Impl>("KV", scheduler_id, std::move(kv_safe)); } - void set(string key, string value, Promise<> promise) override { + void set(string key, string value, Promise<Unit> promise) final { send_closure_later(impl_, &Impl::set, std::move(key), std::move(value), std::move(promise)); } - void erase(string key, Promise<> promise) override { + void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) final { + send_closure_later(impl_, &Impl::set_all, std::move(key_values), std::move(promise)); + } + void erase(string key, Promise<Unit> promise) final { send_closure_later(impl_, &Impl::erase, std::move(key), std::move(promise)); } - void get(string key, Promise<string> promise) override { + void erase_by_prefix(string key_prefix, Promise<Unit> promise) final { + send_closure_later(impl_, &Impl::erase_by_prefix, std::move(key_prefix), std::move(promise)); + } + void get(string key, Promise<string> promise) final { send_closure_later(impl_, &Impl::get, std::move(key), std::move(promise)); } - void close(Promise<> promise) override { + void close(Promise<Unit> promise) final { send_closure_later(impl_, &Impl::close, std::move(promise)); } private: - class Impl : public Actor { + class Impl final : public Actor { public: explicit Impl(std::shared_ptr<SqliteKeyValueSafe> kv_safe) : kv_safe_(std::move(kv_safe)) { } - void set(string key, string value, Promise<> promise) { + + void set(string key, string value, Promise<Unit> promise) { auto it = buffer_.find(key); if (it != buffer_.end()) { it->second = std::move(value); } else { + CHECK(!key.empty()); buffer_.emplace(std::move(key), std::move(value)); } if (promise) { @@ -48,11 +60,19 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { cnt_++; do_flush(false /*force*/); } - void erase(string key, Promise<> promise) { + + void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) { + do_flush(true /*force*/); + kv_->set_all(key_values); + promise.set_value(Unit()); + } + + void erase(string key, Promise<Unit> promise) { auto it = buffer_.find(key); if (it != buffer_.end()) { it->second = optional<string>(); } else { + CHECK(!key.empty()); buffer_.emplace(std::move(key), optional<string>()); } if (promise) { @@ -62,6 +82,12 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { do_flush(false /*force*/); } + void erase_by_prefix(string key_prefix, Promise<Unit> promise) { + do_flush(true /*force*/); + kv_->erase_by_prefix(key_prefix); + promise.set_value(Unit()); + } + void get(const string &key, Promise<string> promise) { auto it = buffer_.find(key); if (it != buffer_.end()) { @@ -69,7 +95,8 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { } promise.set_value(kv_->get(key)); } - void close(Promise<> promise) { + + void close(Promise<Unit> promise) { do_flush(true /*force*/); kv_safe_.reset(); kv_ = nullptr; @@ -81,10 +108,10 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { std::shared_ptr<SqliteKeyValueSafe> kv_safe_; SqliteKeyValue *kv_ = nullptr; - static constexpr double MAX_PENDING_QUERIES_DELAY = 1; + static constexpr double MAX_PENDING_QUERIES_DELAY = 0.01; static constexpr size_t MAX_PENDING_QUERIES_COUNT = 100; - std::unordered_map<string, optional<string>> buffer_; - std::vector<Promise<>> buffer_promises_; + FlatHashMap<string, optional<string>> buffer_; + vector<Promise<Unit>> buffer_promises_; size_t cnt_ = 0; double wakeup_at_ = 0; @@ -107,7 +134,7 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { wakeup_at_ = 0; cnt_ = 0; - kv_->begin_transaction().ensure(); + kv_->begin_write_transaction().ensure(); for (auto &it : buffer_) { if (it.second) { kv_->set(it.first, it.second.value()); @@ -117,25 +144,23 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { } kv_->commit_transaction().ensure(); buffer_.clear(); - for (auto &promise : buffer_promises_) { - promise.set_value(Unit()); - } - buffer_promises_.clear(); + set_promises(buffer_promises_); } - void timeout_expired() override { + void timeout_expired() final { do_flush(false /*force*/); } - void start_up() override { + void start_up() final { kv_ = &kv_safe_->get(); } }; ActorOwn<Impl> impl_; }; -std::unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv, - int32 scheduler_id) { - return std::make_unique<SqliteKeyValueAsync>(std::move(kv), scheduler_id); + +unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv, + int32 scheduler_id) { + return td::make_unique<SqliteKeyValueAsync>(std::move(kv), scheduler_id); } } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h index 6015d26fb2..e5bc29b1e8 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,7 +8,9 @@ #include "td/db/SqliteKeyValueSafe.h" -#include "td/actor/PromiseFuture.h" +#include "td/utils/common.h" +#include "td/utils/FlatHashMap.h" +#include "td/utils/Promise.h" #include <memory> @@ -18,13 +20,19 @@ class SqliteKeyValueAsyncInterface { public: virtual ~SqliteKeyValueAsyncInterface() = default; - virtual void set(string key, string value, Promise<> promise) = 0; - virtual void erase(string key, Promise<> promise) = 0; + virtual void set(string key, string value, Promise<Unit> promise) = 0; + + virtual void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) = 0; + + virtual void erase(string key, Promise<Unit> promise) = 0; + + virtual void erase_by_prefix(string key_prefix, Promise<Unit> promise) = 0; virtual void get(string key, Promise<string> promise) = 0; - virtual void close(Promise<> promise) = 0; + + virtual void close(Promise<Unit> promise) = 0; }; -std::unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv, - int32 scheduler_id = 1); +unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv, + int32 scheduler_id = 1); } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h index d63af3cfb2..b61a96e193 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -9,6 +9,8 @@ #include "td/db/SqliteConnectionSafe.h" #include "td/db/SqliteKeyValue.h" +#include "td/actor/SchedulerLocalStorage.h" + #include <memory> namespace td { diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp index c9ce8c3e8e..a2ae3833e4 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -15,40 +15,42 @@ namespace td { +int VERBOSITY_NAME(sqlite) = VERBOSITY_NAME(DEBUG) + 10; + namespace { -int printExplainQueryPlan(StringBuilder &sb, sqlite3_stmt *pStmt) { - const char *zSql = sqlite3_sql(pStmt); +int printExplainQueryPlan(StringBuilder &sb, tdsqlite3_stmt *pStmt) { + const char *zSql = tdsqlite3_sql(pStmt); if (zSql == nullptr) { return SQLITE_ERROR; } - sb << "Explain " << tag("cmd", zSql); - char *zExplain = sqlite3_mprintf("EXPLAIN QUERY PLAN %s", zSql); + sb << "Explain query " << zSql; + char *zExplain = tdsqlite3_mprintf("EXPLAIN QUERY PLAN %s", zSql); if (zExplain == nullptr) { return SQLITE_NOMEM; } - sqlite3_stmt *pExplain; /* Compiled EXPLAIN QUERY PLAN command */ - int rc = sqlite3_prepare_v2(sqlite3_db_handle(pStmt), zExplain, -1, &pExplain, nullptr); - sqlite3_free(zExplain); + tdsqlite3_stmt *pExplain; /* Compiled EXPLAIN QUERY PLAN command */ + int rc = tdsqlite3_prepare_v2(tdsqlite3_db_handle(pStmt), zExplain, -1, &pExplain, nullptr); + tdsqlite3_free(zExplain); if (rc != SQLITE_OK) { return rc; } - while (SQLITE_ROW == sqlite3_step(pExplain)) { - int iSelectid = sqlite3_column_int(pExplain, 0); - int iOrder = sqlite3_column_int(pExplain, 1); - int iFrom = sqlite3_column_int(pExplain, 2); - const char *zDetail = reinterpret_cast<const char *>(sqlite3_column_text(pExplain, 3)); + while (SQLITE_ROW == tdsqlite3_step(pExplain)) { + int iSelectid = tdsqlite3_column_int(pExplain, 0); + int iOrder = tdsqlite3_column_int(pExplain, 1); + int iFrom = tdsqlite3_column_int(pExplain, 2); + const char *zDetail = reinterpret_cast<const char *>(tdsqlite3_column_text(pExplain, 3)); - sb << "\n" << iSelectid << " " << iOrder << " " << iFrom << " " << zDetail; + sb << '\n' << iSelectid << ' ' << iOrder << ' ' << iFrom << ' ' << zDetail; } - return sqlite3_finalize(pExplain); + return tdsqlite3_finalize(pExplain); } } // namespace -SqliteStatement::SqliteStatement(sqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db) +SqliteStatement::SqliteStatement(tdsqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db) : stmt_(stmt), db_(std::move(db)) { CHECK(stmt != nullptr); } @@ -70,14 +72,14 @@ Result<string> SqliteStatement::explain() { return sb.as_cslice().str(); } Status SqliteStatement::bind_blob(int id, Slice blob) { - auto rc = sqlite3_bind_blob(stmt_.get(), id, blob.data(), static_cast<int>(blob.size()), nullptr); + auto rc = tdsqlite3_bind_blob(stmt_.get(), id, blob.data(), static_cast<int>(blob.size()), nullptr); if (rc != SQLITE_OK) { return last_error(); } return Status::OK(); } Status SqliteStatement::bind_string(int id, Slice str) { - auto rc = sqlite3_bind_text(stmt_.get(), id, str.data(), static_cast<int>(str.size()), nullptr); + auto rc = tdsqlite3_bind_text(stmt_.get(), id, str.data(), static_cast<int>(str.size()), nullptr); if (rc != SQLITE_OK) { return last_error(); } @@ -85,21 +87,21 @@ Status SqliteStatement::bind_string(int id, Slice str) { } Status SqliteStatement::bind_int32(int id, int32 value) { - auto rc = sqlite3_bind_int(stmt_.get(), id, value); + auto rc = tdsqlite3_bind_int(stmt_.get(), id, value); if (rc != SQLITE_OK) { return last_error(); } return Status::OK(); } Status SqliteStatement::bind_int64(int id, int64 value) { - auto rc = sqlite3_bind_int64(stmt_.get(), id, value); + auto rc = tdsqlite3_bind_int64(stmt_.get(), id, value); if (rc != SQLITE_OK) { return last_error(); } return Status::OK(); } Status SqliteStatement::bind_null(int id) { - auto rc = sqlite3_bind_null(stmt_.get(), id); + auto rc = tdsqlite3_bind_null(stmt_.get(), id); if (rc != SQLITE_OK) { return last_error(); } @@ -125,8 +127,8 @@ StringBuilder &operator<<(StringBuilder &sb, SqliteStatement::Datatype type) { } Slice SqliteStatement::view_blob(int id) { LOG_IF(ERROR, view_datatype(id) != Datatype::Blob) << view_datatype(id); - auto *data = sqlite3_column_blob(stmt_.get(), id); - auto size = sqlite3_column_bytes(stmt_.get(), id); + auto *data = tdsqlite3_column_blob(stmt_.get(), id); + auto size = tdsqlite3_column_bytes(stmt_.get(), id); if (data == nullptr) { return Slice(); } @@ -134,8 +136,8 @@ Slice SqliteStatement::view_blob(int id) { } Slice SqliteStatement::view_string(int id) { LOG_IF(ERROR, view_datatype(id) != Datatype::Text) << view_datatype(id); - auto *data = sqlite3_column_text(stmt_.get(), id); - auto size = sqlite3_column_bytes(stmt_.get(), id); + auto *data = tdsqlite3_column_text(stmt_.get(), id); + auto size = tdsqlite3_column_bytes(stmt_.get(), id); if (data == nullptr) { return Slice(); } @@ -143,14 +145,14 @@ Slice SqliteStatement::view_string(int id) { } int32 SqliteStatement::view_int32(int id) { LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id); - return sqlite3_column_int(stmt_.get(), id); + return tdsqlite3_column_int(stmt_.get(), id); } int64 SqliteStatement::view_int64(int id) { LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id); - return sqlite3_column_int64(stmt_.get(), id); + return tdsqlite3_column_int64(stmt_.get(), id); } SqliteStatement::Datatype SqliteStatement::view_datatype(int id) { - auto type = sqlite3_column_type(stmt_.get(), id); + auto type = tdsqlite3_column_type(stmt_.get(), id); switch (type) { case SQLITE_INTEGER: return Datatype::Integer; @@ -168,36 +170,36 @@ SqliteStatement::Datatype SqliteStatement::view_datatype(int id) { } void SqliteStatement::reset() { - sqlite3_reset(stmt_.get()); - state_ = Start; + tdsqlite3_reset(stmt_.get()); + state_ = State::Start; } Status SqliteStatement::step() { - if (state_ == Finish) { + if (state_ == State::Finish) { return Status::Error("One has to reset statement"); } - VLOG(sqlite) << "Start step " << tag("cmd", sqlite3_sql(stmt_.get())) << tag("stmt", stmt_.get()) - << tag("db", db_.get()); - auto rc = sqlite3_step(stmt_.get()); - VLOG(sqlite) << "Finish step " << tag("cmd", sqlite3_sql(stmt_.get())) << tag("stmt", stmt_.get()) - << tag("db", db_.get()); + VLOG(sqlite) << "Start step " << tag("query", tdsqlite3_sql(stmt_.get())) << tag("statement", stmt_.get()) + << tag("database", db_.get()); + auto rc = tdsqlite3_step(stmt_.get()); + VLOG(sqlite) << "Finish step with response " << (rc == SQLITE_ROW ? "ROW" : (rc == SQLITE_DONE ? "DONE" : "ERROR")); if (rc == SQLITE_ROW) { - state_ = GotRow; + state_ = State::GotRow; return Status::OK(); } + + state_ = State::Finish; if (rc == SQLITE_DONE) { - state_ = Finish; return Status::OK(); } - state_ = Finish; return last_error(); } -void SqliteStatement::StmtDeleter::operator()(sqlite3_stmt *stmt) { - sqlite3_finalize(stmt); +void SqliteStatement::StmtDeleter::operator()(tdsqlite3_stmt *stmt) { + tdsqlite3_finalize(stmt); } Status SqliteStatement::last_error() { return db_->last_error(); } + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h index 2e2182ff7e..35148508ce 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h @@ -1,24 +1,28 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once +#include "td/db/detail/RawSqliteDb.h" + #include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/ScopeGuard.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" -#include "td/db/detail/RawSqliteDb.h" - #include <memory> -struct sqlite3; -struct sqlite3_stmt; +struct tdsqlite3; +struct tdsqlite3_stmt; namespace td { +extern int VERBOSITY_NAME(sqlite); + class SqliteStatement { public: SqliteStatement() = default; @@ -44,10 +48,10 @@ class SqliteStatement { Result<string> explain(); bool can_step() const { - return state_ != Finish; + return state_ != State::Finish; } bool has_row() const { - return state_ == GotRow; + return state_ == State::GotRow; } bool empty() const { return !stmt_; @@ -56,25 +60,29 @@ class SqliteStatement { void reset(); auto guard() { - return ScopeExit{} + [this] { this->reset(); }; + return ScopeExit{} + [this] { + this->reset(); + }; } // TODO get row private: friend class SqliteDb; - SqliteStatement(sqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db); + SqliteStatement(tdsqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db); class StmtDeleter { public: - void operator()(sqlite3_stmt *stmt); + void operator()(tdsqlite3_stmt *stmt); }; - enum { Start, GotRow, Finish } state_ = Start; + enum class State { Start, GotRow, Finish }; + State state_ = State::Start; - std::unique_ptr<sqlite3_stmt, StmtDeleter> stmt_; + std::unique_ptr<tdsqlite3_stmt, StmtDeleter> stmt_; std::shared_ptr<detail::RawSqliteDb> db_; Status last_error(); }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp new file mode 100644 index 0000000000..c94d7defaa --- /dev/null +++ b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp @@ -0,0 +1,558 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/TQueue.h" + +#include "td/db/binlog/Binlog.h" +#include "td/db/binlog/BinlogEvent.h" +#include "td/db/binlog/BinlogHelper.h" +#include "td/db/binlog/BinlogInterface.h" + +#include "td/utils/FlatHashMap.h" +#include "td/utils/logging.h" +#include "td/utils/misc.h" +#include "td/utils/Random.h" +#include "td/utils/StorerBase.h" +#include "td/utils/Time.h" +#include "td/utils/tl_helpers.h" +#include "td/utils/tl_parsers.h" +#include "td/utils/tl_storers.h" + +#include <set> + +namespace td { + +using EventId = TQueue::EventId; + +EventId::EventId() { +} + +Result<EventId> EventId::from_int32(int32 id) { + if (!is_valid_id(id)) { + return Status::Error("Invalid ID"); + } + return EventId(id); +} + +bool EventId::is_valid() const { + return !empty() && is_valid_id(id_); +} + +int32 EventId::value() const { + return id_; +} + +Result<EventId> EventId::next() const { + return from_int32(id_ + 1); +} + +Result<EventId> EventId::advance(size_t offset) const { + TRY_RESULT(new_id, narrow_cast_safe<int32>(id_ + offset)); + return from_int32(new_id); +} + +bool EventId::empty() const { + return id_ == 0; +} + +bool EventId::operator==(const EventId &other) const { + return id_ == other.id_; +} + +bool EventId::operator!=(const EventId &other) const { + return !(*this == other); +} + +bool EventId::operator<(const EventId &other) const { + return id_ < other.id_; +} + +StringBuilder &operator<<(StringBuilder &string_builder, EventId id) { + return string_builder << "EventId{" << id.value() << "}"; +} + +EventId::EventId(int32 id) : id_(id) { + CHECK(is_valid_id(id)); +} + +bool EventId::is_valid_id(int32 id) { + return 0 <= id && id < MAX_ID; +} + +class TQueueImpl final : public TQueue { + static constexpr size_t MAX_EVENT_LENGTH = 65536 * 8; + static constexpr size_t MAX_QUEUE_EVENTS = 100000; + static constexpr size_t MAX_TOTAL_EVENT_LENGTH = 1 << 27; + + public: + void set_callback(unique_ptr<StorageCallback> callback) final { + callback_ = std::move(callback); + } + unique_ptr<StorageCallback> extract_callback() final { + return std::move(callback_); + } + + bool do_push(QueueId queue_id, RawEvent &&raw_event) final { + CHECK(raw_event.event_id.is_valid()); + // raw_event.data can be empty when replaying binlog + if (raw_event.data.size() > MAX_EVENT_LENGTH || queue_id == 0) { + return false; + } + auto &q = queues_[queue_id]; + if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size() || + raw_event.expires_at <= 0) { + return false; + } + auto event_id = raw_event.event_id; + if (event_id < q.tail_id) { + return false; + } + + if (!q.events.empty()) { + auto it = q.events.end(); + --it; + if (it->second.data.empty()) { + if (callback_ != nullptr && it->second.log_event_id != 0) { + callback_->pop(it->second.log_event_id); + } + q.events.erase(it); + } + } + if (q.events.empty() && !raw_event.data.empty()) { + schedule_queue_gc(queue_id, q, raw_event.expires_at); + } + + if (raw_event.log_event_id == 0 && callback_ != nullptr) { + raw_event.log_event_id = callback_->push(queue_id, raw_event); + } + q.tail_id = event_id.next().move_as_ok(); + q.total_event_length += raw_event.data.size(); + q.events.emplace(event_id, std::move(raw_event)); + return true; + } + + Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) final { + if (data.empty()) { + return Status::Error("Data is empty"); + } + if (data.size() > MAX_EVENT_LENGTH) { + return Status::Error("Data is too big"); + } + if (queue_id == 0) { + return Status::Error("Queue identifier is invalid"); + } + + auto &q = queues_[queue_id]; + if (q.events.size() >= MAX_QUEUE_EVENTS) { + return Status::Error("Queue is full"); + } + if (q.total_event_length > MAX_TOTAL_EVENT_LENGTH - data.size()) { + return Status::Error("Queue size is too big"); + } + if (expires_at <= 0) { + return Status::Error("Failed to add already expired event"); + } + EventId event_id; + while (true) { + if (q.tail_id.empty()) { + if (hint_new_id.empty()) { + q.tail_id = EventId::from_int32( + Random::fast(2 * max(static_cast<int>(MAX_QUEUE_EVENTS), 1000000) + 1, EventId::MAX_ID / 2)) + .move_as_ok(); + } else { + q.tail_id = hint_new_id; + } + } + event_id = q.tail_id; + CHECK(event_id.is_valid()); + if (event_id.next().is_ok()) { + break; + } + for (auto it = q.events.begin(); it != q.events.end();) { + pop(q, queue_id, it, {}); + } + q.tail_id = EventId(); + CHECK(hint_new_id.next().is_ok()); + } + + RawEvent raw_event; + raw_event.event_id = event_id; + raw_event.data = std::move(data); + raw_event.expires_at = expires_at; + raw_event.extra = extra; + bool is_added = do_push(queue_id, std::move(raw_event)); + CHECK(is_added); + return event_id; + } + + EventId get_head(QueueId queue_id) const final { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return EventId(); + } + return get_queue_head(it->second); + } + + EventId get_tail(QueueId queue_id) const final { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return EventId(); + } + auto &q = it->second; + return q.tail_id; + } + + void forget(QueueId queue_id, EventId event_id) final { + auto q_it = queues_.find(queue_id); + if (q_it == queues_.end()) { + return; + } + auto &q = q_it->second; + auto it = q.events.find(event_id); + if (it == q.events.end()) { + return; + } + pop(q, queue_id, it, q.tail_id); + } + + void clear(QueueId queue_id, size_t keep_count) final { + auto queue_it = queues_.find(queue_id); + if (queue_it == queues_.end()) { + return; + } + auto &q = queue_it->second; + auto size = get_size(q); + if (size <= keep_count) { + return; + } + + auto start_time = Time::now(); + auto total_event_length = q.total_event_length; + + auto end_it = q.events.end(); + for (size_t i = 0; i < keep_count; i++) { + --end_it; + } + for (auto it = q.events.begin(); it != end_it;) { + pop(q, queue_id, it, q.tail_id); + } + + auto clear_time = Time::now() - start_time; + if (clear_time > 0.1) { + LOG(WARNING) << "Cleared " << (size - keep_count) << " TQueue events with total size " + << (total_event_length - q.total_event_length) << " in " << clear_time << " seconds"; + } + } + + Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now, + MutableSpan<Event> &result_events) final { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + result_events.truncate(0); + return 0; + } + auto &q = it->second; + // Some sanity checks + if (from_id.value() > q.tail_id.value() + 10) { + return Status::Error("Specified from_id is in the future"); + } + if (from_id.value() < get_queue_head(q).value() - static_cast<int32>(MAX_QUEUE_EVENTS)) { + return Status::Error("Specified from_id is in the past"); + } + + do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events); + return get_size(q); + } + + std::pair<int64, bool> run_gc(int32 unix_time_now) final { + int64 deleted_events = 0; + auto max_finish_time = Time::now() + 0.05; + int64 counter = 0; + while (!queue_gc_at_.empty()) { + auto it = queue_gc_at_.begin(); + if (it->first >= unix_time_now) { + break; + } + auto queue_id = it->second; + auto &q = queues_[queue_id]; + CHECK(q.gc_at == it->first); + int32 new_gc_at = 0; + + if (!q.events.empty()) { + size_t size_before = get_size(q); + for (auto event_it = q.events.begin(); event_it != q.events.end();) { + auto &event = event_it->second; + if ((++counter & 128) == 0 && Time::now() >= max_finish_time) { + if (new_gc_at == 0) { + new_gc_at = event.expires_at; + } + break; + } + if (event.expires_at < unix_time_now || event.data.empty()) { + pop(q, queue_id, event_it, q.tail_id); + } else { + if (new_gc_at != 0) { + break; + } + new_gc_at = event.expires_at; + ++event_it; + } + } + size_t size_after = get_size(q); + CHECK(size_after <= size_before); + deleted_events += size_before - size_after; + } + schedule_queue_gc(queue_id, q, new_gc_at); + if (Time::now() >= max_finish_time) { + return {deleted_events, false}; + } + } + return {deleted_events, true}; + } + + size_t get_size(QueueId queue_id) const final { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return 0; + } + return get_size(it->second); + } + + void close(Promise<> promise) final { + if (callback_ != nullptr) { + callback_->close(std::move(promise)); + callback_ = nullptr; + } + } + + private: + struct Queue { + EventId tail_id; + std::map<EventId, RawEvent> events; + size_t total_event_length = 0; + int32 gc_at = 0; + }; + + FlatHashMap<QueueId, Queue> queues_; + std::set<std::pair<int32, QueueId>> queue_gc_at_; + unique_ptr<StorageCallback> callback_; + + static EventId get_queue_head(const Queue &q) { + if (q.events.empty()) { + return q.tail_id; + } + return q.events.begin()->first; + } + + static size_t get_size(const Queue &q) { + if (q.events.empty()) { + return 0; + } + + return q.events.size() - (q.events.rbegin()->second.data.empty() ? 1 : 0); + } + + void pop(Queue &q, QueueId queue_id, std::map<EventId, RawEvent>::iterator &it, EventId tail_id) { + auto &event = it->second; + if (callback_ == nullptr || event.log_event_id == 0) { + remove_event(q, it); + return; + } + + if (event.event_id.next().ok() == tail_id) { + if (!event.data.empty()) { + clear_event_data(q, event); + callback_->push(queue_id, event); + } + ++it; + } else { + callback_->pop(event.log_event_id); + remove_event(q, it); + } + } + + static void remove_event(Queue &q, std::map<EventId, RawEvent>::iterator &it) { + q.total_event_length -= it->second.data.size(); + it = q.events.erase(it); + } + + static void clear_event_data(Queue &q, RawEvent &event) { + q.total_event_length -= event.data.size(); + event.data = {}; + } + + void do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now, + MutableSpan<Event> &result_events) { + if (forget_previous) { + for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) { + pop(q, queue_id, it, q.tail_id); + } + } + + size_t ready_n = 0; + for (auto it = q.events.lower_bound(from_id); it != q.events.end();) { + auto &event = it->second; + if (event.expires_at < unix_time_now || event.data.empty()) { + pop(q, queue_id, it, q.tail_id); + } else { + CHECK(!(event.event_id < from_id)); + if (ready_n == result_events.size()) { + break; + } + + auto &to = result_events[ready_n]; + to.data = event.data; + to.id = event.event_id; + to.expires_at = event.expires_at; + to.extra = event.extra; + ready_n++; + ++it; + } + } + + result_events.truncate(ready_n); + } + + void schedule_queue_gc(QueueId queue_id, Queue &q, int32 gc_at) { + if (q.gc_at != 0) { + bool is_deleted = queue_gc_at_.erase({q.gc_at, queue_id}) > 0; + CHECK(is_deleted); + } + q.gc_at = gc_at; + if (q.gc_at != 0) { + bool is_inserted = queue_gc_at_.emplace(gc_at, queue_id).second; + CHECK(is_inserted); + } + } +}; + +unique_ptr<TQueue> TQueue::create() { + return make_unique<TQueueImpl>(); +} + +struct TQueueLogEvent final : public Storer { + int64 queue_id; + int32 event_id; + int32 expires_at; + Slice data; + int64 extra; + + template <class StorerT> + void store(StorerT &&storer) const { + using td::store; + store(queue_id, storer); + store(event_id, storer); + store(expires_at, storer); + store(data, storer); + if (extra != 0) { + store(extra, storer); + } + } + + template <class ParserT> + void parse(ParserT &&parser, int32 has_extra) { + using td::parse; + parse(queue_id, parser); + parse(event_id, parser); + parse(expires_at, parser); + data = parser.template fetch_string<Slice>(); + if (has_extra == 0) { + extra = 0; + } else { + parse(extra, parser); + } + } + + size_t size() const final { + TlStorerCalcLength storer; + store(storer); + return storer.get_length(); + } + + size_t store(uint8 *ptr) const final { + TlStorerUnsafe storer(ptr); + store(storer); + return static_cast<size_t>(storer.get_buf() - ptr); + } +}; + +template <class BinlogT> +uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) { + TQueueLogEvent log_event; + log_event.queue_id = queue_id; + log_event.event_id = event.event_id.value(); + log_event.expires_at = event.expires_at; + log_event.data = event.data; + log_event.extra = event.extra; + auto magic = BINLOG_EVENT_TYPE + (log_event.extra != 0); + if (event.log_event_id == 0) { + return binlog_->add(magic, log_event); + } + binlog_->rewrite(event.log_event_id, magic, log_event); + return event.log_event_id; +} + +template <class BinlogT> +void TQueueBinlog<BinlogT>::pop(uint64 log_event_id) { + binlog_->erase(log_event_id); +} + +template <class BinlogT> +Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) const { + TQueueLogEvent event; + TlParser parser(binlog_event.data_); + int32 has_extra = binlog_event.type_ - BINLOG_EVENT_TYPE; + if (has_extra != 0 && has_extra != 1) { + return Status::Error("Wrong magic"); + } + event.parse(parser, has_extra); + parser.fetch_end(); + TRY_STATUS(parser.get_status()); + TRY_RESULT(event_id, EventId::from_int32(event.event_id)); + RawEvent raw_event; + raw_event.log_event_id = binlog_event.id_; + raw_event.event_id = event_id; + raw_event.expires_at = event.expires_at; + raw_event.data = event.data.str(); + raw_event.extra = event.extra; + if (!q.do_push(event.queue_id, std::move(raw_event))) { + return Status::Error("Failed to add event"); + } + return Status::OK(); +} + +template <class BinlogT> +void TQueueBinlog<BinlogT>::close(Promise<> promise) { + binlog_->close(std::move(promise)); +} + +template class TQueueBinlog<BinlogInterface>; +template class TQueueBinlog<Binlog>; + +uint64 TQueueMemoryStorage::push(QueueId queue_id, const RawEvent &event) { + auto log_event_id = event.log_event_id == 0 ? next_log_event_id_++ : event.log_event_id; + events_[log_event_id] = std::make_pair(queue_id, event); + return log_event_id; +} + +void TQueueMemoryStorage::pop(uint64 log_event_id) { + events_.erase(log_event_id); +} + +void TQueueMemoryStorage::replay(TQueue &q) const { + for (auto &e : events_) { + auto x = e.second; + x.second.log_event_id = e.first; + bool is_added = q.do_push(x.first, std::move(x.second)); + CHECK(is_added); + } +} +void TQueueMemoryStorage::close(Promise<> promise) { + events_.clear(); + promise.set_value({}); +} + +} // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h new file mode 100644 index 0000000000..117726851b --- /dev/null +++ b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h @@ -0,0 +1,155 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/common.h" +#include "td/utils/Promise.h" +#include "td/utils/Slice.h" +#include "td/utils/Span.h" +#include "td/utils/Status.h" +#include "td/utils/StringBuilder.h" + +#include <map> +#include <memory> +#include <utility> + +namespace td { + +class TQueue { + public: + class EventId { + public: + static constexpr int32 MAX_ID = 2000000000; + + EventId(); + + static Result<EventId> from_int32(int32 id); + + bool is_valid() const; + + int32 value() const; + + Result<EventId> next() const; + + Result<EventId> advance(size_t offset) const; + + bool empty() const; + + bool operator==(const EventId &other) const; + bool operator!=(const EventId &other) const; + bool operator<(const EventId &other) const; + + private: + int32 id_{0}; + + explicit EventId(int32 id); + + static bool is_valid_id(int32 id); + }; + + struct Event { + EventId id; + int32 expires_at{0}; + Slice data; + int64 extra{0}; + }; + + struct RawEvent { + uint64 log_event_id{0}; + EventId event_id; + string data; + int64 extra{0}; + int32 expires_at{0}; + }; + + using QueueId = int64; + + class StorageCallback { + public: + using QueueId = TQueue::QueueId; + using RawEvent = TQueue::RawEvent; + + StorageCallback() = default; + StorageCallback(const StorageCallback &) = delete; + StorageCallback &operator=(const StorageCallback &) = delete; + StorageCallback(StorageCallback &&) = delete; + StorageCallback &operator=(StorageCallback &&) = delete; + virtual ~StorageCallback() = default; + + virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0; + virtual void pop(uint64 log_event_id) = 0; + virtual void close(Promise<> promise) = 0; + }; + + static unique_ptr<TQueue> create(); + + TQueue() = default; + TQueue(const TQueue &) = delete; + TQueue &operator=(const TQueue &) = delete; + TQueue(TQueue &&) = delete; + TQueue &operator=(TQueue &&) = delete; + + virtual ~TQueue() = default; + + virtual void set_callback(unique_ptr<StorageCallback> callback) = 0; + virtual unique_ptr<StorageCallback> extract_callback() = 0; + + virtual bool do_push(QueueId queue_id, RawEvent &&raw_event) = 0; + + virtual Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) = 0; + + virtual void forget(QueueId queue_id, EventId event_id) = 0; + + virtual void clear(QueueId queue_id, size_t keep_count) = 0; + + virtual EventId get_head(QueueId queue_id) const = 0; + virtual EventId get_tail(QueueId queue_id) const = 0; + + virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now, + MutableSpan<Event> &result_events) = 0; + + virtual size_t get_size(QueueId queue_id) const = 0; + + // returns number of deleted events and whether garbage collection was completed + virtual std::pair<int64, bool> run_gc(int32 unix_time_now) = 0; + virtual void close(Promise<> promise) = 0; +}; + +StringBuilder &operator<<(StringBuilder &string_builder, TQueue::EventId id); + +struct BinlogEvent; + +template <class BinlogT> +class TQueueBinlog final : public TQueue::StorageCallback { + public: + uint64 push(QueueId queue_id, const RawEvent &event) final; + void pop(uint64 log_event_id) final; + Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT; + + void set_binlog(std::shared_ptr<BinlogT> binlog) { + binlog_ = std::move(binlog); + } + void close(Promise<> promise) final; + + private: + std::shared_ptr<BinlogT> binlog_; + static constexpr int32 BINLOG_EVENT_TYPE = 2314; +}; + +class TQueueMemoryStorage final : public TQueue::StorageCallback { + public: + uint64 push(QueueId queue_id, const RawEvent &event) final; + void pop(uint64 log_event_id) final; + void replay(TQueue &q) const; + void close(Promise<> promise) final; + + private: + uint64 next_log_event_id_{1}; + std::map<uint64, std::pair<QueueId, RawEvent>> events_; +}; + +} // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h index 8d94d79673..5663bcdfff 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,6 +8,7 @@ #include "td/db/SeqKeyValue.h" +#include "td/utils/HashTableUtils.h" #include "td/utils/port/RwMutex.h" #include "td/utils/Slice.h" @@ -15,6 +16,7 @@ #include <utility> namespace td { + class TsSeqKeyValue { public: using SeqNo = SeqKeyValue::SeqNo; @@ -32,30 +34,42 @@ class TsSeqKeyValue { auto lock = rw_mutex_.lock_write().move_as_ok(); return kv_.set(key, value); } + std::pair<SeqNo, RwMutex::WriteLock> set_and_lock(Slice key, Slice value) { auto lock = rw_mutex_.lock_write().move_as_ok(); return std::make_pair(kv_.set(key, value), std::move(lock)); } + SeqNo erase(const string &key) { auto lock = rw_mutex_.lock_write().move_as_ok(); return kv_.erase(key); } + std::pair<SeqNo, RwMutex::WriteLock> erase_and_lock(const string &key) { auto lock = rw_mutex_.lock_write().move_as_ok(); return std::make_pair(kv_.erase(key), std::move(lock)); } - string get(const string &key) { + + string get(const string &key) const { auto lock = rw_mutex_.lock_read().move_as_ok(); return kv_.get(key); } + + bool isset(const string &key) const { + auto lock = rw_mutex_.lock_read().move_as_ok(); + return kv_.isset(key); + } + size_t size() const { return kv_.size(); } - std::unordered_map<string, string> get_all() { + + std::unordered_map<string, string, Hash<string>> get_all() const { auto lock = rw_mutex_.lock_write().move_as_ok(); return kv_.get_all(); } - // not thread safe method + + // non-thread-safe method SeqKeyValue &inner() { return kv_; } @@ -65,7 +79,8 @@ class TsSeqKeyValue { } private: - RwMutex rw_mutex_; + mutable RwMutex rw_mutex_; SeqKeyValue kv_; }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp index 5d76028dba..5b14eed69d 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -11,14 +11,16 @@ #include "td/utils/buffer.h" #include "td/utils/format.h" -#include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/port/Clocks.h" -#include "td/utils/port/Fd.h" +#include "td/utils/port/FileFd.h" #include "td/utils/port/path.h" +#include "td/utils/port/PollFlags.h" +#include "td/utils/port/sleep.h" #include "td/utils/port/Stat.h" #include "td/utils/Random.h" #include "td/utils/ScopeGuard.h" +#include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" #include "td/utils/Time.h" #include "td/utils/tl_helpers.h" @@ -53,7 +55,7 @@ struct AesCtrEncryptionEvent { BufferSlice iv_; BufferSlice key_hash_; - BufferSlice generate_key(const DbKey &db_key) { + BufferSlice generate_key(const DbKey &db_key) const { CHECK(!db_key.is_empty()); BufferSlice key(key_size()); size_t iteration_count = kdf_iteration_count(); @@ -63,7 +65,8 @@ struct AesCtrEncryptionEvent { pbkdf2_sha256(db_key.data(), key_salt_.as_slice(), narrow_cast<int>(iteration_count), key.as_slice()); return key; } - BufferSlice generate_hash(Slice key) { + + static BufferSlice generate_hash(Slice key) { BufferSlice hash(hash_size()); hmac_sha256(key, "cucumbers everywhere", hash.as_slice()); return hash; @@ -91,18 +94,23 @@ struct AesCtrEncryptionEvent { class BinlogReader { public: - BinlogReader() = default; explicit BinlogReader(ChainBufferReader *input) : input_(input) { } - void set_input(ChainBufferReader *input) { + void set_input(ChainBufferReader *input, bool is_encrypted, int64 expected_size) { input_ = input; + is_encrypted_ = is_encrypted; + expected_size_ = expected_size; + } + + ChainBufferReader *input() { + return input_; } - int64 offset() { + int64 offset() const { return offset_; } Result<size_t> read_next(BinlogEvent *event) { - if (state_ == ReadLength) { + if (state_ == State::ReadLength) { if (input_->size() < 4) { return 4; } @@ -112,35 +120,52 @@ class BinlogReader { it.advance(4, MutableSlice(buf, 4)); size_ = static_cast<size_t>(TlParser(Slice(buf, 4)).fetch_int()); - if (size_ > MAX_EVENT_SIZE) { + if (size_ > BinlogEvent::MAX_SIZE) { return Status::Error(PSLICE() << "Too big event " << tag("size", size_)); } - if (size_ < MIN_EVENT_SIZE) { + if (size_ < BinlogEvent::MIN_SIZE) { return Status::Error(PSLICE() << "Too small event " << tag("size", size_)); } - state_ = ReadEvent; + if (size_ % 4 != 0) { + return Status::Error(-2, PSLICE() << "Event of size " << size_ << " at offset " << offset() << " out of " + << expected_size_ << ' ' << tag("is_encrypted", is_encrypted_) + << format::as_hex_dump<4>(Slice(input_->prepare_read().truncate(28)))); + } + state_ = State::ReadEvent; } if (input_->size() < size_) { return size_; } + event->debug_info_ = BinlogDebugInfo{__FILE__, __LINE__}; TRY_STATUS(event->init(input_->cut_head(size_).move_as_buffer_slice())); offset_ += size_; event->offset_ = offset_; - state_ = ReadLength; + state_ = State::ReadLength; return 0; } private: ChainBufferReader *input_; - enum { ReadLength, ReadEvent } state_ = ReadLength; + enum class State { ReadLength, ReadEvent }; + State state_ = State::ReadLength; size_t size_{0}; int64 offset_{0}; + int64 expected_size_{0}; + bool is_encrypted_{false}; }; + +static int64 file_size(CSlice path) { + auto r_stat = stat(path); + if (r_stat.is_error()) { + return 0; + } + return r_stat.ok().size_; +} } // namespace detail -bool Binlog::IGNORE_ERASE_HACK = false; +int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8; Binlog::Binlog() = default; @@ -148,9 +173,9 @@ Binlog::~Binlog() { close().ignore(); } -Result<FileFd> Binlog::open_binlog(CSlice path, int32 flags) { +Result<FileFd> Binlog::open_binlog(const string &path, int32 flags) { TRY_RESULT(fd, FileFd::open(path, flags)); - TRY_STATUS(fd.lock(FileFd::LockFlags::Write, 100)); + TRY_STATUS(fd.lock(FileFd::LockFlags::Write, path, 100)); return std::move(fd); } @@ -161,9 +186,9 @@ Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey o db_key_ = std::move(db_key); old_db_key_ = std::move(old_db_key); - processor_ = std::make_unique<detail::BinlogEventsProcessor>(); + processor_ = make_unique<detail::BinlogEventsProcessor>(); // Turn off BinlogEventsBuffer - // events_buffer_ = std::make_unique<detail::BinlogEventsBuffer>(); + // events_buffer_ = make_unique<detail::BinlogEventsBuffer>(); // try to restore binlog from regenerated version if (stat(path).is_error()) { @@ -200,6 +225,10 @@ Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey o } void Binlog::add_event(BinlogEvent &&event) { + if (event.size_ % 4 != 0) { + LOG(FATAL) << "Trying to add event with bad size " << event.public_to_string(); + } + if (!events_buffer_) { do_add_event(std::move(event)); } else { @@ -215,7 +244,7 @@ void Binlog::add_event(BinlogEvent &&event) { auto need_reindex = [&](int64 min_size, int rate) { return fd_size > min_size && fd_size / rate > processor_->total_raw_events_size(); }; - if (need_reindex(100000, 5) || need_reindex(500000, 2)) { + if (need_reindex(50000, 5) || need_reindex(100000, 4) || need_reindex(300000, 3) || need_reindex(500000, 2)) { LOG(INFO) << tag("fd_size", format::as_size(fd_size)) << tag("total events size", format::as_size(processor_->total_raw_events_size())); do_reindex(); @@ -254,20 +283,25 @@ Status Binlog::close(bool need_sync) { if (fd_.empty()) { return Status::OK(); } - SCOPE_EXIT { - path_ = ""; - info_.is_opened = false; - fd_.close(); - need_sync_ = false; - }; if (need_sync) { sync(); } else { flush(); } + + fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure(); + fd_.close(); + path_.clear(); + info_.is_opened = false; + need_sync_ = false; return Status::OK(); } +void Binlog::close(Promise<> promise) { + TRY_STATUS_PROMISE(promise, close()); + promise.set_value({}); +} + void Binlog::change_key(DbKey new_db_key) { db_key_ = std::move(new_db_key); aes_ctr_key_salt_ = BufferSlice(); @@ -280,18 +314,24 @@ Status Binlog::close_and_destroy() { destroy(path).ignore(); return close_status; } + Status Binlog::destroy(Slice path) { + unlink(PSLICE() << path << ".new").ignore(); // delete regenerated version first to avoid it becoming main version unlink(PSLICE() << path).ignore(); - unlink(PSLICE() << path << ".new").ignore(); return Status::OK(); } void Binlog::do_event(BinlogEvent &&event) { - fd_events_++; - fd_size_ += event.raw_event_.size(); + auto event_size = event.raw_event_.size(); if (state_ == State::Run || state_ == State::Reindex) { - VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ") << event; + auto validate_status = event.validate(); + if (validate_status.is_error()) { + LOG(FATAL) << "Failed to validate binlog event " << validate_status << " " + << format::as_hex_dump<4>(Slice(event.raw_event_.as_slice().truncate(28))); + } + VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ") + << event.public_to_string(); switch (encryption_type_) { case EncryptionType::None: { buffer_writer_.append(event.raw_event_.clone()); @@ -311,16 +351,18 @@ void Binlog::do_event(BinlogEvent &&event) { BufferSlice key; if (aes_ctr_key_salt_.as_slice() == encryption_event.key_salt_.as_slice()) { - key = BufferSlice(Slice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw))); + key = BufferSlice(as_slice(aes_ctr_key_)); } else if (!db_key_.is_empty()) { key = encryption_event.generate_key(db_key_); } - if (encryption_event.generate_hash(key.as_slice()).as_slice() != encryption_event.key_hash_.as_slice()) { + if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() != + encryption_event.key_hash_.as_slice()) { CHECK(state_ == State::Load); if (!old_db_key_.is_empty()) { key = encryption_event.generate_key(old_db_key_); - if (encryption_event.generate_hash(key.as_slice()).as_slice() != encryption_event.key_hash_.as_slice()) { + if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() != + encryption_event.key_hash_.as_slice()) { info_.wrong_password = true; } } else { @@ -344,13 +386,31 @@ void Binlog::do_event(BinlogEvent &&event) { update_write_encryption(); //LOG(INFO) << format::cond(state_ == State::Run, "Run", "Reindex") << ": init encryption"; } - return; } } if (state_ != State::Reindex) { - processor_->add_event(std::move(event)); + auto status = processor_->add_event(std::move(event)); + if (status.is_error()) { + auto old_size = detail::file_size(path_); + auto data = debug_get_binlog_data(fd_size_, old_size); + if (state_ == State::Load) { + fd_.seek(fd_size_).ensure(); + fd_.truncate_to_current_position(fd_size_).ensure(); + + if (data.empty()) { + return; + } + } + + LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << fd_size_ + << " in state " << static_cast<int32>(state_) << " due to error: " << status << " after reading " + << data; + } } + + fd_events_++; + fd_size_ += event_size; } void Binlog::sync() { @@ -396,7 +456,9 @@ void Binlog::update_read_encryption() { CHECK(binlog_reader_ptr_); switch (encryption_type_) { case EncryptionType::None: { - binlog_reader_ptr_->set_input(&buffer_reader_); + auto r_file_size = fd_.get_size(); + r_file_size.ensure(); + binlog_reader_ptr_->set_input(&buffer_reader_, false, r_file_size.ok()); byte_flow_flag_ = false; break; } @@ -407,7 +469,9 @@ void Binlog::update_read_encryption() { byte_flow_sink_ = ByteFlowSink(); byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_; byte_flow_flag_ = true; - binlog_reader_ptr_->set_input(byte_flow_sink_.get_output()); + auto r_file_size = fd_.get_size(); + r_file_size.ensure(); + binlog_reader_ptr_->set_input(byte_flow_sink_.get_output(), true, r_file_size.ok()); break; } } @@ -439,67 +503,70 @@ Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callb buffer_writer_ = ChainBufferWriter(); buffer_reader_ = buffer_writer_.extract_reader(); fd_.set_input_writer(&buffer_writer_); - detail::BinlogReader reader; + detail::BinlogReader reader{nullptr}; binlog_reader_ptr_ = &reader; update_read_encryption(); - bool ready_flag = false; - fd_.update_flags(Fd::Flag::Read); + fd_.get_poll_info().add_flags(PollFlags::Read()); info_.wrong_password = false; while (true) { BinlogEvent event; auto r_need_size = reader.read_next(&event); if (r_need_size.is_error()) { + if (r_need_size.error().code() == -2) { + auto old_size = detail::file_size(path_); + auto offset = reader.offset(); + auto data = debug_get_binlog_data(offset, old_size); + fd_.seek(offset).ensure(); + fd_.truncate_to_current_position(offset).ensure(); + if (data.empty()) { + break; + } + LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << offset + << " due to error: " << r_need_size.error() << " after reading " << data; + } LOG(ERROR) << r_need_size.error(); break; } auto need_size = r_need_size.move_as_ok(); - // LOG(ERROR) << "need size = " << need_size; + // LOG(ERROR) << "Need size = " << need_size; if (need_size == 0) { - if (IGNORE_ERASE_HACK && event.type_ == BinlogEvent::ServiceTypes::Empty && - (event.flags_ & BinlogEvent::Flags::Rewrite) != 0) { - // skip erase - } else { - if (debug_callback) { - debug_callback(event); - } - do_add_event(std::move(event)); - if (info_.wrong_password) { - return Status::OK(); - } + if (debug_callback) { + debug_callback(event); } - ready_flag = false; - } else { - // TODO(now): fix bug - if (ready_flag) { - break; + do_add_event(std::move(event)); + if (info_.wrong_password) { + return Status::OK(); } + } else { TRY_STATUS(fd_.flush_read(max(need_size, static_cast<size_t>(4096)))); buffer_reader_.sync_with_writer(); if (byte_flow_flag_) { byte_flow_source_.wakeup(); } - ready_flag = true; + if (reader.input()->size() < need_size) { + break; + } } } auto offset = processor_->offset(); processor_->for_each([&](BinlogEvent &event) { - VLOG(binlog) << "Replay binlog event: " << event; + VLOG(binlog) << "Replay binlog event: " << event.public_to_string(); if (callback) { callback(event); } }); - auto fd_size = fd_.get_size(); + TRY_RESULT(fd_size, fd_.get_size()); if (offset != fd_size) { LOG(ERROR) << "Truncate " << tag("path", path_) << tag("old_size", fd_size) << tag("new_size", offset); fd_.seek(offset).ensure(); fd_.truncate_to_current_position(offset).ensure(); db_key_used_ = false; // force reindex } - CHECK(IGNORE_ERASE_HACK || fd_size_ == offset) << fd_size << " " << fd_size_ << " " << offset; + LOG_CHECK(fd_size_ == offset) << fd_size << " " << fd_size_ << " " << offset; binlog_reader_ptr_ = nullptr; state_ = State::Run; @@ -515,19 +582,11 @@ Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callb return Status::OK(); } -static int64 file_size(CSlice path) { - auto r_stat = stat(path); - if (r_stat.is_error()) { - return 0; - } - return r_stat.ok().size_; -} - void Binlog::update_encryption(Slice key, Slice iv) { - MutableSlice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw)).copy_from(key); + as_slice(aes_ctr_key_).copy_from(key); UInt128 aes_ctr_iv; - MutableSlice(aes_ctr_iv.raw, sizeof(aes_ctr_iv.raw)).copy_from(iv); - aes_ctr_state_.init(aes_ctr_key_, aes_ctr_iv); + as_slice(aes_ctr_iv).copy_from(iv); + aes_ctr_state_.init(as_slice(aes_ctr_key_), as_slice(aes_ctr_iv)); } void Binlog::reset_encryption() { @@ -550,15 +609,16 @@ void Binlog::reset_encryption() { BufferSlice key; if (aes_ctr_key_salt_.as_slice() == event.key_salt_.as_slice()) { - key = BufferSlice(Slice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw))); + key = BufferSlice(as_slice(aes_ctr_key_)); } else { key = event.generate_key(db_key_); } - event.key_hash_ = event.generate_hash(key.as_slice()); + event.key_hash_ = EncryptionEvent::generate_hash(key.as_slice()); do_event(BinlogEvent( - BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event)))); + BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event)), + BinlogDebugInfo{__FILE__, __LINE__})); } void Binlog::do_reindex() { @@ -571,7 +631,7 @@ void Binlog::do_reindex() { }; auto start_time = Clocks::monotonic(); - auto start_size = file_size(path_); + auto start_size = detail::file_size(path_); auto start_events = fd_events_; string new_path = path_ + ".new"; @@ -581,7 +641,7 @@ void Binlog::do_reindex() { LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error(); return; } - fd_.close(); + auto old_fd = std::move(fd_); // can't close fd_ now, because it will release file lock fd_ = BufferedFdBase<FileFd>(r_opened_file.move_as_ok()); buffer_writer_ = ChainBufferWriter(); @@ -594,27 +654,49 @@ void Binlog::do_reindex() { fd_events_ = 0; reset_encryption(); processor_->for_each([&](BinlogEvent &event) { + event.realloc(); do_event(std::move(event)); // NB: no move is actually happens }); - need_sync_ = true; // must sync creation of the file + need_sync_ = start_size != 0; // must sync creation of the file if it is non-empty sync(); // finish_reindex auto status = unlink(path_); LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status; + old_fd.close(); // now we can close old file and release the system lock status = rename(new_path, path_); + FileFd::remove_local_lock(new_path); // now we can release local lock for temporary file LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status; auto finish_time = Clocks::monotonic(); auto finish_size = fd_size_; auto finish_events = fd_events_; - CHECK(fd_size_ == file_size(path_)); + for (int left_tries = 10; left_tries > 0; left_tries--) { + auto r_stat = stat(path_); + if (r_stat.is_error()) { + if (left_tries != 1) { + usleep_for(200000 / left_tries); + continue; + } + LOG(FATAL) << "Failed to rename binlog of size " << fd_size_ << " to " << path_ << ": " << r_stat.error() + << ". Temp file size is " << detail::file_size(new_path) << ", new size " << detail::file_size(path_); + } + LOG_CHECK(fd_size_ == r_stat.ok().size_) << fd_size_ << ' ' << r_stat.ok().size_ << ' ' + << detail::file_size(new_path) << ' ' << fd_events_ << ' ' << path_; + break; + } - // TODO: print warning only if time or ratio is suspicious - double ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1); - LOG(INFO) << "regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time)) - << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size)) - << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events); + auto ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1); + + [&](Slice msg) { + if (start_size > (10 << 20) || finish_time - start_time > 1) { + LOG(WARNING) << "Slow " << msg; + } else { + LOG(INFO) << msg; + } + }(PSLICE() << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time)) + << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size)) + << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events)); buffer_writer_ = ChainBufferWriter(); buffer_reader_ = buffer_writer_.extract_reader(); @@ -626,4 +708,58 @@ void Binlog::do_reindex() { update_write_encryption(); } +string Binlog::debug_get_binlog_data(int64 begin_offset, int64 end_offset) { + if (begin_offset > end_offset) { + return "Begin offset is bigger than end_offset"; + } + if (begin_offset == end_offset) { + return string(); + } + + static int64 MAX_DATA_LENGTH = 512; + if (end_offset - begin_offset > MAX_DATA_LENGTH) { + end_offset = begin_offset + MAX_DATA_LENGTH; + } + + auto r_fd = FileFd::open(path_, FileFd::Flags::Read); + if (r_fd.is_error()) { + return PSTRING() << "Failed to open binlog: " << r_fd.error(); + } + auto fd = r_fd.move_as_ok(); + + fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ignore(); + SCOPE_EXIT { + fd_.lock(FileFd::LockFlags::Write, path_, 1).ensure(); + }; + auto expected_data_length = narrow_cast<size_t>(end_offset - begin_offset); + string data(expected_data_length, '\0'); + auto r_data_size = fd.pread(data, begin_offset); + if (r_data_size.is_error()) { + return PSTRING() << "Failed to read binlog: " << r_data_size.error(); + } + + if (r_data_size.ok() < expected_data_length) { + data.resize(r_data_size.ok()); + data = PSTRING() << format::as_hex_dump<4>(Slice(data)) << " | with " << expected_data_length - r_data_size.ok() + << " missed bytes"; + } else { + if (encryption_type_ == EncryptionType::AesCtr) { + bool is_zero = true; + for (auto &c : data) { + if (c != '\0') { + is_zero = false; + } + } + // very often we have '\0' bytes written to disk instead of a real log event + // this is clearly impossible content for a real encrypted log event, so just ignore it + if (is_zero) { + return string(); + } + } + + data = PSTRING() << format::as_hex_dump<4>(Slice(data)); + } + return data; +} + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h index cdda868b4e..88dbacf58f 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -15,16 +15,23 @@ #include "td/utils/ByteFlow.h" #include "td/utils/common.h" #include "td/utils/crypto.h" +#include "td/utils/logging.h" #include "td/utils/port/FileFd.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" +#include "td/utils/StorerBase.h" +#include "td/utils/UInt.h" #include <functional> namespace td { + +extern int32 VERBOSITY_NAME(binlog); + struct BinlogInfo { - bool was_created; - uint64 last_id; + bool was_created{false}; + uint64 last_id{0}; bool is_encrypted{false}; bool wrong_password{false}; bool is_opened{false}; @@ -34,12 +41,11 @@ namespace detail { class BinlogReader; class BinlogEventsProcessor; class BinlogEventsBuffer; -}; // namespace detail +} // namespace detail class Binlog { public: enum Error : int { WrongPassword = -1 }; - static bool IGNORE_ERASE_HACK; Binlog(); Binlog(const Binlog &other) = delete; Binlog &operator=(const Binlog &other) = delete; @@ -67,8 +73,28 @@ class Binlog { return fd_.empty(); } - void add_raw_event(BufferSlice &&raw_event) { - add_event(BinlogEvent(std::move(raw_event))); + uint64 add(int32 type, const Storer &storer) { + auto log_event_id = next_id(); + add_raw_event(BinlogEvent::create_raw(log_event_id, type, 0, storer), {}); + return log_event_id; + } + + uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer) { + auto seq_no = next_id(); + add_raw_event(BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer), {}); + return seq_no; + } + + uint64 erase(uint64 log_event_id) { + auto seq_no = next_id(); + add_raw_event(BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, + EmptyStorer()), + {}); + return seq_no; + } + + void add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) { + add_event(BinlogEvent(std::move(raw_event), info)); } void add_event(BinlogEvent &&event); @@ -81,6 +107,7 @@ class Binlog { void change_key(DbKey new_db_key); Status close(bool need_sync = true) TD_WARN_UNUSED_RESULT; + void close(Promise<> promise); Status close_and_destroy() TD_WARN_UNUSED_RESULT; static Status destroy(Slice path) TD_WARN_UNUSED_RESULT; @@ -96,7 +123,7 @@ class Binlog { BufferedFdBase<FileFd> fd_; ChainBufferWriter buffer_writer_; ChainBufferReader buffer_reader_; - detail::BinlogReader *binlog_reader_ptr_; + detail::BinlogReader *binlog_reader_ptr_ = nullptr; BinlogInfo info_; DbKey db_key_; @@ -118,17 +145,15 @@ class Binlog { uint64 fd_events_{0}; string path_; std::vector<BinlogEvent> pending_events_; - std::unique_ptr<detail::BinlogEventsProcessor> processor_; - std::unique_ptr<detail::BinlogEventsBuffer> events_buffer_; + unique_ptr<detail::BinlogEventsProcessor> processor_; + unique_ptr<detail::BinlogEventsBuffer> events_buffer_; bool in_flush_events_buffer_{false}; uint64 last_id_{0}; double need_flush_since_ = 0; bool need_sync_{false}; enum class State { Empty, Load, Reindex, Run } state_{State::Empty}; - static constexpr uint32 MAX_EVENT_SIZE = 65536; - - Result<FileFd> open_binlog(CSlice path, int32 flags); + static Result<FileFd> open_binlog(const string &path, int32 flags); size_t flush_events_buffer(bool force); void do_add_event(BinlogEvent &&event); void do_event(BinlogEvent &&event); @@ -139,5 +164,8 @@ class Binlog { void reset_encryption(); void update_read_encryption(); void update_write_encryption(); + + string debug_get_binlog_data(int64 begin_offset, int64 end_offset); }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp index e4584e920e..09f9fc40e3 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp @@ -1,38 +1,78 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/db/binlog/BinlogEvent.h" +#include "td/utils/crypto.h" +#include "td/utils/logging.h" +#include "td/utils/misc.h" #include "td/utils/tl_parsers.h" +#include "td/utils/tl_storers.h" namespace td { -int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8; Status BinlogEvent::init(BufferSlice &&raw_event, bool check_crc) { TlParser parser(raw_event.as_slice()); size_ = parser.fetch_int(); - CHECK(size_ == raw_event.size()); + LOG_CHECK(size_ == raw_event.size()) << size_ << " " << raw_event.size() << debug_info_; id_ = parser.fetch_long(); type_ = parser.fetch_int(); flags_ = parser.fetch_int(); extra_ = parser.fetch_long(); - CHECK(size_ >= MIN_EVENT_SIZE); - auto slice_data = parser.fetch_string_raw<Slice>(size_ - MIN_EVENT_SIZE); + CHECK(size_ >= MIN_SIZE); + auto slice_data = parser.fetch_string_raw<Slice>(size_ - MIN_SIZE); data_ = MutableSlice(const_cast<char *>(slice_data.begin()), slice_data.size()); crc32_ = static_cast<uint32>(parser.fetch_int()); if (check_crc) { - CHECK(size_ >= EVENT_TAIL_SIZE); - auto calculated_crc = crc32(raw_event.as_slice().truncate(size_ - EVENT_TAIL_SIZE)); + auto calculated_crc = crc32(raw_event.as_slice().substr(0, size_ - TAIL_SIZE)); if (calculated_crc != crc32_) { return Status::Error(PSLICE() << "crc mismatch " << tag("actual", format::as_hex(calculated_crc)) - << tag("expected", format::as_hex(crc32_))); + << tag("expected", format::as_hex(crc32_)) << public_to_string()); } } raw_event_ = std::move(raw_event); return Status::OK(); } +Status BinlogEvent::validate() const { + BinlogEvent event; + if (raw_event_.size() < 4) { + return Status::Error("Too small event"); + } + uint32 size = TlParser(raw_event_.as_slice().substr(0, 4)).fetch_int(); + if (size_ != size) { + return Status::Error(PSLICE() << "Size of event changed: " << tag("was", size_) << tag("now", size)); + } + return event.init(raw_event_.clone(), true); +} + +BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) { + auto raw_event = BufferSlice{storer.size() + MIN_SIZE}; + + TlStorerUnsafe tl_storer(raw_event.as_slice().ubegin()); + tl_storer.store_int(narrow_cast<int32>(raw_event.size())); + tl_storer.store_long(id); + tl_storer.store_int(type); + tl_storer.store_int(flags); + tl_storer.store_long(0); + + CHECK(tl_storer.get_buf() == raw_event.as_slice().ubegin() + HEADER_SIZE); + tl_storer.store_storer(storer); + + CHECK(tl_storer.get_buf() == raw_event.as_slice().uend() - TAIL_SIZE); + tl_storer.store_int(crc32(raw_event.as_slice().truncate(raw_event.size() - TAIL_SIZE))); + + return raw_event; +} + +void BinlogEvent::realloc() { + auto data_offset = data_.begin() - raw_event_.as_slice().begin(); + auto data_size = data_.size(); + raw_event_ = raw_event_.copy(); + data_ = raw_event_.as_slice().substr(data_offset, data_size); +} + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h index 1874543dff..8fab08da98 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,17 +8,16 @@ #include "td/utils/buffer.h" #include "td/utils/common.h" -#include "td/utils/crypto.h" #include "td/utils/format.h" -#include "td/utils/logging.h" -#include "td/utils/misc.h" #include "td/utils/Slice.h" +#include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" #include "td/utils/Storer.h" +#include "td/utils/StorerBase.h" #include "td/utils/StringBuilder.h" -#include "td/utils/tl_storers.h" namespace td { + struct EmptyStorerImpl { EmptyStorerImpl() { } @@ -33,15 +32,27 @@ inline auto EmptyStorer() { return create_default_storer(impl); } -static constexpr size_t MAX_EVENT_SIZE = 1 << 24; -static constexpr size_t EVENT_HEADER_SIZE = 4 + 8 + 4 + 4 + 8; -static constexpr size_t EVENT_TAIL_SIZE = 4; -static constexpr size_t MIN_EVENT_SIZE = EVENT_HEADER_SIZE + EVENT_TAIL_SIZE; +struct BinlogDebugInfo { + BinlogDebugInfo() = default; + BinlogDebugInfo(const char *file, int line) : file(file), line(line) { + } + const char *file{""}; + int line{0}; +}; -extern int32 VERBOSITY_NAME(binlog); +inline StringBuilder &operator<<(StringBuilder &sb, const BinlogDebugInfo &info) { + if (info.line == 0) { + return sb; + } + return sb << "[" << info.file << ":" << info.line << "]"; +} -// TODO: smaller BinlogEvent struct BinlogEvent { + static constexpr size_t MAX_SIZE = 1 << 24; + static constexpr size_t HEADER_SIZE = 4 + 8 + 4 + 4 + 8; + static constexpr size_t TAIL_SIZE = 4; + static constexpr size_t MIN_SIZE = HEADER_SIZE + TAIL_SIZE; + int64 offset_; uint32 size_; @@ -54,6 +65,8 @@ struct BinlogEvent { BufferSlice raw_event_; + BinlogDebugInfo debug_info_; + enum ServiceTypes { Header = -1, Empty = -2, AesCtrEncryption = -3, NoEncryption = -4 }; enum Flags { Rewrite = 1, Partial = 2 }; @@ -65,6 +78,7 @@ struct BinlogEvent { } BinlogEvent clone() const { BinlogEvent result; + result.debug_info_ = BinlogDebugInfo{__FILE__, __LINE__}; result.init(raw_event_.clone()).ensure(); return result; } @@ -74,36 +88,32 @@ struct BinlogEvent { } BinlogEvent() = default; - explicit BinlogEvent(BufferSlice &&raw_event) { + //explicit BinlogEvent(BufferSlice &&raw_event) { + //init(std::move(raw_event), false).ensure(); + //} + BinlogEvent(BufferSlice &&raw_event, BinlogDebugInfo info) { + debug_info_ = info; init(std::move(raw_event), false).ensure(); } + Status init(BufferSlice &&raw_event, bool check_crc = true) TD_WARN_UNUSED_RESULT; static BufferSlice create_raw(uint64 id, int32 type, int32 flags, const Storer &storer); + + std::string public_to_string() const { + return PSTRING() << "LogEvent[" << tag("id", format::as_hex(id_)) << tag("type", type_) << tag("flags", flags_) + << tag("data", data_.size()) << "]" << debug_info_; + } + + Status validate() const; + + void realloc(); }; inline StringBuilder &operator<<(StringBuilder &sb, const BinlogEvent &event) { return sb << "LogEvent[" << tag("id", format::as_hex(event.id_)) << tag("type", event.type_) - << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]"; + << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]" + << event.debug_info_; } -// Implementation -inline BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) { - auto raw_event = BufferSlice{storer.size() + MIN_EVENT_SIZE}; - - TlStorerUnsafe tl_storer(raw_event.as_slice().begin()); - tl_storer.store_int(narrow_cast<int32>(raw_event.size())); - tl_storer.store_long(id); - tl_storer.store_int(type); - tl_storer.store_int(flags); - tl_storer.store_long(0); - - CHECK(tl_storer.get_buf() == raw_event.as_slice().begin() + EVENT_HEADER_SIZE); - tl_storer.store_storer(storer); - - CHECK(tl_storer.get_buf() == raw_event.as_slice().end() - EVENT_TAIL_SIZE); - tl_storer.store_int(::td::crc32(raw_event.as_slice().truncate(raw_event.size() - EVENT_TAIL_SIZE))); - - return raw_event; -} } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h index 1224ac4ac2..66a65e060d 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h @@ -1,45 +1,32 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once -#include "td/actor/PromiseFuture.h" - #include "td/db/binlog/BinlogEvent.h" +#include "td/db/binlog/BinlogInterface.h" #include "td/utils/common.h" -#include "td/utils/Storer.h" +#include "td/utils/Promise.h" +#include "td/utils/StorerBase.h" namespace td { -class BinlogHelper { - public: - template <class BinlogT, class StorerT> - static uint64 add(const BinlogT &binlog_ptr, int32 type, const StorerT &storer, Promise<> promise = Promise<>()) { - auto logevent_id = binlog_ptr->next_id(); - binlog_ptr->add_raw_event(logevent_id, BinlogEvent::create_raw(logevent_id, type, 0, storer), std::move(promise)); - return logevent_id; - } - template <class BinlogT, class StorerT> - static uint64 rewrite(const BinlogT &binlog_ptr, uint64 logevent_id, int32 type, const StorerT &storer, - Promise<> promise = Promise<>()) { - auto seq_no = binlog_ptr->next_id(); - binlog_ptr->add_raw_event(seq_no, BinlogEvent::create_raw(logevent_id, type, BinlogEvent::Flags::Rewrite, storer), - std::move(promise)); - return seq_no; - } +inline uint64 binlog_add(BinlogInterface *binlog_ptr, int32 type, const Storer &storer, + Promise<> promise = Promise<>()) { + return binlog_ptr->add(type, storer, std::move(promise)); +} + +inline uint64 binlog_rewrite(BinlogInterface *binlog_ptr, uint64 log_event_id, int32 type, const Storer &storer, + Promise<> promise = Promise<>()) { + return binlog_ptr->rewrite(log_event_id, type, storer, std::move(promise)); +} + +inline uint64 binlog_erase(BinlogInterface *binlog_ptr, uint64 log_event_id, Promise<> promise = Promise<>()) { + return binlog_ptr->erase(log_event_id, std::move(promise)); +} - template <class BinlogT> - static uint64 erase(const BinlogT &binlog_ptr, uint64 logevent_id, Promise<> promise = Promise<>()) { - auto seq_no = binlog_ptr->next_id(); - binlog_ptr->add_raw_event(seq_no, - BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty, - BinlogEvent::Flags::Rewrite, EmptyStorer()), - std::move(promise)); - return seq_no; - } -}; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h index 2360f3c480..b3ec7b119f 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h @@ -1,20 +1,21 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once -#include "td/actor/PromiseFuture.h" - #include "td/db/binlog/BinlogEvent.h" #include "td/db/DbKey.h" #include "td/utils/buffer.h" #include "td/utils/common.h" +#include "td/utils/Promise.h" +#include "td/utils/StorerBase.h" namespace td { + class BinlogInterface { public: BinlogInterface() = default; @@ -30,15 +31,41 @@ class BinlogInterface { void close_and_destroy(Promise<> promise = {}) { close_and_destroy_impl(std::move(promise)); } + void add_raw_event(BinlogDebugInfo info, uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { + add_raw_event_impl(id, std::move(raw_event), std::move(promise), info); + } void add_raw_event(uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { - add_raw_event_impl(id, std::move(raw_event), std::move(promise)); + add_raw_event_impl(id, std::move(raw_event), std::move(promise), {}); } void lazy_sync(Promise<> promise = Promise<>()) { - add_raw_event_impl(next_id(), BufferSlice(), std::move(promise)); + add_raw_event_impl(next_id(), BufferSlice(), std::move(promise), {}); } + + uint64 add(int32 type, const Storer &storer, Promise<> promise = Promise<>()) { + auto log_event_id = next_id(); + add_raw_event_impl(log_event_id, BinlogEvent::create_raw(log_event_id, type, 0, storer), std::move(promise), {}); + return log_event_id; + } + + uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) { + auto seq_no = next_id(); + add_raw_event_impl(seq_no, BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer), + std::move(promise), {}); + return seq_no; + } + + uint64 erase(uint64 log_event_id, Promise<> promise = Promise<>()) { + auto seq_no = next_id(); + add_raw_event_impl(seq_no, + BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty, + BinlogEvent::Flags::Rewrite, EmptyStorer()), + std::move(promise), {}); + return seq_no; + } + virtual void force_sync(Promise<> promise) = 0; virtual void force_flush() = 0; - virtual void change_key(DbKey db_key, Promise<> promise = Promise<>()) = 0; + virtual void change_key(DbKey db_key, Promise<> promise) = 0; virtual uint64 next_id() = 0; virtual uint64 next_id(int32 shift) = 0; @@ -46,6 +73,7 @@ class BinlogInterface { protected: virtual void close_impl(Promise<> promise) = 0; virtual void close_and_destroy_impl(Promise<> promise) = 0; - virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) = 0; + virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0; }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp index aaf07f2967..16a4e2c010 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,37 +8,41 @@ #include "td/utils/logging.h" #include "td/utils/OrderedEventsProcessor.h" +#include "td/utils/SliceBuilder.h" #include "td/utils/Time.h" #include <map> namespace td { namespace detail { -class BinlogActor : public Actor { +class BinlogActor final : public Actor { public: - BinlogActor(std::unique_ptr<Binlog> binlog, uint64 seq_no) : binlog_(std::move(binlog)), processor_(seq_no) { + BinlogActor(unique_ptr<Binlog> binlog, uint64 seq_no) : binlog_(std::move(binlog)), processor_(seq_no) { } void close(Promise<> promise) { binlog_->close().ensure(); - promise.set_value(Unit()); - LOG(INFO) << "close: done"; + LOG(INFO) << "Finished to close binlog"; stop(); + + promise.set_value(Unit()); // setting promise can complete closing and destroy the current actor context } void close_and_destroy(Promise<> promise) { binlog_->close_and_destroy().ensure(); - promise.set_value(Unit()); - LOG(INFO) << "close_and_destroy: done"; + LOG(INFO) << "Finished to destroy binlog"; stop(); + + promise.set_value(Unit()); // setting promise can complete closing and destroy the current actor context } struct Event { BufferSlice raw_event; Promise<> sync_promise; + BinlogDebugInfo debug_info; }; - void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise) { - processor_.add(seq_no, Event{std::move(raw_event), std::move(promise)}, [&](uint64 id, Event &&event) { + void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) { + processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 id, Event &&event) { if (!event.raw_event.empty()) { - do_add_raw_event(std::move(event.raw_event)); + do_add_raw_event(std::move(event.raw_event), event.debug_info); } do_lazy_sync(std::move(event.sync_promise)); }); @@ -67,7 +71,7 @@ class BinlogActor : public Actor { } private: - std::unique_ptr<Binlog> binlog_; + unique_ptr<Binlog> binlog_; OrderedEventsProcessor<Event> processor_; @@ -78,7 +82,7 @@ class BinlogActor : public Actor { bool flush_flag_ = false; double wakeup_at_ = 0; - static constexpr int32 FLUSH_TIMEOUT = 1; // 1s + static constexpr double FLUSH_TIMEOUT = 0.001; // 1ms void wakeup_after(double after) { auto now = Time::now_cached(); @@ -92,8 +96,8 @@ class BinlogActor : public Actor { } } - void do_add_raw_event(BufferSlice &&raw_event) { - binlog_->add_raw_event(std::move(raw_event)); + void do_add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) { + binlog_->add_raw_event(std::move(raw_event), info); } void try_flush() { @@ -138,7 +142,7 @@ class BinlogActor : public Actor { } } - void timeout_expired() override { + void timeout_expired() final { bool need_sync = lazy_sync_flag_ || force_sync_flag_; lazy_sync_flag_ = false; force_sync_flag_ = false; @@ -148,10 +152,7 @@ class BinlogActor : public Actor { if (need_sync) { binlog_->sync(); // LOG(ERROR) << "BINLOG SYNC"; - for (auto &promise : sync_promises_) { - promise.set_value(Unit()); - } - sync_promises_.clear(); + set_promises(sync_promises_); } else if (need_flush) { try_flush(); // LOG(ERROR) << "BINLOG FLUSH"; @@ -162,24 +163,24 @@ class BinlogActor : public Actor { ConcurrentBinlog::ConcurrentBinlog() = default; ConcurrentBinlog::~ConcurrentBinlog() = default; -ConcurrentBinlog::ConcurrentBinlog(std::unique_ptr<Binlog> binlog, int scheduler_id) { +ConcurrentBinlog::ConcurrentBinlog(unique_ptr<Binlog> binlog, int scheduler_id) { init_impl(std::move(binlog), scheduler_id); } Result<BinlogInfo> ConcurrentBinlog::init(string path, const Callback &callback, DbKey db_key, DbKey old_db_key, int scheduler_id) { - auto binlog = std::make_unique<Binlog>(); + auto binlog = make_unique<Binlog>(); TRY_STATUS(binlog->init(std::move(path), callback, std::move(db_key), std::move(old_db_key))); auto info = binlog->get_info(); init_impl(std::move(binlog), scheduler_id); return info; } -void ConcurrentBinlog::init_impl(std::unique_ptr<Binlog> binlog, int32 scheduler_id) { +void ConcurrentBinlog::init_impl(unique_ptr<Binlog> binlog, int32 scheduler_id) { path_ = binlog->get_path().str(); last_id_ = binlog->peek_next_id(); - binlog_actor_ = - create_actor_on_scheduler<detail::BinlogActor>("Binlog " + path_, scheduler_id, std::move(binlog), last_id_); + binlog_actor_ = create_actor_on_scheduler<detail::BinlogActor>(PSLICE() << "Binlog " << path_, scheduler_id, + std::move(binlog), last_id_); } void ConcurrentBinlog::close_impl(Promise<> promise) { @@ -188,8 +189,8 @@ void ConcurrentBinlog::close_impl(Promise<> promise) { void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) { send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise)); } -void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) { - send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise)); +void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) { + send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise), info); } void ConcurrentBinlog::force_sync(Promise<> promise) { send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise)); diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h index ce77a78a84..ad6c9bc5c3 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h @@ -1,19 +1,20 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once -#include "td/actor/actor.h" -#include "td/actor/PromiseFuture.h" - #include "td/db/binlog/Binlog.h" #include "td/db/binlog/BinlogInterface.h" +#include "td/db/DbKey.h" + +#include "td/actor/actor.h" #include "td/utils/buffer.h" #include "td/utils/common.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" @@ -26,28 +27,28 @@ namespace detail { class BinlogActor; } // namespace detail -class ConcurrentBinlog : public BinlogInterface { +class ConcurrentBinlog final : public BinlogInterface { public: using Callback = std::function<void(const BinlogEvent &)>; Result<BinlogInfo> init(string path, const Callback &callback, DbKey db_key = DbKey::empty(), DbKey old_db_key = DbKey::empty(), int scheduler_id = -1) TD_WARN_UNUSED_RESULT; ConcurrentBinlog(); - explicit ConcurrentBinlog(std::unique_ptr<Binlog> binlog, int scheduler_id = -1); + explicit ConcurrentBinlog(unique_ptr<Binlog> binlog, int scheduler_id = -1); ConcurrentBinlog(const ConcurrentBinlog &other) = delete; ConcurrentBinlog &operator=(const ConcurrentBinlog &other) = delete; ConcurrentBinlog(ConcurrentBinlog &&other) = delete; ConcurrentBinlog &operator=(ConcurrentBinlog &&other) = delete; - ~ConcurrentBinlog() override; + ~ConcurrentBinlog() final; - void force_sync(Promise<> promise) override; - void force_flush() override; - void change_key(DbKey db_key, Promise<> promise) override; + void force_sync(Promise<> promise) final; + void force_flush() final; + void change_key(DbKey db_key, Promise<> promise) final; - uint64 next_id() override { + uint64 next_id() final { return last_id_.fetch_add(1, std::memory_order_relaxed); } - uint64 next_id(int32 shift) override { + uint64 next_id(int32 shift) final { return last_id_.fetch_add(shift, std::memory_order_relaxed); } @@ -56,13 +57,14 @@ class ConcurrentBinlog : public BinlogInterface { } private: - void init_impl(std::unique_ptr<Binlog> binlog, int scheduler_id); - void close_impl(Promise<> promise) override; - void close_and_destroy_impl(Promise<> promise) override; - void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) override; + void init_impl(unique_ptr<Binlog> binlog, int scheduler_id); + void close_impl(Promise<> promise) final; + void close_and_destroy_impl(Promise<> promise) final; + void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final; ActorOwn<detail::BinlogActor> binlog_actor_; string path_; - std::atomic<uint64> last_id_; + std::atomic<uint64> last_id_{0}; }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp index a8b8bf9e1b..f3984062fc 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp @@ -1,52 +1,156 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/db/binlog/Binlog.h" +#include "td/db/DbKey.h" + #include "td/utils/common.h" #include "td/utils/format.h" #include "td/utils/logging.h" +#include "td/utils/misc.h" +#include "td/utils/port/Stat.h" +#include "td/utils/Slice.h" +#include "td/utils/SliceBuilder.h" +#include "td/utils/StringBuilder.h" +#include "td/utils/tl_parsers.h" -#include <cstdio> #include <map> +struct Trie { + Trie() { + nodes_.resize(1); + } + + void add(td::Slice value) { + do_add(0, PSLICE() << value << '\0'); + } + + void dump() { + if (nodes_[0].sum == 0) { // division by zero + return; + } + LOG(PLAIN) << "TOTAL: " << nodes_[0].sum; + do_dump("", 0); + } + + private: + struct FullNode { + int next[256] = {}; + int sum = 0; + }; + td::vector<FullNode> nodes_; + + void do_add(int id, td::Slice value) { + nodes_[id].sum++; + if (value.empty()) { + return; + } + + auto c = static_cast<td::uint8>(value[0]); + auto next_id = nodes_[id].next[c]; + if (next_id == 0) { + next_id = static_cast<int>(nodes_.size()); + nodes_.emplace_back(); + nodes_[id].next[c] = next_id; + } + do_add(next_id, value.substr(1)); + } + + void do_dump(td::string path, int v) { + bool is_word_end = !path.empty() && path.back() == '\0'; + + bool need_stop = false; + int next_count = 0; + for (int c = 0; c < 256; c++) { + if (nodes_[v].next[c] != 0) { + need_stop |= c >= 128 || !(td::is_alpha(static_cast<char>(c)) || c == '.' || c == '_'); + next_count++; + } + } + need_stop |= next_count == 0 || (next_count >= 2 && nodes_[v].sum <= nodes_[0].sum / 100); + + if (is_word_end || need_stop) { + if (is_word_end) { + path.pop_back(); + } else if (next_count != 1 || nodes_[v].next[0] == 0) { + path.push_back('*'); + } + LOG(PLAIN) << nodes_[v].sum << " " << td::StringBuilder::FixedDouble(nodes_[v].sum * 100.0 / nodes_[0].sum, 2) + << "% [" << td::format::escaped(path) << "]"; + return; + } + for (int c = 0; c < 256; c++) { + auto next_id = nodes_[v].next[c]; + if (next_id == 0) { + continue; + } + do_dump(path + static_cast<char>(c), next_id); + } + } +}; + +enum Magic { ConfigPmcMagic = 0x1f18, BinlogPmcMagic = 0x4327 }; + int main(int argc, char *argv[]) { if (argc < 2) { - std::fprintf(stderr, "Usage: binlog_dump <binlog_file_name>\n"); + LOG(PLAIN) << "Usage: binlog_dump <binlog_file_name>"; + return 1; + } + td::string binlog_file_name = argv[1]; + auto r_stat = td::stat(binlog_file_name); + if (r_stat.is_error() || r_stat.ok().size_ == 0 || !r_stat.ok().is_reg_) { + LOG(PLAIN) << "Wrong binlog file name specified"; + LOG(PLAIN) << "Usage: binlog_dump <binlog_file_name>"; return 1; } struct Info { std::size_t full_size = 0; std::size_t compressed_size = 0; + Trie trie; + Trie compressed_trie; }; std::map<td::uint64, Info> info; SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); td::Binlog binlog; binlog - .init(argv[1], - [&](auto &event) { - info[0].compressed_size += event.raw_event_.size(); - info[event.type_].compressed_size += event.raw_event_.size(); - }, - td::DbKey::empty(), td::DbKey::empty(), -1, - [&](auto &event) mutable { - info[0].full_size += event.raw_event_.size(); - info[event.type_].full_size += event.raw_event_.size(); - LOG(PLAIN) << "LogEvent[" << td::tag("id", td::format::as_hex(event.id_)) << td::tag("type", event.type_) - << td::tag("flags", event.flags_) << td::tag("data", td::format::escaped(event.data_)) - << "]\n"; - }) + .init( + binlog_file_name, + [&](auto &event) { + info[0].compressed_size += event.raw_event_.size(); + info[event.type_].compressed_size += event.raw_event_.size(); + if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) { + auto key = td::TlParser(event.data_).fetch_string<td::Slice>(); + info[event.type_].compressed_trie.add(key); + } + }, + td::DbKey::raw_key("cucumber"), td::DbKey::empty(), -1, + [&](auto &event) mutable { + info[0].full_size += event.raw_event_.size(); + info[event.type_].full_size += event.raw_event_.size(); + if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) { + auto key = td::TlParser(event.data_).fetch_string<td::Slice>(); + info[event.type_].trie.add(key); + } + LOG(PLAIN) << "LogEvent[" << td::tag("id", td::format::as_hex(event.id_)) << td::tag("type", event.type_) + << td::tag("flags", event.flags_) << td::tag("size", event.data_.size()) + << td::tag("data", td::format::escaped(event.data_)) << "]\n"; + }) .ensure(); for (auto &it : info) { - LOG(ERROR) << td::tag("handler", td::format::as_hex(it.first)) + LOG(PLAIN) << td::tag("handler", td::format::as_hex(it.first)) << td::tag("full_size", td::format::as_size(it.second.full_size)) << td::tag("compressed_size", td::format::as_size(it.second.compressed_size)); + it.second.trie.dump(); + if (it.second.full_size != it.second.compressed_size) { + it.second.compressed_trie.dump(); + } } return 0; diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp index b7ddc98ff1..cb4e4817c5 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,6 +10,7 @@ namespace td { namespace detail { + void BinlogEventsBuffer::add_event(BinlogEvent &&event) { total_events_++; if ((event.flags_ & BinlogEvent::Flags::Partial) == 0) { @@ -26,14 +27,17 @@ void BinlogEventsBuffer::add_event(BinlogEvent &&event) { size_ += event.size_; events_.push_back(std::move(event)); } + bool BinlogEventsBuffer::need_flush() const { return total_events_ > 5000 || ids_.size() > 100; } + void BinlogEventsBuffer::clear() { ids_.clear(); events_.clear(); total_events_ = 0; size_ = 0; } + } // namespace detail } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h index dcd6d7c1b3..5b7cab8212 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h @@ -1,16 +1,18 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once + #include "td/db/binlog/BinlogEvent.h" #include "td/utils/common.h" namespace td { namespace detail { + class BinlogEventsBuffer { public: void add_event(BinlogEvent &&event); @@ -23,7 +25,8 @@ class BinlogEventsBuffer { auto &event = events_[i]; if (i + 1 != ids_.size() && (event.flags_ & BinlogEvent::Flags::Partial) == 0) { callback(BinlogEvent(BinlogEvent::create_raw(event.id_, event.type_, event.flags_ | BinlogEvent::Flags::Partial, - create_storer(event.data_)))); + create_storer(event.data_)), + BinlogDebugInfo{__FILE__, __LINE__})); } else { callback(std::move(event)); } @@ -43,5 +46,6 @@ class BinlogEventsBuffer { void do_event(BinlogEvent &&event); void clear(); }; + } // namespace detail } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp index 50ad91bff8..7b2832b39a 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp @@ -1,25 +1,26 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/db/binlog/detail/BinlogEventsProcessor.h" -#include "td/utils/logging.h" +#include "td/utils/SliceBuilder.h" +#include "td/utils/Status.h" #include <algorithm> namespace td { namespace detail { -void BinlogEventsProcessor::do_event(BinlogEvent &&event) { + +Status BinlogEventsProcessor::do_event(BinlogEvent &&event) { offset_ = event.offset_; auto fixed_id = event.id_ * 2; if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !ids_.empty() && ids_.back() >= fixed_id) { auto it = std::lower_bound(ids_.begin(), ids_.end(), fixed_id); if (it == ids_.end() || *it != fixed_id) { - LOG(FATAL) << "Ignore rewrite logevent"; - return; + return Status::Error(PSLICE() << "Ignore rewrite log event " << event.public_to_string()); } auto pos = it - ids_.begin(); total_raw_events_size_ -= static_cast<int64>(events_[pos].raw_event_.size()); @@ -32,10 +33,14 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) { total_raw_events_size_ += static_cast<int64>(event.raw_event_.size()); events_[pos] = std::move(event); } - } else if (event.type_ == BinlogEvent::ServiceTypes::Empty) { - // just skip this event + } else if (event.type_ < 0) { + // just skip service events } else { - CHECK(ids_.empty() || ids_.back() < fixed_id); + if (!(ids_.empty() || ids_.back() < fixed_id)) { + return Status::Error(PSLICE() << offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " " + << event.public_to_string() << " " << total_events_ << " " + << total_raw_events_size_); + } last_id_ = event.id_; total_raw_events_size_ += static_cast<int64>(event.raw_event_.size()); total_events_++; @@ -46,6 +51,7 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) { if (total_events_ > 10 && empty_events_ * 4 > total_events_ * 3) { compactify(); } + return Status::OK(); } void BinlogEventsProcessor::compactify() { @@ -66,5 +72,6 @@ void BinlogEventsProcessor::compactify() { empty_events_ = 0; CHECK(ids_.size() == events_.size()); } + } // namespace detail } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h index 645f8c50ab..8b276aef03 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h @@ -1,25 +1,31 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once + #include "td/db/binlog/BinlogEvent.h" #include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/Status.h" namespace td { namespace detail { + class BinlogEventsProcessor { public: - void add_event(BinlogEvent &&event) { - do_event(std::move(event)); + Status add_event(BinlogEvent &&event) TD_WARN_UNUSED_RESULT { + return do_event(std::move(event)); } template <class CallbackT> void for_each(CallbackT &&callback) { for (size_t i = 0; i < ids_.size(); i++) { + LOG_CHECK(i == 0 || ids_[i - 1] < ids_[i]) << ids_[i - 1] << " " << events_[i - 1].public_to_string() << " " + << ids_[i] << " " << events_[i].public_to_string(); if ((ids_[i] & 1) == 0) { callback(events_[i]); } @@ -46,8 +52,9 @@ class BinlogEventsProcessor { int64 offset_{0}; int64 total_raw_events_size_{0}; - void do_event(BinlogEvent &&event); + Status do_event(BinlogEvent &&event); void compactify(); }; + } // namespace detail } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp index 6ee1b45af2..83147562e8 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,29 +10,50 @@ #include "td/utils/common.h" #include "td/utils/logging.h" +#include "td/utils/misc.h" #include "td/utils/port/path.h" +#include "td/utils/port/Stat.h" + +#include <atomic> namespace td { namespace detail { -Status RawSqliteDb::last_error(sqlite3 *db) { - return Status::Error(Slice(sqlite3_errmsg(db))); + +static std::atomic<bool> was_database_destroyed{false}; + +Status RawSqliteDb::last_error(tdsqlite3 *db, CSlice path) { + return Status::Error(PSLICE() << Slice(tdsqlite3_errmsg(db)) << " for database \"" << path << '"'); } + Status RawSqliteDb::destroy(Slice path) { - with_db_path(path, [](auto path) { unlink(path).ignore(); }); - return Status::OK(); + Status error; + with_db_path(path, [&](auto path) { + unlink(path).ignore(); + if (!ends_with(path, "-shm") && !stat(path).is_error()) { + error = Status::Error(PSLICE() << "Failed to delete file \"" << path << '"'); + } + }); + return error; } + Status RawSqliteDb::last_error() { //If database was corrupted, try to delete it. - auto code = sqlite3_errcode(db_); + auto code = tdsqlite3_errcode(db_); if (code == SQLITE_CORRUPT) { + was_database_destroyed.store(true, std::memory_order_relaxed); destroy(path_).ignore(); } - return last_error(db_); + return last_error(db_, path()); +} + +bool RawSqliteDb::was_any_database_destroyed() { + return was_database_destroyed.load(std::memory_order_relaxed); } + RawSqliteDb::~RawSqliteDb() { - auto rc = sqlite3_close(db_); - LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_); + auto rc = tdsqlite3_close(db_); + LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_, path()); } } // namespace detail diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h index 801f3b192a..c2227d1fa3 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h @@ -1,22 +1,24 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once -#include "td/utils/logging.h" +#include "td/utils/optional.h" #include "td/utils/Slice.h" +#include "td/utils/SliceBuilder.h" #include "td/utils/Status.h" -struct sqlite3; +struct tdsqlite3; namespace td { namespace detail { + class RawSqliteDb { public: - RawSqliteDb(sqlite3 *db, std::string path) : db_(db), path_(std::move(path)) { + RawSqliteDb(tdsqlite3 *db, std::string path) : db_(db), path_(std::move(path)) { } RawSqliteDb(const RawSqliteDb &) = delete; RawSqliteDb(RawSqliteDb &&) = delete; @@ -28,12 +30,12 @@ class RawSqliteDb { static void with_db_path(Slice main_path, F &&f) { f(PSLICE() << main_path); f(PSLICE() << main_path << "-journal"); - f(PSLICE() << main_path << "-shm"); f(PSLICE() << main_path << "-wal"); + f(PSLICE() << main_path << "-shm"); } static Status destroy(Slice path) TD_WARN_UNUSED_RESULT; - sqlite3 *db() { + tdsqlite3 *db() { return db_; } CSlice path() const { @@ -41,11 +43,36 @@ class RawSqliteDb { } Status last_error(); - static Status last_error(sqlite3 *db); + static Status last_error(tdsqlite3 *db, CSlice path); + + static bool was_any_database_destroyed(); + + bool on_begin() { + begin_cnt_++; + return begin_cnt_ == 1; + } + Result<bool> on_commit() { + if (begin_cnt_ == 0) { + return Status::Error("No matching begin for commit"); + } + begin_cnt_--; + return begin_cnt_ == 0; + } + + void set_cipher_version(int32 cipher_version) { + cipher_version_ = cipher_version; + } + + optional<int32> get_cipher_version() const { + return cipher_version_.copy(); + } private: - sqlite3 *db_; + tdsqlite3 *db_; std::string path_; + size_t begin_cnt_{0}; + optional<int32> cipher_version_; }; -}; // namespace detail + +} // namespace detail } // namespace td |