summaryrefslogtreecommitdiff
path: root/protocols/Telegram/tdlib/td/tddb
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Telegram/tdlib/td/tddb')
-rw-r--r--protocols/Telegram/tdlib/td/tddb/CMakeLists.txt23
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h110
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h28
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h17
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/Pmc.h27
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h26
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp51
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h48
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp215
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h47
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp124
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h184
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp75
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h22
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h4
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp84
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h32
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp558
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h155
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h25
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp304
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h54
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp56
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h74
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h47
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h42
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp53
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h36
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp138
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp6
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h8
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp23
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h15
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp39
-rw-r--r--protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h45
35 files changed, 2080 insertions, 715 deletions
diff --git a/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt b/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt
index 531dcc5c02..036f0ca61c 100644
--- a/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt
+++ b/protocols/Telegram/tdlib/td/tddb/CMakeLists.txt
@@ -1,4 +1,10 @@
-cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
+if ((CMAKE_MAJOR_VERSION LESS 3) OR (CMAKE_VERSION VERSION_LESS "3.0.2"))
+ message(FATAL_ERROR "CMake >= 3.0.2 is required")
+endif()
+
+if (NOT DEFINED CMAKE_INSTALL_LIBDIR)
+ set(CMAKE_INSTALL_LIBDIR "lib")
+endif()
#SOURCE SETS
set(TDDB_SOURCE
@@ -8,16 +14,19 @@ set(TDDB_SOURCE
td/db/binlog/detail/BinlogEventsBuffer.cpp
td/db/binlog/detail/BinlogEventsProcessor.cpp
+ td/db/SqliteConnectionSafe.cpp
td/db/SqliteDb.cpp
- td/db/SqliteStatement.cpp
+ td/db/SqliteKeyValue.cpp
td/db/SqliteKeyValueAsync.cpp
+ td/db/SqliteStatement.cpp
+ td/db/TQueue.cpp
td/db/detail/RawSqliteDb.cpp
td/db/binlog/Binlog.h
- td/db/binlog/BinlogInterface.h
td/db/binlog/BinlogEvent.h
td/db/binlog/BinlogHelper.h
+ td/db/binlog/BinlogInterface.h
td/db/binlog/ConcurrentBinlog.h
td/db/binlog/detail/BinlogEventsBuffer.h
td/db/binlog/detail/BinlogEventsProcessor.h
@@ -25,7 +34,6 @@ set(TDDB_SOURCE
td/db/BinlogKeyValue.h
td/db/DbKey.h
td/db/KeyValueSyncInterface.h
- td/db/Pmc.h
td/db/SeqKeyValue.h
td/db/SqliteConnectionSafe.h
td/db/SqliteDb.h
@@ -33,6 +41,7 @@ set(TDDB_SOURCE
td/db/SqliteKeyValueAsync.h
td/db/SqliteKeyValueSafe.h
td/db/SqliteStatement.h
+ td/db/TQueue.h
td/db/TsSeqKeyValue.h
td/db/detail/RawSqliteDb.h
@@ -48,8 +57,6 @@ if (NOT CMAKE_CROSSCOMPILING)
endif()
install(TARGETS tddb EXPORT TdTargets
- LIBRARY DESTINATION lib
- ARCHIVE DESTINATION lib
- RUNTIME DESTINATION bin
- INCLUDES DESTINATION include
+ LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
+ ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
)
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h
index 04df413d53..9ed4d92240 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h
@@ -1,40 +1,42 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
-#include "td/actor/PromiseFuture.h"
-
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogEvent.h"
-#include "td/db/binlog/ConcurrentBinlog.h"
+#include "td/db/DbKey.h"
#include "td/db/KeyValueSyncInterface.h"
+#include "td/utils/algorithm.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
+#include "td/utils/HashTableUtils.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/RwMutex.h"
+#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
+#include "td/utils/StorerBase.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
-#include <map>
#include <memory>
#include <unordered_map>
#include <utility>
namespace td {
+
template <class BinlogT>
-class BinlogKeyValue : public KeyValueSyncInterface {
+class BinlogKeyValue final : public KeyValueSyncInterface {
public:
- static constexpr int32 magic = 0x2a280000;
+ static constexpr int32 MAGIC = 0x2a280000;
- struct Event : public Storer {
+ struct Event final : public Storer {
Event() = default;
Event(Slice key, Slice value) : key(key), value(value) {
}
@@ -52,16 +54,15 @@ class BinlogKeyValue : public KeyValueSyncInterface {
value = parser.template fetch_string<Slice>();
}
- size_t size() const override {
+ size_t size() const final {
TlStorerCalcLength storer;
store(storer);
return storer.get_length();
}
- size_t store(uint8 *ptr_x) const override {
- auto ptr = reinterpret_cast<char *>(ptr_x);
+ size_t store(uint8 *ptr) const final {
TlStorerUnsafe storer(ptr);
store(storer);
- return storer.get_buf() - ptr;
+ return static_cast<size_t>(storer.get_buf() - ptr);
}
};
@@ -76,15 +77,15 @@ class BinlogKeyValue : public KeyValueSyncInterface {
magic_ = override_magic;
}
- binlog_ = std::make_unique<BinlogT>();
- TRY_STATUS(binlog_->init(name,
- [&](const BinlogEvent &binlog_event) {
- Event event;
- event.parse(TlParser(binlog_event.data_));
- map_.emplace(std::make_pair(event.key.str(),
- std::make_pair(event.value.str(), binlog_event.id_)));
- },
- std::move(db_key), DbKey::empty(), scheduler_id));
+ binlog_ = std::make_shared<BinlogT>();
+ TRY_STATUS(binlog_->init(
+ name,
+ [&](const BinlogEvent &binlog_event) {
+ Event event;
+ event.parse(TlParser(binlog_event.data_));
+ map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_));
+ },
+ std::move(db_key), DbKey::empty(), scheduler_id));
return Status::OK();
}
@@ -103,7 +104,7 @@ class BinlogKeyValue : public KeyValueSyncInterface {
void external_init_handle(const BinlogEvent &binlog_event) {
Event event;
event.parse(TlParser(binlog_event.data_));
- map_.emplace(std::make_pair(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_)));
+ map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_));
}
void external_init_finish(std::shared_ptr<BinlogT> binlog) {
@@ -113,17 +114,24 @@ class BinlogKeyValue : public KeyValueSyncInterface {
void close() {
*this = BinlogKeyValue();
}
+ void close(Promise<> promise) final {
+ binlog_->close(std::move(promise));
+ }
- SeqNo set(string key, string value) override {
+ SeqNo set(string key, string value) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
uint64 old_id = 0;
- auto it_ok = map_.insert({key, {value, 0}});
+ auto it_ok = map_.emplace(key, std::make_pair(value, 0));
if (!it_ok.second) {
if (it_ok.first->second.first == value) {
return 0;
}
+ VLOG(binlog) << "Change value of key " << key << " from " << hex_encode(it_ok.first->second.first) << " to "
+ << hex_encode(value);
old_id = it_ok.first->second.second;
it_ok.first->second.first = value;
+ } else {
+ VLOG(binlog) << "Set value of key " << key << " to " << hex_encode(value);
}
bool rewrite = false;
uint64 id;
@@ -142,15 +150,15 @@ class BinlogKeyValue : public KeyValueSyncInterface {
return seq_no;
}
- SeqNo erase(const string &key) override {
+ SeqNo erase(const string &key) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
auto it = map_.find(key);
if (it == map_.end()) {
return 0;
}
+ VLOG(binlog) << "Remove value of key " << key << ", which is " << hex_encode(it->second.first);
uint64 id = it->second.second;
map_.erase(it);
- // LOG(ERROR) << "ADD EVENT";
auto seq_no = binlog_->next_id();
lock.reset();
add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
@@ -159,62 +167,62 @@ class BinlogKeyValue : public KeyValueSyncInterface {
}
void add_event(uint64 seq_no, BufferSlice &&event) {
- binlog_->add_raw_event(seq_no, std::move(event));
+ binlog_->add_raw_event(BinlogDebugInfo{__FILE__, __LINE__}, seq_no, std::move(event));
}
- bool isset(const string &key) override {
+ bool isset(const string &key) final {
auto lock = rw_mutex_.lock_read().move_as_ok();
return map_.count(key) > 0;
}
- string get(const string &key) override {
+ string get(const string &key) final {
auto lock = rw_mutex_.lock_read().move_as_ok();
auto it = map_.find(key);
if (it == map_.end()) {
return string();
}
+ VLOG(binlog) << "Get value of key " << key << ", which is " << hex_encode(it->second.first);
return it->second.first;
}
- void force_sync(Promise<> &&promise) {
+ void force_sync(Promise<> &&promise) final {
binlog_->force_sync(std::move(promise));
}
+
void lazy_sync(Promise<> &&promise) {
binlog_->lazy_sync(std::move(promise));
}
- std::unordered_map<string, string> prefix_get(Slice prefix) {
- // TODO: optimize with std::map?
+ std::unordered_map<string, string, Hash<string>> prefix_get(Slice prefix) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
- std::unordered_map<string, string> res;
+ std::unordered_map<string, string, Hash<string>> res;
for (const auto &kv : map_) {
if (begins_with(kv.first, prefix)) {
- res[kv.first] = kv.second.first;
+ res.emplace(kv.first.substr(prefix.size()), kv.second.first);
}
}
return res;
}
- std::unordered_map<string, string> get_all() {
+ std::unordered_map<string, string, Hash<string>> get_all() final {
auto lock = rw_mutex_.lock_write().move_as_ok();
- std::unordered_map<string, string> res;
+ std::unordered_map<string, string, Hash<string>> res;
for (const auto &kv : map_) {
- res[kv.first] = kv.second.first;
+ res.emplace(kv.first, kv.second.first);
}
return res;
}
- void erase_by_prefix(Slice prefix) {
+ void erase_by_prefix(Slice prefix) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
- std::vector<uint64> ids;
- for (auto it = map_.begin(); it != map_.end();) {
- if (begins_with(it->first, prefix)) {
- ids.push_back(it->second.second);
- it = map_.erase(it);
- } else {
- ++it;
+ vector<uint64> ids;
+ table_remove_if(map_, [&](const auto &it) {
+ if (begins_with(it.first, prefix)) {
+ ids.push_back(it.second.second);
+ return true;
}
- }
+ return false;
+ });
auto seq_no = binlog_->next_id(narrow_cast<int32>(ids.size()));
lock.reset();
for (auto id : ids) {
@@ -231,22 +239,26 @@ class BinlogKeyValue : public KeyValueSyncInterface {
}
private:
- std::unordered_map<string, std::pair<string, uint64>> map_;
+ std::unordered_map<string, std::pair<string, uint64>, Hash<string>> map_;
std::shared_ptr<BinlogT> binlog_;
RwMutex rw_mutex_;
- int32 magic_ = magic;
+ int32 magic_ = MAGIC;
};
+
template <>
inline void BinlogKeyValue<Binlog>::add_event(uint64 seq_no, BufferSlice &&event) {
- binlog_->add_raw_event(std::move(event));
+ binlog_->add_raw_event(std::move(event), BinlogDebugInfo{__FILE__, __LINE__});
}
+
template <>
inline void BinlogKeyValue<Binlog>::force_sync(Promise<> &&promise) {
binlog_->sync();
promise.set_value(Unit());
}
+
template <>
inline void BinlogKeyValue<Binlog>::lazy_sync(Promise<> &&promise) {
force_sync(std::move(promise));
}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h b/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h
index b0edde2ae1..084f6283de 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h
@@ -1,51 +1,61 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
+
#include "td/utils/common.h"
#include "td/utils/Slice.h"
namespace td {
+
class DbKey {
- public:
- enum Type { Empty, RawKey, Password };
+ enum class Type { Empty, RawKey, Password };
Type type() const {
return type_;
}
+
+ public:
bool is_empty() const {
- return type_ == Empty;
+ return type_ == Type::Empty;
}
+
bool is_raw_key() const {
- return type_ == RawKey;
+ return type_ == Type::RawKey;
}
+
bool is_password() const {
- return type_ == Password;
+ return type_ == Type::Password;
}
+
CSlice data() const {
return data_;
}
+
static DbKey raw_key(string raw_key) {
DbKey res;
- res.type_ = RawKey;
+ res.type_ = Type::RawKey;
res.data_ = std::move(raw_key);
return res;
}
+
static DbKey password(string password) {
DbKey res;
- res.type_ = Password;
+ res.type_ = Type::Password;
res.data_ = std::move(password);
return res;
}
+
static DbKey empty() {
return DbKey();
}
private:
- Type type_{Empty};
+ Type type_{Type::Empty};
string data_;
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h b/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h
index 8d19d0e75c..4007d0bfd6 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -7,6 +7,11 @@
#pragma once
#include "td/utils/common.h"
+#include "td/utils/HashTableUtils.h"
+#include "td/utils/Promise.h"
+#include "td/utils/Slice.h"
+
+#include <unordered_map>
namespace td {
@@ -29,7 +34,17 @@ class KeyValueSyncInterface {
virtual string get(const string &key) = 0;
+ virtual std::unordered_map<string, string, Hash<string>> prefix_get(Slice prefix) = 0;
+
+ virtual std::unordered_map<string, string, Hash<string>> get_all() = 0;
+
virtual SeqNo erase(const string &key) = 0;
+
+ virtual void erase_by_prefix(Slice prefix) = 0;
+
+ virtual void force_sync(Promise<> &&promise) = 0;
+
+ virtual void close(Promise<> promise) = 0;
};
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/Pmc.h b/protocols/Telegram/tdlib/td/tddb/td/db/Pmc.h
deleted file mode 100644
index dcf0e0c351..0000000000
--- a/protocols/Telegram/tdlib/td/tddb/td/db/Pmc.h
+++ /dev/null
@@ -1,27 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/db/binlog/ConcurrentBinlog.h"
-#include "td/db/BinlogKeyValue.h"
-#include "td/db/SqliteKeyValue.h"
-
-#include "td/utils/common.h"
-
-#include <memory>
-
-namespace td {
-
-using BinlogPmcBase = BinlogKeyValue<ConcurrentBinlog>;
-using BinlogPmc = std::shared_ptr<BinlogPmcBase>;
-using BinlogPmcPtr = BinlogPmcBase *;
-
-using BigPmcBase = SqliteKeyValue;
-using BigPmc = std::unique_ptr<BigPmcBase>;
-using BigPmcPtr = BigPmcBase *;
-
-}; // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h
index ec6f2b99b6..0ec10b682a 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h
@@ -1,16 +1,18 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
+#include "td/utils/HashTableUtils.h"
#include "td/utils/Slice.h"
#include <unordered_map>
namespace td {
+
class SeqKeyValue {
public:
using SeqNo = uint64;
@@ -22,7 +24,7 @@ class SeqKeyValue {
~SeqKeyValue() = default;
SeqNo set(Slice key, Slice value) {
- auto it_ok = map_.insert({key.str(), value.str()});
+ auto it_ok = map_.emplace(key.str(), value.str());
if (!it_ok.second) {
if (it_ok.first->second == value) {
return 0;
@@ -31,6 +33,7 @@ class SeqKeyValue {
}
return next_seq_no();
}
+
SeqNo erase(const string &key) {
auto it = map_.find(key);
if (it == map_.end()) {
@@ -39,9 +42,11 @@ class SeqKeyValue {
map_.erase(it);
return next_seq_no();
}
+
SeqNo seq_no() const {
return current_id_ + 1;
}
+
string get(const string &key) const {
auto it = map_.find(key);
if (it == map_.end()) {
@@ -50,29 +55,28 @@ class SeqKeyValue {
return it->second;
}
- template <class F>
- void foreach (const F &f) {
- for (auto &it : map_) {
- f(it.first, it.second);
+ bool isset(const string &key) const {
+ auto it = map_.find(key);
+ if (it == map_.end()) {
+ return false;
}
+ return true;
}
size_t size() const {
return map_.size();
}
- void reset_seq_no() {
- current_id_ = 0;
- }
- std::unordered_map<string, string> get_all() const {
+ std::unordered_map<string, string, Hash<string>> get_all() const {
return map_;
}
private:
- std::unordered_map<string, string> map_;
+ std::unordered_map<string, string, Hash<string>> map_;
SeqNo current_id_ = 0;
SeqNo next_seq_no() {
return ++current_id_;
}
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp
new file mode 100644
index 0000000000..a3c13d7120
--- /dev/null
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp
@@ -0,0 +1,51 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/db/SqliteConnectionSafe.h"
+
+#include "td/utils/common.h"
+#include "td/utils/format.h"
+#include "td/utils/logging.h"
+
+namespace td {
+
+SqliteConnectionSafe::SqliteConnectionSafe(string path, DbKey key, optional<int32> cipher_version)
+ : path_(std::move(path))
+ , lsls_connection_([path = path_, close_state_ptr = &close_state_, key = std::move(key),
+ cipher_version = std::move(cipher_version)] {
+ auto r_db = SqliteDb::open_with_key(path, false, key, cipher_version.copy());
+ if (r_db.is_error()) {
+ LOG(FATAL) << "Can't open database in state " << close_state_ptr->load() << ": " << r_db.error().message();
+ }
+ auto db = r_db.move_as_ok();
+ db.exec("PRAGMA journal_mode=WAL").ensure();
+ db.exec("PRAGMA secure_delete=1").ensure();
+ return db;
+ }) {
+}
+
+void SqliteConnectionSafe::set(SqliteDb &&db) {
+ lsls_connection_.set(std::move(db));
+}
+
+SqliteDb &SqliteConnectionSafe::get() {
+ return lsls_connection_.get();
+}
+
+void SqliteConnectionSafe::close() {
+ LOG(INFO) << "Close SQLite database " << tag("path", path_);
+ close_state_++;
+ lsls_connection_.clear_values();
+}
+
+void SqliteConnectionSafe::close_and_destroy() {
+ close();
+ LOG(INFO) << "Destroy SQLite database " << tag("path", path_);
+ close_state_ += 65536;
+ SqliteDb::destroy(path_).ignore();
+}
+
+} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h
index 6e45e79e63..a8bae2b3c2 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h
@@ -1,53 +1,39 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
-#include "td/actor/SchedulerLocalStorage.h"
-
+#include "td/db/DbKey.h"
#include "td/db/SqliteDb.h"
+#include "td/actor/SchedulerLocalStorage.h"
+
#include "td/utils/common.h"
-#include "td/utils/format.h"
-#include "td/utils/logging.h"
+#include "td/utils/optional.h"
+
+#include <atomic>
namespace td {
class SqliteConnectionSafe {
public:
SqliteConnectionSafe() = default;
- explicit SqliteConnectionSafe(string name, DbKey key = DbKey::empty())
- : lsls_connection_([name = name, key = std::move(key)] {
- auto db = SqliteDb::open_with_key(name, key).move_as_ok();
- db.exec("PRAGMA synchronous=NORMAL").ensure();
- db.exec("PRAGMA temp_store=MEMORY").ensure();
- db.exec("PRAGMA secure_delete=1").ensure();
- db.exec("PRAGMA recursive_triggers=1").ensure();
- return db;
- })
- , name_(std::move(name)) {
- }
-
- SqliteDb &get() {
- return lsls_connection_.get();
- }
-
- void close() {
- LOG(INFO) << "Close sqlite db " << tag("path", name_);
- lsls_connection_.clear_values();
- }
- void close_and_destroy() {
- close();
- LOG(INFO) << "Destroy sqlite db " << tag("path", name_);
- SqliteDb::destroy(name_).ignore();
- }
+ SqliteConnectionSafe(string path, DbKey key, optional<int32> cipher_version = {});
+
+ SqliteDb &get();
+ void set(SqliteDb &&db);
+
+ void close();
+
+ void close_and_destroy();
private:
+ string path_;
+ std::atomic<uint32> close_state_{0};
LazySchedulerLocalStorage<SqliteDb> lsls_connection_;
- string name_;
};
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp
index 819818197d..2ae37b7f90 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp
@@ -1,14 +1,17 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/SqliteDb.h"
+#include "td/utils/common.h"
#include "td/utils/format.h"
+#include "td/utils/logging.h"
#include "td/utils/port/path.h"
#include "td/utils/port/Stat.h"
+#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/Timer.h"
@@ -18,12 +21,34 @@
namespace td {
namespace {
+string quote_string(Slice str) {
+ size_t cnt = 0;
+ for (auto &c : str) {
+ if (c == '\'') {
+ cnt++;
+ }
+ }
+ if (cnt == 0) {
+ return str.str();
+ }
+
+ string result;
+ result.reserve(str.size() + cnt);
+ for (auto &c : str) {
+ if (c == '\'') {
+ result += '\'';
+ }
+ result += c;
+ }
+ return result;
+}
+
string db_key_to_sqlcipher_key(const DbKey &db_key) {
if (db_key.is_empty()) {
return "''";
}
if (db_key.is_password()) {
- return PSTRING() << "'" << db_key.data().str() << "'";
+ return PSTRING() << "'" << quote_string(db_key.data()) << "'";
}
CHECK(db_key.is_raw_key());
Slice raw_key = db_key.data();
@@ -46,27 +71,29 @@ string db_key_to_sqlcipher_key(const DbKey &db_key) {
SqliteDb::~SqliteDb() = default;
-Status SqliteDb::init(CSlice path, bool *was_created) {
- // If database does not exist, delete all other files which may left
- // from older database
- bool is_db_exists = stat(path).is_ok();
- if (!is_db_exists) {
- destroy(path).ignore();
+Status SqliteDb::init(CSlice path, bool allow_creation) {
+ // if database does not exist, delete all other files which could have been left from the old database
+ auto database_stat = stat(path);
+ if (database_stat.is_error()) {
+ if (!allow_creation) {
+ bool was_destroyed = detail::RawSqliteDb::was_any_database_destroyed();
+ auto reason = was_destroyed ? Slice("was corrupted and deleted") : Slice("disappeared");
+ return Status::Error(PSLICE() << "Database " << reason
+ << " during execution and can't be recreated: " << database_stat.error());
+ }
+ TRY_STATUS(destroy(path));
}
- if (was_created != nullptr) {
- *was_created = !is_db_exists;
- }
- sqlite3 *db;
- CHECK(sqlite3_threadsafe() != 0);
- int rc = sqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE /*| SQLITE_OPEN_SHAREDCACHE*/,
- nullptr);
+ tdsqlite3 *db;
+ CHECK(tdsqlite3_threadsafe() != 0);
+ int rc =
+ tdsqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | (allow_creation ? SQLITE_OPEN_CREATE : 0), nullptr);
if (rc != SQLITE_OK) {
- auto res = Status::Error(PSLICE() << "Failed to open db: " << detail::RawSqliteDb::last_error(db));
- sqlite3_close(db);
+ auto res = detail::RawSqliteDb::last_error(db, path);
+ tdsqlite3_close(db);
return res;
}
- sqlite3_busy_timeout(db, 1000 * 5 /* 5 seconds */);
+ tdsqlite3_busy_timeout(db, 1000 * 5 /* 5 seconds */);
raw_ = std::make_shared<detail::RawSqliteDb>(db, path.str());
return Status::OK();
}
@@ -74,32 +101,41 @@ Status SqliteDb::init(CSlice path, bool *was_created) {
static void trace_callback(void *ptr, const char *query) {
LOG(ERROR) << query;
}
+
static int trace_v2_callback(unsigned code, void *ctx, void *p_raw, void *x_raw) {
CHECK(code == SQLITE_TRACE_STMT);
auto x = static_cast<const char *>(x_raw);
if (x[0] == '-' && x[1] == '-') {
trace_callback(ctx, x);
} else {
- trace_callback(ctx, sqlite3_expanded_sql(static_cast<sqlite3_stmt *>(p_raw)));
+ trace_callback(ctx, tdsqlite3_expanded_sql(static_cast<tdsqlite3_stmt *>(p_raw)));
}
return 0;
}
+
void SqliteDb::trace(bool flag) {
- sqlite3_trace_v2(raw_->db(), SQLITE_TRACE_STMT, flag ? trace_v2_callback : nullptr, nullptr);
+ tdsqlite3_trace_v2(raw_->db(), SQLITE_TRACE_STMT, flag ? trace_v2_callback : nullptr, nullptr);
}
Status SqliteDb::exec(CSlice cmd) {
CHECK(!empty());
char *msg;
- VLOG(sqlite) << "Start exec " << tag("cmd", cmd) << tag("db", raw_->db());
- auto rc = sqlite3_exec(raw_->db(), cmd.c_str(), nullptr, nullptr, &msg);
- VLOG(sqlite) << "Finish exec " << tag("cmd", cmd) << tag("db", raw_->db());
+ if (enable_logging_) {
+ VLOG(sqlite) << "Start exec " << tag("query", cmd) << tag("database", raw_->db());
+ }
+ auto rc = tdsqlite3_exec(raw_->db(), cmd.c_str(), nullptr, nullptr, &msg);
if (rc != SQLITE_OK) {
CHECK(msg != nullptr);
- return Status::Error(PSLICE() << tag("query", cmd) << " failed: " << msg);
+ if (enable_logging_) {
+ VLOG(sqlite) << "Finish exec with error " << msg;
+ }
+ return Status::Error(PSLICE() << tag("query", cmd) << " to database \"" << raw_->path() << "\" failed: " << msg);
}
CHECK(msg == nullptr);
+ if (enable_logging_) {
+ VLOG(sqlite) << "Finish exec";
+ }
return Status::OK();
}
@@ -111,6 +147,7 @@ Result<bool> SqliteDb::has_table(Slice table) {
auto cnt = stmt.view_int32(0);
return cnt == 1;
}
+
Result<string> SqliteDb::get_pragma(Slice name) {
TRY_RESULT(stmt, get_statement(PSLICE() << "PRAGMA " << name));
TRY_STATUS(stmt.step());
@@ -121,11 +158,21 @@ Result<string> SqliteDb::get_pragma(Slice name) {
return std::move(res);
}
+Result<string> SqliteDb::get_pragma_string(Slice name) {
+ TRY_RESULT(stmt, get_statement(PSLICE() << "PRAGMA " << name));
+ TRY_STATUS(stmt.step());
+ CHECK(stmt.has_row());
+ auto res = stmt.view_string(0).str();
+ TRY_STATUS(stmt.step());
+ CHECK(!stmt.can_step());
+ return std::move(res);
+}
+
Result<int32> SqliteDb::user_version() {
TRY_RESULT(get_version_stmt, get_statement("PRAGMA user_version"));
TRY_STATUS(get_version_stmt.step());
if (!get_version_stmt.has_row()) {
- return Status::Error("PRAGMA user_version failed");
+ return Status::Error(PSLICE() << "PRAGMA user_version failed for database \"" << raw_->path() << '"');
}
return get_version_stmt.view_int32(0);
}
@@ -134,56 +181,103 @@ Status SqliteDb::set_user_version(int32 version) {
return exec(PSLICE() << "PRAGMA user_version = " << version);
}
-Status SqliteDb::begin_transaction() {
- return exec("BEGIN");
+Status SqliteDb::begin_read_transaction() {
+ if (raw_->on_begin()) {
+ return exec("BEGIN");
+ }
+ return Status::OK();
+}
+
+Status SqliteDb::begin_write_transaction() {
+ if (raw_->on_begin()) {
+ return exec("BEGIN IMMEDIATE");
+ }
+ return Status::OK();
}
+
Status SqliteDb::commit_transaction() {
- return exec("COMMIT");
+ TRY_RESULT(need_commit, raw_->on_commit());
+ if (need_commit) {
+ return exec("COMMIT");
+ }
+ return Status::OK();
}
-bool SqliteDb::is_encrypted() {
- return exec("SELECT count(*) FROM sqlite_master").is_error();
+Status SqliteDb::check_encryption() {
+ auto status = exec("SELECT count(*) FROM sqlite_master");
+ if (status.is_ok()) {
+ enable_logging_ = true;
+ }
+ return status;
}
-Result<SqliteDb> SqliteDb::open_with_key(CSlice path, const DbKey &db_key) {
+Result<SqliteDb> SqliteDb::open_with_key(CSlice path, bool allow_creation, const DbKey &db_key,
+ optional<int32> cipher_version) {
+ auto res = do_open_with_key(path, allow_creation, db_key, cipher_version ? cipher_version.value() : 0);
+ if (res.is_error() && !cipher_version && !db_key.is_empty()) {
+ return do_open_with_key(path, false, db_key, 3);
+ }
+ return res;
+}
+
+Result<SqliteDb> SqliteDb::do_open_with_key(CSlice path, bool allow_creation, const DbKey &db_key,
+ int32 cipher_version) {
SqliteDb db;
- TRY_STATUS(db.init(path));
+ TRY_STATUS(db.init(path, allow_creation));
if (!db_key.is_empty()) {
- if (!db.is_encrypted()) {
- return Status::Error("No key is needed");
+ if (db.check_encryption().is_ok()) {
+ return Status::Error(PSLICE() << "No key is needed for database \"" << path << '"');
}
auto key = db_key_to_sqlcipher_key(db_key);
TRY_STATUS(db.exec(PSLICE() << "PRAGMA key = " << key));
+ if (cipher_version != 0) {
+ LOG(INFO) << "Trying SQLCipher compatibility mode with version = " << cipher_version;
+ TRY_STATUS(db.exec(PSLICE() << "PRAGMA cipher_compatibility = " << cipher_version));
+ }
+ db.set_cipher_version(cipher_version);
}
- if (db.is_encrypted()) {
- return Status::Error("Wrong key");
- }
+ TRY_STATUS_PREFIX(db.check_encryption(), "Can't check database: ");
return std::move(db);
}
-Status SqliteDb::change_key(CSlice path, const DbKey &new_db_key, const DbKey &old_db_key) {
+void SqliteDb::set_cipher_version(int32 cipher_version) {
+ raw_->set_cipher_version(cipher_version);
+}
+
+optional<int32> SqliteDb::get_cipher_version() const {
+ return raw_->get_cipher_version();
+}
+
+Result<SqliteDb> SqliteDb::change_key(CSlice path, bool allow_creation, const DbKey &new_db_key,
+ const DbKey &old_db_key) {
// fast path
{
- auto r_db = open_with_key(path, new_db_key);
+ PerfWarningTimer perf("open database", 0.05);
+ auto r_db = open_with_key(path, allow_creation, new_db_key);
if (r_db.is_ok()) {
- return Status::OK();
+ return r_db;
}
}
- TRY_RESULT(db, open_with_key(path, old_db_key));
+ PerfWarningTimer perf("change database key", 0.5);
+ auto create_database = [](CSlice tmp_path) -> Status {
+ TRY_STATUS(destroy(tmp_path));
+ SqliteDb db;
+ return db.init(tmp_path, true);
+ };
+
+ TRY_RESULT(db, open_with_key(path, false, old_db_key));
TRY_RESULT(user_version, db.user_version());
auto new_key = db_key_to_sqlcipher_key(new_db_key);
if (old_db_key.is_empty() && !new_db_key.is_empty()) {
LOG(DEBUG) << "ENCRYPT";
- // Encrypt
- PerfWarningTimer timer("Encrypt sqlite database", 0.1);
- auto tmp_path = path.str() + ".ecnrypted";
- unlink(tmp_path).ignore();
+ PerfWarningTimer timer("Encrypt SQLite database", 0.1);
+ auto tmp_path = path.str() + ".encrypted";
+ TRY_STATUS(create_database(tmp_path));
- // make shure that database is not empty
+ // make sure that database is not empty
TRY_STATUS(db.exec("CREATE TABLE IF NOT EXISTS encryption_dummy_table(id INT PRIMARY KEY)"));
- //NB: not really safe
- TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << tmp_path << "' AS encrypted KEY " << new_key));
+ TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << quote_string(tmp_path) << "' AS encrypted KEY " << new_key));
TRY_STATUS(db.exec("SELECT sqlcipher_export('encrypted')"));
TRY_STATUS(db.exec(PSLICE() << "PRAGMA encrypted.user_version = " << user_version));
TRY_STATUS(db.exec("DETACH DATABASE encrypted"));
@@ -191,13 +285,11 @@ Status SqliteDb::change_key(CSlice path, const DbKey &new_db_key, const DbKey &o
TRY_STATUS(rename(tmp_path, path));
} else if (!old_db_key.is_empty() && new_db_key.is_empty()) {
LOG(DEBUG) << "DECRYPT";
- // Dectypt
- PerfWarningTimer timer("Decrypt sqlite database", 0.1);
- auto tmp_path = path.str() + ".ecnrypted";
- unlink(tmp_path).ignore();
+ PerfWarningTimer timer("Decrypt SQLite database", 0.1);
+ auto tmp_path = path.str() + ".encrypted";
+ TRY_STATUS(create_database(tmp_path));
- //NB: not really safe
- TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << tmp_path << "' AS decrypted KEY ''"));
+ TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << quote_string(tmp_path) << "' AS decrypted KEY ''"));
TRY_STATUS(db.exec("SELECT sqlcipher_export('decrypted')"));
TRY_STATUS(db.exec(PSLICE() << "PRAGMA decrypted.user_version = " << user_version));
TRY_STATUS(db.exec("DETACH DATABASE decrypted"));
@@ -205,24 +297,27 @@ Status SqliteDb::change_key(CSlice path, const DbKey &new_db_key, const DbKey &o
TRY_STATUS(rename(tmp_path, path));
} else {
LOG(DEBUG) << "REKEY";
- PerfWarningTimer timer("Rekey sqlite database", 0.1);
+ PerfWarningTimer timer("Rekey SQLite database", 0.1);
TRY_STATUS(db.exec(PSLICE() << "PRAGMA rekey = " << new_key));
}
- TRY_RESULT(new_db, open_with_key(path, new_db_key));
+ TRY_RESULT(new_db, open_with_key(path, false, new_db_key));
CHECK(new_db.user_version().ok() == user_version);
- return Status::OK();
+ return std::move(new_db);
}
Status SqliteDb::destroy(Slice path) {
return detail::RawSqliteDb::destroy(path);
}
Result<SqliteStatement> SqliteDb::get_statement(CSlice statement) {
- sqlite3_stmt *stmt = nullptr;
- auto rc = sqlite3_prepare_v2(get_native(), statement.c_str(), static_cast<int>(statement.size()) + 1, &stmt, nullptr);
+ tdsqlite3_stmt *stmt = nullptr;
+ auto rc =
+ tdsqlite3_prepare_v2(get_native(), statement.c_str(), static_cast<int>(statement.size()) + 1, &stmt, nullptr);
if (rc != SQLITE_OK) {
- return Status::Error(PSLICE() << "Failed to prepare sqlite " << tag("stmt", statement) << raw_->last_error());
+ return Status::Error(PSLICE() << "Failed to prepare SQLite " << tag("statement", statement) << raw_->last_error());
}
+ LOG_CHECK(stmt != nullptr) << statement;
return SqliteStatement(stmt, raw_);
}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h
index 40137464ce..899b02e4a5 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -11,23 +11,19 @@
#include "td/db/detail/RawSqliteDb.h"
-#include "td/utils/logging.h"
+#include "td/utils/optional.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include <memory>
-struct sqlite3;
+struct tdsqlite3;
namespace td {
class SqliteDb {
public:
SqliteDb() = default;
- explicit SqliteDb(CSlice path) {
- auto status = init(path);
- LOG_IF(FATAL, status.is_error()) << status;
- }
SqliteDb(SqliteDb &&) = default;
SqliteDb &operator=(SqliteDb &&) = default;
SqliteDb(const SqliteDb &) = delete;
@@ -36,7 +32,7 @@ class SqliteDb {
// dangerous
SqliteDb clone() const {
- return SqliteDb(raw_);
+ return SqliteDb(raw_, enable_logging_);
}
bool empty() const {
@@ -46,26 +42,28 @@ class SqliteDb {
*this = SqliteDb();
}
- Status init(CSlice path, bool *was_created = nullptr) TD_WARN_UNUSED_RESULT;
Status exec(CSlice cmd) TD_WARN_UNUSED_RESULT;
Result<bool> has_table(Slice table);
Result<string> get_pragma(Slice name);
- Status begin_transaction();
- Status commit_transaction();
+ Result<string> get_pragma_string(Slice name);
+
+ Status begin_read_transaction() TD_WARN_UNUSED_RESULT;
+ Status begin_write_transaction() TD_WARN_UNUSED_RESULT;
+ Status commit_transaction() TD_WARN_UNUSED_RESULT;
Result<int32> user_version();
- Status set_user_version(int32 version);
+ Status set_user_version(int32 version) TD_WARN_UNUSED_RESULT;
void trace(bool flag);
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
- // Anyway we can't change the key on the fly, so static functions is more than enough
- static Result<SqliteDb> open_with_key(CSlice path, const DbKey &db_key);
- static Status change_key(CSlice path, const DbKey &new_db_key, const DbKey &old_db_key);
-
- Status last_error();
+ // we can't change the key on the fly, so static functions are more than enough
+ static Result<SqliteDb> open_with_key(CSlice path, bool allow_creation, const DbKey &db_key,
+ optional<int32> cipher_version = {});
+ static Result<SqliteDb> change_key(CSlice path, bool allow_creation, const DbKey &new_db_key,
+ const DbKey &old_db_key);
- sqlite3 *get_native() const {
+ tdsqlite3 *get_native() const {
return raw_->db();
}
@@ -76,11 +74,20 @@ class SqliteDb {
detail::RawSqliteDb::with_db_path(main_path, f);
}
+ optional<int32> get_cipher_version() const;
+
private:
- explicit SqliteDb(std::shared_ptr<detail::RawSqliteDb> raw) : raw_(std::move(raw)) {
+ SqliteDb(std::shared_ptr<detail::RawSqliteDb> raw, bool enable_logging)
+ : raw_(std::move(raw)), enable_logging_(enable_logging) {
}
std::shared_ptr<detail::RawSqliteDb> raw_;
+ bool enable_logging_ = false;
+
+ Status init(CSlice path, bool allow_creation) TD_WARN_UNUSED_RESULT;
- bool is_encrypted();
+ Status check_encryption();
+ static Result<SqliteDb> do_open_with_key(CSlice path, bool allow_creation, const DbKey &db_key, int32 cipher_version);
+ void set_cipher_version(int32 cipher_version);
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp
new file mode 100644
index 0000000000..97e2e6fe94
--- /dev/null
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp
@@ -0,0 +1,124 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/db/SqliteKeyValue.h"
+
+#include "td/utils/base64.h"
+#include "td/utils/logging.h"
+#include "td/utils/ScopeGuard.h"
+
+namespace td {
+
+Status SqliteKeyValue::init_with_connection(SqliteDb connection, string table_name) {
+ auto init_guard = ScopeExit() + [&] {
+ close();
+ };
+ db_ = std::move(connection);
+ table_name_ = std::move(table_name);
+ TRY_STATUS(init(db_, table_name_));
+
+ TRY_RESULT_ASSIGN(set_stmt_,
+ db_.get_statement(PSLICE() << "REPLACE INTO " << table_name_ << " (k, v) VALUES (?1, ?2)"));
+ TRY_RESULT_ASSIGN(get_stmt_, db_.get_statement(PSLICE() << "SELECT v FROM " << table_name_ << " WHERE k = ?1"));
+ TRY_RESULT_ASSIGN(erase_stmt_, db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE k = ?1"));
+ TRY_RESULT_ASSIGN(get_all_stmt_, db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_));
+
+ TRY_RESULT_ASSIGN(erase_by_prefix_stmt_,
+ db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE ?1 <= k AND k < ?2"));
+ TRY_RESULT_ASSIGN(erase_by_prefix_rare_stmt_,
+ db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE ?1 <= k"));
+
+ TRY_RESULT_ASSIGN(get_by_prefix_stmt_,
+ db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_ << " WHERE ?1 <= k AND k < ?2"));
+ TRY_RESULT_ASSIGN(get_by_prefix_rare_stmt_,
+ db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_ << " WHERE ?1 <= k"));
+
+ init_guard.dismiss();
+ return Status::OK();
+}
+
+Status SqliteKeyValue::drop() {
+ if (empty()) {
+ return Status::OK();
+ }
+
+ auto result = drop(db_, table_name_);
+ close();
+ return result;
+}
+
+void SqliteKeyValue::set(Slice key, Slice value) {
+ set_stmt_.bind_blob(1, key).ensure();
+ set_stmt_.bind_blob(2, value).ensure();
+ auto status = set_stmt_.step();
+ if (status.is_error()) {
+ LOG(FATAL) << "Failed to set \"" << base64_encode(key) << "\": " << status;
+ }
+ set_stmt_.reset();
+}
+
+void SqliteKeyValue::set_all(const FlatHashMap<string, string> &key_values) {
+ begin_write_transaction().ensure();
+ for (auto &key_value : key_values) {
+ set(key_value.first, key_value.second);
+ }
+ commit_transaction().ensure();
+}
+
+string SqliteKeyValue::get(Slice key) {
+ SCOPE_EXIT {
+ get_stmt_.reset();
+ };
+ get_stmt_.bind_blob(1, key).ensure();
+ get_stmt_.step().ensure();
+ if (!get_stmt_.has_row()) {
+ return string();
+ }
+ auto data = get_stmt_.view_blob(0).str();
+ get_stmt_.step().ignore();
+ return data;
+}
+
+void SqliteKeyValue::erase(Slice key) {
+ erase_stmt_.bind_blob(1, key).ensure();
+ erase_stmt_.step().ensure();
+ erase_stmt_.reset();
+}
+
+void SqliteKeyValue::erase_by_prefix(Slice prefix) {
+ auto next = next_prefix(prefix);
+ if (next.empty()) {
+ SCOPE_EXIT {
+ erase_by_prefix_rare_stmt_.reset();
+ };
+ erase_by_prefix_rare_stmt_.bind_blob(1, prefix).ensure();
+ erase_by_prefix_rare_stmt_.step().ensure();
+ } else {
+ SCOPE_EXIT {
+ erase_by_prefix_stmt_.reset();
+ };
+ erase_by_prefix_stmt_.bind_blob(1, prefix).ensure();
+ erase_by_prefix_stmt_.bind_blob(2, next).ensure();
+ erase_by_prefix_stmt_.step().ensure();
+ }
+}
+
+string SqliteKeyValue::next_prefix(Slice prefix) {
+ string next = prefix.str();
+ size_t pos = next.size();
+ while (pos) {
+ pos--;
+ auto value = static_cast<uint8>(next[pos]);
+ value++;
+ next[pos] = static_cast<char>(value);
+ if (value != 0) {
+ return next;
+ }
+ }
+ return string{};
+}
+
+} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h
index 6fe050e7f2..bc617ff05f 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -9,149 +9,65 @@
#include "td/db/SqliteDb.h"
#include "td/db/SqliteStatement.h"
-#include "td/utils/logging.h"
-#include "td/utils/ScopeGuard.h"
+#include "td/utils/common.h"
+#include "td/utils/FlatHashMap.h"
#include "td/utils/Slice.h"
+#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
-#include <unordered_map>
-
namespace td {
+
class SqliteKeyValue {
public:
- static Status drop(SqliteDb &connection, Slice kv_name) TD_WARN_UNUSED_RESULT {
- return connection.exec(PSLICE() << "DROP TABLE IF EXISTS " << kv_name);
+ static Status drop(SqliteDb &connection, Slice table_name) TD_WARN_UNUSED_RESULT {
+ return connection.exec(PSLICE() << "DROP TABLE IF EXISTS " << table_name);
}
- static Status init(SqliteDb &connection, Slice kv_name) TD_WARN_UNUSED_RESULT {
- return connection.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << kv_name << " (k BLOB PRIMARY KEY, v BLOB)");
+ static Status init(SqliteDb &connection, Slice table_name) TD_WARN_UNUSED_RESULT {
+ return connection.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << table_name << " (k BLOB PRIMARY KEY, v BLOB)");
}
- using SeqNo = uint64;
- Result<bool> init(string name) TD_WARN_UNUSED_RESULT {
- name_ = std::move(name);
- bool is_created = false;
- SqliteDb db;
- TRY_STATUS(db.init(name, &is_created));
- TRY_STATUS(db.exec("PRAGMA encoding=\"UTF-8\""));
- TRY_STATUS(db.exec("PRAGMA synchronous=NORMAL"));
- TRY_STATUS(db.exec("PRAGMA journal_mode=WAL"));
- TRY_STATUS(db.exec("PRAGMA temp_store=MEMORY"));
- TRY_STATUS(init_with_connection(std::move(db), "KV"));
- return is_created;
+ bool empty() const {
+ return db_.empty();
}
- Status init_with_connection(SqliteDb connection, string kv_name) {
- db_ = std::move(connection);
- kv_name_ = std::move(kv_name);
- TRY_STATUS(init(db_, kv_name_));
- TRY_STATUS(db_.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << kv_name_ << " (k BLOB PRIMARY KEY, v BLOB)"));
-
- TRY_RESULT(set_stmt, db_.get_statement(PSLICE() << "REPLACE INTO " << kv_name_ << " (k, v) VALUES (?1, ?2)"));
- set_stmt_ = std::move(set_stmt);
- TRY_RESULT(get_stmt, db_.get_statement(PSLICE() << "SELECT v FROM " << kv_name_ << " WHERE k = ?1"));
- get_stmt_ = std::move(get_stmt);
- TRY_RESULT(erase_stmt, db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE k = ?1"));
- erase_stmt_ = std::move(erase_stmt);
- TRY_RESULT(get_all_stmt, db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << ""));
-
- TRY_RESULT(erase_by_prefix_stmt,
- db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE ?1 <= k AND k < ?2"));
- erase_by_prefix_stmt_ = std::move(erase_by_prefix_stmt);
-
- TRY_RESULT(erase_by_prefix_rare_stmt,
- db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE ?1 <= k"));
- erase_by_prefix_rare_stmt_ = std::move(erase_by_prefix_rare_stmt);
-
- TRY_RESULT(get_by_prefix_stmt,
- db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << " WHERE ?1 <= k AND k < ?2"));
- get_by_prefix_stmt_ = std::move(get_by_prefix_stmt);
-
- TRY_RESULT(get_by_prefix_rare_stmt,
- db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << " WHERE ?1 <= k"));
- get_by_prefix_rare_stmt_ = std::move(get_by_prefix_rare_stmt);
-
- get_all_stmt_ = std::move(get_all_stmt);
- return Status::OK();
- }
+ Status init_with_connection(SqliteDb connection, string table_name) TD_WARN_UNUSED_RESULT;
- Result<bool> try_regenerate_index() TD_WARN_UNUSED_RESULT {
- return false;
- }
void close() {
- clear();
- }
- void close_silent() {
- clear();
- }
- static Status destroy(Slice name) {
- return SqliteDb::destroy(name);
- }
- void close_and_destroy() {
- db_.exec(PSLICE() << "DROP TABLE IF EXISTS " << kv_name_).ensure();
- auto name = std::move(name_);
- clear();
- if (!name.empty()) {
- SqliteDb::destroy(name).ignore();
- }
+ *this = SqliteKeyValue();
}
- SeqNo set(Slice key, Slice value) {
- set_stmt_.bind_blob(1, key).ensure();
- set_stmt_.bind_blob(2, value).ensure();
- set_stmt_.step().ensure();
- set_stmt_.reset();
- return 0;
- }
+ Status drop();
- SeqNo erase(Slice key) {
- erase_stmt_.bind_blob(1, key).ensure();
- erase_stmt_.step().ensure();
- erase_stmt_.reset();
- return 0;
- }
- string get(Slice key) {
- SCOPE_EXIT {
- get_stmt_.reset();
- };
- get_stmt_.bind_blob(1, key).ensure();
- get_stmt_.step().ensure();
- if (!get_stmt_.has_row()) {
- return "";
- }
- auto data = get_stmt_.view_blob(0).str();
- get_stmt_.step().ignore();
- return data;
+ void set(Slice key, Slice value);
+
+ void set_all(const FlatHashMap<string, string> &key_values);
+
+ string get(Slice key);
+
+ void erase(Slice key);
+
+ Status begin_read_transaction() TD_WARN_UNUSED_RESULT {
+ return db_.begin_read_transaction();
}
- Status begin_transaction() {
- return db_.begin_transaction();
+ Status begin_write_transaction() TD_WARN_UNUSED_RESULT {
+ return db_.begin_write_transaction();
}
- Status commit_transaction() {
+
+ Status commit_transaction() TD_WARN_UNUSED_RESULT {
return db_.commit_transaction();
}
- void erase_by_prefix(Slice prefix) {
- auto next = next_prefix(prefix);
- if (next.empty()) {
- SCOPE_EXIT {
- erase_by_prefix_rare_stmt_.reset();
- };
- erase_by_prefix_rare_stmt_.bind_blob(1, prefix).ensure();
- erase_by_prefix_rare_stmt_.step().ensure();
- } else {
- SCOPE_EXIT {
- erase_by_prefix_stmt_.reset();
- };
- erase_by_prefix_stmt_.bind_blob(1, prefix).ensure();
- erase_by_prefix_stmt_.bind_blob(2, next).ensure();
- erase_by_prefix_stmt_.step().ensure();
- }
- };
+ void erase_by_prefix(Slice prefix);
- std::unordered_map<string, string> get_all() {
- std::unordered_map<string, string> res;
- get_by_prefix("", [&](Slice key, Slice value) { res.emplace(key.str(), value.str()); });
+ FlatHashMap<string, string> get_all() {
+ FlatHashMap<string, string> res;
+ get_by_prefix("", [&](Slice key, Slice value) {
+ CHECK(!key.empty());
+ res.emplace(key.str(), value.str());
+ return true;
+ });
return res;
}
@@ -163,6 +79,7 @@ class SqliteKeyValue {
}
get_by_range(prefix, next, callback);
}
+
template <class CallbackT>
void get_by_range(Slice from, Slice till, CallbackT &&callback) {
SqliteStatement *stmt = nullptr;
@@ -181,18 +98,15 @@ class SqliteKeyValue {
auto guard = stmt->guard();
stmt->step().ensure();
while (stmt->has_row()) {
- callback(stmt->view_blob(0), stmt->view_blob(1));
+ if (!callback(stmt->view_blob(0), stmt->view_blob(1))) {
+ return;
+ }
stmt->step().ensure();
}
}
- void clear() {
- *this = SqliteKeyValue();
- }
-
private:
- string name_; // deprecated
- string kv_name_;
+ string table_name_;
SqliteDb db_;
SqliteStatement get_stmt_;
SqliteStatement set_stmt_;
@@ -203,19 +117,7 @@ class SqliteKeyValue {
SqliteStatement get_by_prefix_stmt_;
SqliteStatement get_by_prefix_rare_stmt_;
- string next_prefix(Slice prefix) {
- string next = prefix.str();
- size_t pos = next.size();
- while (pos) {
- pos--;
- auto value = static_cast<uint8>(next[pos]);
- value++;
- next[pos] = static_cast<char>(value);
- if (value != 0) {
- return next;
- }
- }
- return string{};
- }
+ static string next_prefix(Slice prefix);
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp
index e8575b62f1..a0a35a1e54 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp
@@ -1,45 +1,57 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/SqliteKeyValueAsync.h"
+#include "td/db/SqliteKeyValue.h"
+
+#include "td/actor/actor.h"
+
+#include "td/utils/common.h"
#include "td/utils/optional.h"
#include "td/utils/Time.h"
-#include <unordered_map>
-
namespace td {
-class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface {
+
+class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
public:
explicit SqliteKeyValueAsync(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int32 scheduler_id = -1) {
impl_ = create_actor_on_scheduler<Impl>("KV", scheduler_id, std::move(kv_safe));
}
- void set(string key, string value, Promise<> promise) override {
+ void set(string key, string value, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::set, std::move(key), std::move(value), std::move(promise));
}
- void erase(string key, Promise<> promise) override {
+ void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) final {
+ send_closure_later(impl_, &Impl::set_all, std::move(key_values), std::move(promise));
+ }
+ void erase(string key, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::erase, std::move(key), std::move(promise));
}
- void get(string key, Promise<string> promise) override {
+ void erase_by_prefix(string key_prefix, Promise<Unit> promise) final {
+ send_closure_later(impl_, &Impl::erase_by_prefix, std::move(key_prefix), std::move(promise));
+ }
+ void get(string key, Promise<string> promise) final {
send_closure_later(impl_, &Impl::get, std::move(key), std::move(promise));
}
- void close(Promise<> promise) override {
+ void close(Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::close, std::move(promise));
}
private:
- class Impl : public Actor {
+ class Impl final : public Actor {
public:
explicit Impl(std::shared_ptr<SqliteKeyValueSafe> kv_safe) : kv_safe_(std::move(kv_safe)) {
}
- void set(string key, string value, Promise<> promise) {
+
+ void set(string key, string value, Promise<Unit> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
it->second = std::move(value);
} else {
+ CHECK(!key.empty());
buffer_.emplace(std::move(key), std::move(value));
}
if (promise) {
@@ -48,11 +60,19 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface {
cnt_++;
do_flush(false /*force*/);
}
- void erase(string key, Promise<> promise) {
+
+ void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) {
+ do_flush(true /*force*/);
+ kv_->set_all(key_values);
+ promise.set_value(Unit());
+ }
+
+ void erase(string key, Promise<Unit> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
it->second = optional<string>();
} else {
+ CHECK(!key.empty());
buffer_.emplace(std::move(key), optional<string>());
}
if (promise) {
@@ -62,6 +82,12 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface {
do_flush(false /*force*/);
}
+ void erase_by_prefix(string key_prefix, Promise<Unit> promise) {
+ do_flush(true /*force*/);
+ kv_->erase_by_prefix(key_prefix);
+ promise.set_value(Unit());
+ }
+
void get(const string &key, Promise<string> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
@@ -69,7 +95,8 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface {
}
promise.set_value(kv_->get(key));
}
- void close(Promise<> promise) {
+
+ void close(Promise<Unit> promise) {
do_flush(true /*force*/);
kv_safe_.reset();
kv_ = nullptr;
@@ -81,10 +108,10 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface {
std::shared_ptr<SqliteKeyValueSafe> kv_safe_;
SqliteKeyValue *kv_ = nullptr;
- static constexpr double MAX_PENDING_QUERIES_DELAY = 1;
+ static constexpr double MAX_PENDING_QUERIES_DELAY = 0.01;
static constexpr size_t MAX_PENDING_QUERIES_COUNT = 100;
- std::unordered_map<string, optional<string>> buffer_;
- std::vector<Promise<>> buffer_promises_;
+ FlatHashMap<string, optional<string>> buffer_;
+ vector<Promise<Unit>> buffer_promises_;
size_t cnt_ = 0;
double wakeup_at_ = 0;
@@ -107,7 +134,7 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface {
wakeup_at_ = 0;
cnt_ = 0;
- kv_->begin_transaction().ensure();
+ kv_->begin_write_transaction().ensure();
for (auto &it : buffer_) {
if (it.second) {
kv_->set(it.first, it.second.value());
@@ -117,25 +144,23 @@ class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface {
}
kv_->commit_transaction().ensure();
buffer_.clear();
- for (auto &promise : buffer_promises_) {
- promise.set_value(Unit());
- }
- buffer_promises_.clear();
+ set_promises(buffer_promises_);
}
- void timeout_expired() override {
+ void timeout_expired() final {
do_flush(false /*force*/);
}
- void start_up() override {
+ void start_up() final {
kv_ = &kv_safe_->get();
}
};
ActorOwn<Impl> impl_;
};
-std::unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,
- int32 scheduler_id) {
- return std::make_unique<SqliteKeyValueAsync>(std::move(kv), scheduler_id);
+
+unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,
+ int32 scheduler_id) {
+ return td::make_unique<SqliteKeyValueAsync>(std::move(kv), scheduler_id);
}
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h
index 6015d26fb2..e5bc29b1e8 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,7 +8,9 @@
#include "td/db/SqliteKeyValueSafe.h"
-#include "td/actor/PromiseFuture.h"
+#include "td/utils/common.h"
+#include "td/utils/FlatHashMap.h"
+#include "td/utils/Promise.h"
#include <memory>
@@ -18,13 +20,19 @@ class SqliteKeyValueAsyncInterface {
public:
virtual ~SqliteKeyValueAsyncInterface() = default;
- virtual void set(string key, string value, Promise<> promise) = 0;
- virtual void erase(string key, Promise<> promise) = 0;
+ virtual void set(string key, string value, Promise<Unit> promise) = 0;
+
+ virtual void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) = 0;
+
+ virtual void erase(string key, Promise<Unit> promise) = 0;
+
+ virtual void erase_by_prefix(string key_prefix, Promise<Unit> promise) = 0;
virtual void get(string key, Promise<string> promise) = 0;
- virtual void close(Promise<> promise) = 0;
+
+ virtual void close(Promise<Unit> promise) = 0;
};
-std::unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,
- int32 scheduler_id = 1);
+unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,
+ int32 scheduler_id = 1);
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h
index d63af3cfb2..b61a96e193 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -9,6 +9,8 @@
#include "td/db/SqliteConnectionSafe.h"
#include "td/db/SqliteKeyValue.h"
+#include "td/actor/SchedulerLocalStorage.h"
+
#include <memory>
namespace td {
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp
index c9ce8c3e8e..a2ae3833e4 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -15,40 +15,42 @@
namespace td {
+int VERBOSITY_NAME(sqlite) = VERBOSITY_NAME(DEBUG) + 10;
+
namespace {
-int printExplainQueryPlan(StringBuilder &sb, sqlite3_stmt *pStmt) {
- const char *zSql = sqlite3_sql(pStmt);
+int printExplainQueryPlan(StringBuilder &sb, tdsqlite3_stmt *pStmt) {
+ const char *zSql = tdsqlite3_sql(pStmt);
if (zSql == nullptr) {
return SQLITE_ERROR;
}
- sb << "Explain " << tag("cmd", zSql);
- char *zExplain = sqlite3_mprintf("EXPLAIN QUERY PLAN %s", zSql);
+ sb << "Explain query " << zSql;
+ char *zExplain = tdsqlite3_mprintf("EXPLAIN QUERY PLAN %s", zSql);
if (zExplain == nullptr) {
return SQLITE_NOMEM;
}
- sqlite3_stmt *pExplain; /* Compiled EXPLAIN QUERY PLAN command */
- int rc = sqlite3_prepare_v2(sqlite3_db_handle(pStmt), zExplain, -1, &pExplain, nullptr);
- sqlite3_free(zExplain);
+ tdsqlite3_stmt *pExplain; /* Compiled EXPLAIN QUERY PLAN command */
+ int rc = tdsqlite3_prepare_v2(tdsqlite3_db_handle(pStmt), zExplain, -1, &pExplain, nullptr);
+ tdsqlite3_free(zExplain);
if (rc != SQLITE_OK) {
return rc;
}
- while (SQLITE_ROW == sqlite3_step(pExplain)) {
- int iSelectid = sqlite3_column_int(pExplain, 0);
- int iOrder = sqlite3_column_int(pExplain, 1);
- int iFrom = sqlite3_column_int(pExplain, 2);
- const char *zDetail = reinterpret_cast<const char *>(sqlite3_column_text(pExplain, 3));
+ while (SQLITE_ROW == tdsqlite3_step(pExplain)) {
+ int iSelectid = tdsqlite3_column_int(pExplain, 0);
+ int iOrder = tdsqlite3_column_int(pExplain, 1);
+ int iFrom = tdsqlite3_column_int(pExplain, 2);
+ const char *zDetail = reinterpret_cast<const char *>(tdsqlite3_column_text(pExplain, 3));
- sb << "\n" << iSelectid << " " << iOrder << " " << iFrom << " " << zDetail;
+ sb << '\n' << iSelectid << ' ' << iOrder << ' ' << iFrom << ' ' << zDetail;
}
- return sqlite3_finalize(pExplain);
+ return tdsqlite3_finalize(pExplain);
}
} // namespace
-SqliteStatement::SqliteStatement(sqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db)
+SqliteStatement::SqliteStatement(tdsqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db)
: stmt_(stmt), db_(std::move(db)) {
CHECK(stmt != nullptr);
}
@@ -70,14 +72,14 @@ Result<string> SqliteStatement::explain() {
return sb.as_cslice().str();
}
Status SqliteStatement::bind_blob(int id, Slice blob) {
- auto rc = sqlite3_bind_blob(stmt_.get(), id, blob.data(), static_cast<int>(blob.size()), nullptr);
+ auto rc = tdsqlite3_bind_blob(stmt_.get(), id, blob.data(), static_cast<int>(blob.size()), nullptr);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
Status SqliteStatement::bind_string(int id, Slice str) {
- auto rc = sqlite3_bind_text(stmt_.get(), id, str.data(), static_cast<int>(str.size()), nullptr);
+ auto rc = tdsqlite3_bind_text(stmt_.get(), id, str.data(), static_cast<int>(str.size()), nullptr);
if (rc != SQLITE_OK) {
return last_error();
}
@@ -85,21 +87,21 @@ Status SqliteStatement::bind_string(int id, Slice str) {
}
Status SqliteStatement::bind_int32(int id, int32 value) {
- auto rc = sqlite3_bind_int(stmt_.get(), id, value);
+ auto rc = tdsqlite3_bind_int(stmt_.get(), id, value);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
Status SqliteStatement::bind_int64(int id, int64 value) {
- auto rc = sqlite3_bind_int64(stmt_.get(), id, value);
+ auto rc = tdsqlite3_bind_int64(stmt_.get(), id, value);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
Status SqliteStatement::bind_null(int id) {
- auto rc = sqlite3_bind_null(stmt_.get(), id);
+ auto rc = tdsqlite3_bind_null(stmt_.get(), id);
if (rc != SQLITE_OK) {
return last_error();
}
@@ -125,8 +127,8 @@ StringBuilder &operator<<(StringBuilder &sb, SqliteStatement::Datatype type) {
}
Slice SqliteStatement::view_blob(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Blob) << view_datatype(id);
- auto *data = sqlite3_column_blob(stmt_.get(), id);
- auto size = sqlite3_column_bytes(stmt_.get(), id);
+ auto *data = tdsqlite3_column_blob(stmt_.get(), id);
+ auto size = tdsqlite3_column_bytes(stmt_.get(), id);
if (data == nullptr) {
return Slice();
}
@@ -134,8 +136,8 @@ Slice SqliteStatement::view_blob(int id) {
}
Slice SqliteStatement::view_string(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Text) << view_datatype(id);
- auto *data = sqlite3_column_text(stmt_.get(), id);
- auto size = sqlite3_column_bytes(stmt_.get(), id);
+ auto *data = tdsqlite3_column_text(stmt_.get(), id);
+ auto size = tdsqlite3_column_bytes(stmt_.get(), id);
if (data == nullptr) {
return Slice();
}
@@ -143,14 +145,14 @@ Slice SqliteStatement::view_string(int id) {
}
int32 SqliteStatement::view_int32(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id);
- return sqlite3_column_int(stmt_.get(), id);
+ return tdsqlite3_column_int(stmt_.get(), id);
}
int64 SqliteStatement::view_int64(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id);
- return sqlite3_column_int64(stmt_.get(), id);
+ return tdsqlite3_column_int64(stmt_.get(), id);
}
SqliteStatement::Datatype SqliteStatement::view_datatype(int id) {
- auto type = sqlite3_column_type(stmt_.get(), id);
+ auto type = tdsqlite3_column_type(stmt_.get(), id);
switch (type) {
case SQLITE_INTEGER:
return Datatype::Integer;
@@ -168,36 +170,36 @@ SqliteStatement::Datatype SqliteStatement::view_datatype(int id) {
}
void SqliteStatement::reset() {
- sqlite3_reset(stmt_.get());
- state_ = Start;
+ tdsqlite3_reset(stmt_.get());
+ state_ = State::Start;
}
Status SqliteStatement::step() {
- if (state_ == Finish) {
+ if (state_ == State::Finish) {
return Status::Error("One has to reset statement");
}
- VLOG(sqlite) << "Start step " << tag("cmd", sqlite3_sql(stmt_.get())) << tag("stmt", stmt_.get())
- << tag("db", db_.get());
- auto rc = sqlite3_step(stmt_.get());
- VLOG(sqlite) << "Finish step " << tag("cmd", sqlite3_sql(stmt_.get())) << tag("stmt", stmt_.get())
- << tag("db", db_.get());
+ VLOG(sqlite) << "Start step " << tag("query", tdsqlite3_sql(stmt_.get())) << tag("statement", stmt_.get())
+ << tag("database", db_.get());
+ auto rc = tdsqlite3_step(stmt_.get());
+ VLOG(sqlite) << "Finish step with response " << (rc == SQLITE_ROW ? "ROW" : (rc == SQLITE_DONE ? "DONE" : "ERROR"));
if (rc == SQLITE_ROW) {
- state_ = GotRow;
+ state_ = State::GotRow;
return Status::OK();
}
+
+ state_ = State::Finish;
if (rc == SQLITE_DONE) {
- state_ = Finish;
return Status::OK();
}
- state_ = Finish;
return last_error();
}
-void SqliteStatement::StmtDeleter::operator()(sqlite3_stmt *stmt) {
- sqlite3_finalize(stmt);
+void SqliteStatement::StmtDeleter::operator()(tdsqlite3_stmt *stmt) {
+ tdsqlite3_finalize(stmt);
}
Status SqliteStatement::last_error() {
return db_->last_error();
}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h
index 2e2182ff7e..35148508ce 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h
@@ -1,24 +1,28 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
+#include "td/db/detail/RawSqliteDb.h"
+
#include "td/utils/common.h"
+#include "td/utils/logging.h"
+#include "td/utils/ScopeGuard.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
-#include "td/db/detail/RawSqliteDb.h"
-
#include <memory>
-struct sqlite3;
-struct sqlite3_stmt;
+struct tdsqlite3;
+struct tdsqlite3_stmt;
namespace td {
+extern int VERBOSITY_NAME(sqlite);
+
class SqliteStatement {
public:
SqliteStatement() = default;
@@ -44,10 +48,10 @@ class SqliteStatement {
Result<string> explain();
bool can_step() const {
- return state_ != Finish;
+ return state_ != State::Finish;
}
bool has_row() const {
- return state_ == GotRow;
+ return state_ == State::GotRow;
}
bool empty() const {
return !stmt_;
@@ -56,25 +60,29 @@ class SqliteStatement {
void reset();
auto guard() {
- return ScopeExit{} + [this] { this->reset(); };
+ return ScopeExit{} + [this] {
+ this->reset();
+ };
}
// TODO get row
private:
friend class SqliteDb;
- SqliteStatement(sqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db);
+ SqliteStatement(tdsqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db);
class StmtDeleter {
public:
- void operator()(sqlite3_stmt *stmt);
+ void operator()(tdsqlite3_stmt *stmt);
};
- enum { Start, GotRow, Finish } state_ = Start;
+ enum class State { Start, GotRow, Finish };
+ State state_ = State::Start;
- std::unique_ptr<sqlite3_stmt, StmtDeleter> stmt_;
+ std::unique_ptr<tdsqlite3_stmt, StmtDeleter> stmt_;
std::shared_ptr<detail::RawSqliteDb> db_;
Status last_error();
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp
new file mode 100644
index 0000000000..c94d7defaa
--- /dev/null
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp
@@ -0,0 +1,558 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/db/TQueue.h"
+
+#include "td/db/binlog/Binlog.h"
+#include "td/db/binlog/BinlogEvent.h"
+#include "td/db/binlog/BinlogHelper.h"
+#include "td/db/binlog/BinlogInterface.h"
+
+#include "td/utils/FlatHashMap.h"
+#include "td/utils/logging.h"
+#include "td/utils/misc.h"
+#include "td/utils/Random.h"
+#include "td/utils/StorerBase.h"
+#include "td/utils/Time.h"
+#include "td/utils/tl_helpers.h"
+#include "td/utils/tl_parsers.h"
+#include "td/utils/tl_storers.h"
+
+#include <set>
+
+namespace td {
+
+using EventId = TQueue::EventId;
+
+EventId::EventId() {
+}
+
+Result<EventId> EventId::from_int32(int32 id) {
+ if (!is_valid_id(id)) {
+ return Status::Error("Invalid ID");
+ }
+ return EventId(id);
+}
+
+bool EventId::is_valid() const {
+ return !empty() && is_valid_id(id_);
+}
+
+int32 EventId::value() const {
+ return id_;
+}
+
+Result<EventId> EventId::next() const {
+ return from_int32(id_ + 1);
+}
+
+Result<EventId> EventId::advance(size_t offset) const {
+ TRY_RESULT(new_id, narrow_cast_safe<int32>(id_ + offset));
+ return from_int32(new_id);
+}
+
+bool EventId::empty() const {
+ return id_ == 0;
+}
+
+bool EventId::operator==(const EventId &other) const {
+ return id_ == other.id_;
+}
+
+bool EventId::operator!=(const EventId &other) const {
+ return !(*this == other);
+}
+
+bool EventId::operator<(const EventId &other) const {
+ return id_ < other.id_;
+}
+
+StringBuilder &operator<<(StringBuilder &string_builder, EventId id) {
+ return string_builder << "EventId{" << id.value() << "}";
+}
+
+EventId::EventId(int32 id) : id_(id) {
+ CHECK(is_valid_id(id));
+}
+
+bool EventId::is_valid_id(int32 id) {
+ return 0 <= id && id < MAX_ID;
+}
+
+class TQueueImpl final : public TQueue {
+ static constexpr size_t MAX_EVENT_LENGTH = 65536 * 8;
+ static constexpr size_t MAX_QUEUE_EVENTS = 100000;
+ static constexpr size_t MAX_TOTAL_EVENT_LENGTH = 1 << 27;
+
+ public:
+ void set_callback(unique_ptr<StorageCallback> callback) final {
+ callback_ = std::move(callback);
+ }
+ unique_ptr<StorageCallback> extract_callback() final {
+ return std::move(callback_);
+ }
+
+ bool do_push(QueueId queue_id, RawEvent &&raw_event) final {
+ CHECK(raw_event.event_id.is_valid());
+ // raw_event.data can be empty when replaying binlog
+ if (raw_event.data.size() > MAX_EVENT_LENGTH || queue_id == 0) {
+ return false;
+ }
+ auto &q = queues_[queue_id];
+ if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size() ||
+ raw_event.expires_at <= 0) {
+ return false;
+ }
+ auto event_id = raw_event.event_id;
+ if (event_id < q.tail_id) {
+ return false;
+ }
+
+ if (!q.events.empty()) {
+ auto it = q.events.end();
+ --it;
+ if (it->second.data.empty()) {
+ if (callback_ != nullptr && it->second.log_event_id != 0) {
+ callback_->pop(it->second.log_event_id);
+ }
+ q.events.erase(it);
+ }
+ }
+ if (q.events.empty() && !raw_event.data.empty()) {
+ schedule_queue_gc(queue_id, q, raw_event.expires_at);
+ }
+
+ if (raw_event.log_event_id == 0 && callback_ != nullptr) {
+ raw_event.log_event_id = callback_->push(queue_id, raw_event);
+ }
+ q.tail_id = event_id.next().move_as_ok();
+ q.total_event_length += raw_event.data.size();
+ q.events.emplace(event_id, std::move(raw_event));
+ return true;
+ }
+
+ Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) final {
+ if (data.empty()) {
+ return Status::Error("Data is empty");
+ }
+ if (data.size() > MAX_EVENT_LENGTH) {
+ return Status::Error("Data is too big");
+ }
+ if (queue_id == 0) {
+ return Status::Error("Queue identifier is invalid");
+ }
+
+ auto &q = queues_[queue_id];
+ if (q.events.size() >= MAX_QUEUE_EVENTS) {
+ return Status::Error("Queue is full");
+ }
+ if (q.total_event_length > MAX_TOTAL_EVENT_LENGTH - data.size()) {
+ return Status::Error("Queue size is too big");
+ }
+ if (expires_at <= 0) {
+ return Status::Error("Failed to add already expired event");
+ }
+ EventId event_id;
+ while (true) {
+ if (q.tail_id.empty()) {
+ if (hint_new_id.empty()) {
+ q.tail_id = EventId::from_int32(
+ Random::fast(2 * max(static_cast<int>(MAX_QUEUE_EVENTS), 1000000) + 1, EventId::MAX_ID / 2))
+ .move_as_ok();
+ } else {
+ q.tail_id = hint_new_id;
+ }
+ }
+ event_id = q.tail_id;
+ CHECK(event_id.is_valid());
+ if (event_id.next().is_ok()) {
+ break;
+ }
+ for (auto it = q.events.begin(); it != q.events.end();) {
+ pop(q, queue_id, it, {});
+ }
+ q.tail_id = EventId();
+ CHECK(hint_new_id.next().is_ok());
+ }
+
+ RawEvent raw_event;
+ raw_event.event_id = event_id;
+ raw_event.data = std::move(data);
+ raw_event.expires_at = expires_at;
+ raw_event.extra = extra;
+ bool is_added = do_push(queue_id, std::move(raw_event));
+ CHECK(is_added);
+ return event_id;
+ }
+
+ EventId get_head(QueueId queue_id) const final {
+ auto it = queues_.find(queue_id);
+ if (it == queues_.end()) {
+ return EventId();
+ }
+ return get_queue_head(it->second);
+ }
+
+ EventId get_tail(QueueId queue_id) const final {
+ auto it = queues_.find(queue_id);
+ if (it == queues_.end()) {
+ return EventId();
+ }
+ auto &q = it->second;
+ return q.tail_id;
+ }
+
+ void forget(QueueId queue_id, EventId event_id) final {
+ auto q_it = queues_.find(queue_id);
+ if (q_it == queues_.end()) {
+ return;
+ }
+ auto &q = q_it->second;
+ auto it = q.events.find(event_id);
+ if (it == q.events.end()) {
+ return;
+ }
+ pop(q, queue_id, it, q.tail_id);
+ }
+
+ void clear(QueueId queue_id, size_t keep_count) final {
+ auto queue_it = queues_.find(queue_id);
+ if (queue_it == queues_.end()) {
+ return;
+ }
+ auto &q = queue_it->second;
+ auto size = get_size(q);
+ if (size <= keep_count) {
+ return;
+ }
+
+ auto start_time = Time::now();
+ auto total_event_length = q.total_event_length;
+
+ auto end_it = q.events.end();
+ for (size_t i = 0; i < keep_count; i++) {
+ --end_it;
+ }
+ for (auto it = q.events.begin(); it != end_it;) {
+ pop(q, queue_id, it, q.tail_id);
+ }
+
+ auto clear_time = Time::now() - start_time;
+ if (clear_time > 0.1) {
+ LOG(WARNING) << "Cleared " << (size - keep_count) << " TQueue events with total size "
+ << (total_event_length - q.total_event_length) << " in " << clear_time << " seconds";
+ }
+ }
+
+ Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,
+ MutableSpan<Event> &result_events) final {
+ auto it = queues_.find(queue_id);
+ if (it == queues_.end()) {
+ result_events.truncate(0);
+ return 0;
+ }
+ auto &q = it->second;
+ // Some sanity checks
+ if (from_id.value() > q.tail_id.value() + 10) {
+ return Status::Error("Specified from_id is in the future");
+ }
+ if (from_id.value() < get_queue_head(q).value() - static_cast<int32>(MAX_QUEUE_EVENTS)) {
+ return Status::Error("Specified from_id is in the past");
+ }
+
+ do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events);
+ return get_size(q);
+ }
+
+ std::pair<int64, bool> run_gc(int32 unix_time_now) final {
+ int64 deleted_events = 0;
+ auto max_finish_time = Time::now() + 0.05;
+ int64 counter = 0;
+ while (!queue_gc_at_.empty()) {
+ auto it = queue_gc_at_.begin();
+ if (it->first >= unix_time_now) {
+ break;
+ }
+ auto queue_id = it->second;
+ auto &q = queues_[queue_id];
+ CHECK(q.gc_at == it->first);
+ int32 new_gc_at = 0;
+
+ if (!q.events.empty()) {
+ size_t size_before = get_size(q);
+ for (auto event_it = q.events.begin(); event_it != q.events.end();) {
+ auto &event = event_it->second;
+ if ((++counter & 128) == 0 && Time::now() >= max_finish_time) {
+ if (new_gc_at == 0) {
+ new_gc_at = event.expires_at;
+ }
+ break;
+ }
+ if (event.expires_at < unix_time_now || event.data.empty()) {
+ pop(q, queue_id, event_it, q.tail_id);
+ } else {
+ if (new_gc_at != 0) {
+ break;
+ }
+ new_gc_at = event.expires_at;
+ ++event_it;
+ }
+ }
+ size_t size_after = get_size(q);
+ CHECK(size_after <= size_before);
+ deleted_events += size_before - size_after;
+ }
+ schedule_queue_gc(queue_id, q, new_gc_at);
+ if (Time::now() >= max_finish_time) {
+ return {deleted_events, false};
+ }
+ }
+ return {deleted_events, true};
+ }
+
+ size_t get_size(QueueId queue_id) const final {
+ auto it = queues_.find(queue_id);
+ if (it == queues_.end()) {
+ return 0;
+ }
+ return get_size(it->second);
+ }
+
+ void close(Promise<> promise) final {
+ if (callback_ != nullptr) {
+ callback_->close(std::move(promise));
+ callback_ = nullptr;
+ }
+ }
+
+ private:
+ struct Queue {
+ EventId tail_id;
+ std::map<EventId, RawEvent> events;
+ size_t total_event_length = 0;
+ int32 gc_at = 0;
+ };
+
+ FlatHashMap<QueueId, Queue> queues_;
+ std::set<std::pair<int32, QueueId>> queue_gc_at_;
+ unique_ptr<StorageCallback> callback_;
+
+ static EventId get_queue_head(const Queue &q) {
+ if (q.events.empty()) {
+ return q.tail_id;
+ }
+ return q.events.begin()->first;
+ }
+
+ static size_t get_size(const Queue &q) {
+ if (q.events.empty()) {
+ return 0;
+ }
+
+ return q.events.size() - (q.events.rbegin()->second.data.empty() ? 1 : 0);
+ }
+
+ void pop(Queue &q, QueueId queue_id, std::map<EventId, RawEvent>::iterator &it, EventId tail_id) {
+ auto &event = it->second;
+ if (callback_ == nullptr || event.log_event_id == 0) {
+ remove_event(q, it);
+ return;
+ }
+
+ if (event.event_id.next().ok() == tail_id) {
+ if (!event.data.empty()) {
+ clear_event_data(q, event);
+ callback_->push(queue_id, event);
+ }
+ ++it;
+ } else {
+ callback_->pop(event.log_event_id);
+ remove_event(q, it);
+ }
+ }
+
+ static void remove_event(Queue &q, std::map<EventId, RawEvent>::iterator &it) {
+ q.total_event_length -= it->second.data.size();
+ it = q.events.erase(it);
+ }
+
+ static void clear_event_data(Queue &q, RawEvent &event) {
+ q.total_event_length -= event.data.size();
+ event.data = {};
+ }
+
+ void do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now,
+ MutableSpan<Event> &result_events) {
+ if (forget_previous) {
+ for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) {
+ pop(q, queue_id, it, q.tail_id);
+ }
+ }
+
+ size_t ready_n = 0;
+ for (auto it = q.events.lower_bound(from_id); it != q.events.end();) {
+ auto &event = it->second;
+ if (event.expires_at < unix_time_now || event.data.empty()) {
+ pop(q, queue_id, it, q.tail_id);
+ } else {
+ CHECK(!(event.event_id < from_id));
+ if (ready_n == result_events.size()) {
+ break;
+ }
+
+ auto &to = result_events[ready_n];
+ to.data = event.data;
+ to.id = event.event_id;
+ to.expires_at = event.expires_at;
+ to.extra = event.extra;
+ ready_n++;
+ ++it;
+ }
+ }
+
+ result_events.truncate(ready_n);
+ }
+
+ void schedule_queue_gc(QueueId queue_id, Queue &q, int32 gc_at) {
+ if (q.gc_at != 0) {
+ bool is_deleted = queue_gc_at_.erase({q.gc_at, queue_id}) > 0;
+ CHECK(is_deleted);
+ }
+ q.gc_at = gc_at;
+ if (q.gc_at != 0) {
+ bool is_inserted = queue_gc_at_.emplace(gc_at, queue_id).second;
+ CHECK(is_inserted);
+ }
+ }
+};
+
+unique_ptr<TQueue> TQueue::create() {
+ return make_unique<TQueueImpl>();
+}
+
+struct TQueueLogEvent final : public Storer {
+ int64 queue_id;
+ int32 event_id;
+ int32 expires_at;
+ Slice data;
+ int64 extra;
+
+ template <class StorerT>
+ void store(StorerT &&storer) const {
+ using td::store;
+ store(queue_id, storer);
+ store(event_id, storer);
+ store(expires_at, storer);
+ store(data, storer);
+ if (extra != 0) {
+ store(extra, storer);
+ }
+ }
+
+ template <class ParserT>
+ void parse(ParserT &&parser, int32 has_extra) {
+ using td::parse;
+ parse(queue_id, parser);
+ parse(event_id, parser);
+ parse(expires_at, parser);
+ data = parser.template fetch_string<Slice>();
+ if (has_extra == 0) {
+ extra = 0;
+ } else {
+ parse(extra, parser);
+ }
+ }
+
+ size_t size() const final {
+ TlStorerCalcLength storer;
+ store(storer);
+ return storer.get_length();
+ }
+
+ size_t store(uint8 *ptr) const final {
+ TlStorerUnsafe storer(ptr);
+ store(storer);
+ return static_cast<size_t>(storer.get_buf() - ptr);
+ }
+};
+
+template <class BinlogT>
+uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
+ TQueueLogEvent log_event;
+ log_event.queue_id = queue_id;
+ log_event.event_id = event.event_id.value();
+ log_event.expires_at = event.expires_at;
+ log_event.data = event.data;
+ log_event.extra = event.extra;
+ auto magic = BINLOG_EVENT_TYPE + (log_event.extra != 0);
+ if (event.log_event_id == 0) {
+ return binlog_->add(magic, log_event);
+ }
+ binlog_->rewrite(event.log_event_id, magic, log_event);
+ return event.log_event_id;
+}
+
+template <class BinlogT>
+void TQueueBinlog<BinlogT>::pop(uint64 log_event_id) {
+ binlog_->erase(log_event_id);
+}
+
+template <class BinlogT>
+Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) const {
+ TQueueLogEvent event;
+ TlParser parser(binlog_event.data_);
+ int32 has_extra = binlog_event.type_ - BINLOG_EVENT_TYPE;
+ if (has_extra != 0 && has_extra != 1) {
+ return Status::Error("Wrong magic");
+ }
+ event.parse(parser, has_extra);
+ parser.fetch_end();
+ TRY_STATUS(parser.get_status());
+ TRY_RESULT(event_id, EventId::from_int32(event.event_id));
+ RawEvent raw_event;
+ raw_event.log_event_id = binlog_event.id_;
+ raw_event.event_id = event_id;
+ raw_event.expires_at = event.expires_at;
+ raw_event.data = event.data.str();
+ raw_event.extra = event.extra;
+ if (!q.do_push(event.queue_id, std::move(raw_event))) {
+ return Status::Error("Failed to add event");
+ }
+ return Status::OK();
+}
+
+template <class BinlogT>
+void TQueueBinlog<BinlogT>::close(Promise<> promise) {
+ binlog_->close(std::move(promise));
+}
+
+template class TQueueBinlog<BinlogInterface>;
+template class TQueueBinlog<Binlog>;
+
+uint64 TQueueMemoryStorage::push(QueueId queue_id, const RawEvent &event) {
+ auto log_event_id = event.log_event_id == 0 ? next_log_event_id_++ : event.log_event_id;
+ events_[log_event_id] = std::make_pair(queue_id, event);
+ return log_event_id;
+}
+
+void TQueueMemoryStorage::pop(uint64 log_event_id) {
+ events_.erase(log_event_id);
+}
+
+void TQueueMemoryStorage::replay(TQueue &q) const {
+ for (auto &e : events_) {
+ auto x = e.second;
+ x.second.log_event_id = e.first;
+ bool is_added = q.do_push(x.first, std::move(x.second));
+ CHECK(is_added);
+ }
+}
+void TQueueMemoryStorage::close(Promise<> promise) {
+ events_.clear();
+ promise.set_value({});
+}
+
+} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h
new file mode 100644
index 0000000000..117726851b
--- /dev/null
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h
@@ -0,0 +1,155 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/utils/common.h"
+#include "td/utils/Promise.h"
+#include "td/utils/Slice.h"
+#include "td/utils/Span.h"
+#include "td/utils/Status.h"
+#include "td/utils/StringBuilder.h"
+
+#include <map>
+#include <memory>
+#include <utility>
+
+namespace td {
+
+class TQueue {
+ public:
+ class EventId {
+ public:
+ static constexpr int32 MAX_ID = 2000000000;
+
+ EventId();
+
+ static Result<EventId> from_int32(int32 id);
+
+ bool is_valid() const;
+
+ int32 value() const;
+
+ Result<EventId> next() const;
+
+ Result<EventId> advance(size_t offset) const;
+
+ bool empty() const;
+
+ bool operator==(const EventId &other) const;
+ bool operator!=(const EventId &other) const;
+ bool operator<(const EventId &other) const;
+
+ private:
+ int32 id_{0};
+
+ explicit EventId(int32 id);
+
+ static bool is_valid_id(int32 id);
+ };
+
+ struct Event {
+ EventId id;
+ int32 expires_at{0};
+ Slice data;
+ int64 extra{0};
+ };
+
+ struct RawEvent {
+ uint64 log_event_id{0};
+ EventId event_id;
+ string data;
+ int64 extra{0};
+ int32 expires_at{0};
+ };
+
+ using QueueId = int64;
+
+ class StorageCallback {
+ public:
+ using QueueId = TQueue::QueueId;
+ using RawEvent = TQueue::RawEvent;
+
+ StorageCallback() = default;
+ StorageCallback(const StorageCallback &) = delete;
+ StorageCallback &operator=(const StorageCallback &) = delete;
+ StorageCallback(StorageCallback &&) = delete;
+ StorageCallback &operator=(StorageCallback &&) = delete;
+ virtual ~StorageCallback() = default;
+
+ virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0;
+ virtual void pop(uint64 log_event_id) = 0;
+ virtual void close(Promise<> promise) = 0;
+ };
+
+ static unique_ptr<TQueue> create();
+
+ TQueue() = default;
+ TQueue(const TQueue &) = delete;
+ TQueue &operator=(const TQueue &) = delete;
+ TQueue(TQueue &&) = delete;
+ TQueue &operator=(TQueue &&) = delete;
+
+ virtual ~TQueue() = default;
+
+ virtual void set_callback(unique_ptr<StorageCallback> callback) = 0;
+ virtual unique_ptr<StorageCallback> extract_callback() = 0;
+
+ virtual bool do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
+
+ virtual Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) = 0;
+
+ virtual void forget(QueueId queue_id, EventId event_id) = 0;
+
+ virtual void clear(QueueId queue_id, size_t keep_count) = 0;
+
+ virtual EventId get_head(QueueId queue_id) const = 0;
+ virtual EventId get_tail(QueueId queue_id) const = 0;
+
+ virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,
+ MutableSpan<Event> &result_events) = 0;
+
+ virtual size_t get_size(QueueId queue_id) const = 0;
+
+ // returns number of deleted events and whether garbage collection was completed
+ virtual std::pair<int64, bool> run_gc(int32 unix_time_now) = 0;
+ virtual void close(Promise<> promise) = 0;
+};
+
+StringBuilder &operator<<(StringBuilder &string_builder, TQueue::EventId id);
+
+struct BinlogEvent;
+
+template <class BinlogT>
+class TQueueBinlog final : public TQueue::StorageCallback {
+ public:
+ uint64 push(QueueId queue_id, const RawEvent &event) final;
+ void pop(uint64 log_event_id) final;
+ Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT;
+
+ void set_binlog(std::shared_ptr<BinlogT> binlog) {
+ binlog_ = std::move(binlog);
+ }
+ void close(Promise<> promise) final;
+
+ private:
+ std::shared_ptr<BinlogT> binlog_;
+ static constexpr int32 BINLOG_EVENT_TYPE = 2314;
+};
+
+class TQueueMemoryStorage final : public TQueue::StorageCallback {
+ public:
+ uint64 push(QueueId queue_id, const RawEvent &event) final;
+ void pop(uint64 log_event_id) final;
+ void replay(TQueue &q) const;
+ void close(Promise<> promise) final;
+
+ private:
+ uint64 next_log_event_id_{1};
+ std::map<uint64, std::pair<QueueId, RawEvent>> events_;
+};
+
+} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h
index 8d94d79673..5663bcdfff 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,6 +8,7 @@
#include "td/db/SeqKeyValue.h"
+#include "td/utils/HashTableUtils.h"
#include "td/utils/port/RwMutex.h"
#include "td/utils/Slice.h"
@@ -15,6 +16,7 @@
#include <utility>
namespace td {
+
class TsSeqKeyValue {
public:
using SeqNo = SeqKeyValue::SeqNo;
@@ -32,30 +34,42 @@ class TsSeqKeyValue {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.set(key, value);
}
+
std::pair<SeqNo, RwMutex::WriteLock> set_and_lock(Slice key, Slice value) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return std::make_pair(kv_.set(key, value), std::move(lock));
}
+
SeqNo erase(const string &key) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.erase(key);
}
+
std::pair<SeqNo, RwMutex::WriteLock> erase_and_lock(const string &key) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return std::make_pair(kv_.erase(key), std::move(lock));
}
- string get(const string &key) {
+
+ string get(const string &key) const {
auto lock = rw_mutex_.lock_read().move_as_ok();
return kv_.get(key);
}
+
+ bool isset(const string &key) const {
+ auto lock = rw_mutex_.lock_read().move_as_ok();
+ return kv_.isset(key);
+ }
+
size_t size() const {
return kv_.size();
}
- std::unordered_map<string, string> get_all() {
+
+ std::unordered_map<string, string, Hash<string>> get_all() const {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.get_all();
}
- // not thread safe method
+
+ // non-thread-safe method
SeqKeyValue &inner() {
return kv_;
}
@@ -65,7 +79,8 @@ class TsSeqKeyValue {
}
private:
- RwMutex rw_mutex_;
+ mutable RwMutex rw_mutex_;
SeqKeyValue kv_;
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp
index 5d76028dba..5b14eed69d 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -11,14 +11,16 @@
#include "td/utils/buffer.h"
#include "td/utils/format.h"
-#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/Clocks.h"
-#include "td/utils/port/Fd.h"
+#include "td/utils/port/FileFd.h"
#include "td/utils/port/path.h"
+#include "td/utils/port/PollFlags.h"
+#include "td/utils/port/sleep.h"
#include "td/utils/port/Stat.h"
#include "td/utils/Random.h"
#include "td/utils/ScopeGuard.h"
+#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/tl_helpers.h"
@@ -53,7 +55,7 @@ struct AesCtrEncryptionEvent {
BufferSlice iv_;
BufferSlice key_hash_;
- BufferSlice generate_key(const DbKey &db_key) {
+ BufferSlice generate_key(const DbKey &db_key) const {
CHECK(!db_key.is_empty());
BufferSlice key(key_size());
size_t iteration_count = kdf_iteration_count();
@@ -63,7 +65,8 @@ struct AesCtrEncryptionEvent {
pbkdf2_sha256(db_key.data(), key_salt_.as_slice(), narrow_cast<int>(iteration_count), key.as_slice());
return key;
}
- BufferSlice generate_hash(Slice key) {
+
+ static BufferSlice generate_hash(Slice key) {
BufferSlice hash(hash_size());
hmac_sha256(key, "cucumbers everywhere", hash.as_slice());
return hash;
@@ -91,18 +94,23 @@ struct AesCtrEncryptionEvent {
class BinlogReader {
public:
- BinlogReader() = default;
explicit BinlogReader(ChainBufferReader *input) : input_(input) {
}
- void set_input(ChainBufferReader *input) {
+ void set_input(ChainBufferReader *input, bool is_encrypted, int64 expected_size) {
input_ = input;
+ is_encrypted_ = is_encrypted;
+ expected_size_ = expected_size;
+ }
+
+ ChainBufferReader *input() {
+ return input_;
}
- int64 offset() {
+ int64 offset() const {
return offset_;
}
Result<size_t> read_next(BinlogEvent *event) {
- if (state_ == ReadLength) {
+ if (state_ == State::ReadLength) {
if (input_->size() < 4) {
return 4;
}
@@ -112,35 +120,52 @@ class BinlogReader {
it.advance(4, MutableSlice(buf, 4));
size_ = static_cast<size_t>(TlParser(Slice(buf, 4)).fetch_int());
- if (size_ > MAX_EVENT_SIZE) {
+ if (size_ > BinlogEvent::MAX_SIZE) {
return Status::Error(PSLICE() << "Too big event " << tag("size", size_));
}
- if (size_ < MIN_EVENT_SIZE) {
+ if (size_ < BinlogEvent::MIN_SIZE) {
return Status::Error(PSLICE() << "Too small event " << tag("size", size_));
}
- state_ = ReadEvent;
+ if (size_ % 4 != 0) {
+ return Status::Error(-2, PSLICE() << "Event of size " << size_ << " at offset " << offset() << " out of "
+ << expected_size_ << ' ' << tag("is_encrypted", is_encrypted_)
+ << format::as_hex_dump<4>(Slice(input_->prepare_read().truncate(28))));
+ }
+ state_ = State::ReadEvent;
}
if (input_->size() < size_) {
return size_;
}
+ event->debug_info_ = BinlogDebugInfo{__FILE__, __LINE__};
TRY_STATUS(event->init(input_->cut_head(size_).move_as_buffer_slice()));
offset_ += size_;
event->offset_ = offset_;
- state_ = ReadLength;
+ state_ = State::ReadLength;
return 0;
}
private:
ChainBufferReader *input_;
- enum { ReadLength, ReadEvent } state_ = ReadLength;
+ enum class State { ReadLength, ReadEvent };
+ State state_ = State::ReadLength;
size_t size_{0};
int64 offset_{0};
+ int64 expected_size_{0};
+ bool is_encrypted_{false};
};
+
+static int64 file_size(CSlice path) {
+ auto r_stat = stat(path);
+ if (r_stat.is_error()) {
+ return 0;
+ }
+ return r_stat.ok().size_;
+}
} // namespace detail
-bool Binlog::IGNORE_ERASE_HACK = false;
+int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8;
Binlog::Binlog() = default;
@@ -148,9 +173,9 @@ Binlog::~Binlog() {
close().ignore();
}
-Result<FileFd> Binlog::open_binlog(CSlice path, int32 flags) {
+Result<FileFd> Binlog::open_binlog(const string &path, int32 flags) {
TRY_RESULT(fd, FileFd::open(path, flags));
- TRY_STATUS(fd.lock(FileFd::LockFlags::Write, 100));
+ TRY_STATUS(fd.lock(FileFd::LockFlags::Write, path, 100));
return std::move(fd);
}
@@ -161,9 +186,9 @@ Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey o
db_key_ = std::move(db_key);
old_db_key_ = std::move(old_db_key);
- processor_ = std::make_unique<detail::BinlogEventsProcessor>();
+ processor_ = make_unique<detail::BinlogEventsProcessor>();
// Turn off BinlogEventsBuffer
- // events_buffer_ = std::make_unique<detail::BinlogEventsBuffer>();
+ // events_buffer_ = make_unique<detail::BinlogEventsBuffer>();
// try to restore binlog from regenerated version
if (stat(path).is_error()) {
@@ -200,6 +225,10 @@ Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey o
}
void Binlog::add_event(BinlogEvent &&event) {
+ if (event.size_ % 4 != 0) {
+ LOG(FATAL) << "Trying to add event with bad size " << event.public_to_string();
+ }
+
if (!events_buffer_) {
do_add_event(std::move(event));
} else {
@@ -215,7 +244,7 @@ void Binlog::add_event(BinlogEvent &&event) {
auto need_reindex = [&](int64 min_size, int rate) {
return fd_size > min_size && fd_size / rate > processor_->total_raw_events_size();
};
- if (need_reindex(100000, 5) || need_reindex(500000, 2)) {
+ if (need_reindex(50000, 5) || need_reindex(100000, 4) || need_reindex(300000, 3) || need_reindex(500000, 2)) {
LOG(INFO) << tag("fd_size", format::as_size(fd_size))
<< tag("total events size", format::as_size(processor_->total_raw_events_size()));
do_reindex();
@@ -254,20 +283,25 @@ Status Binlog::close(bool need_sync) {
if (fd_.empty()) {
return Status::OK();
}
- SCOPE_EXIT {
- path_ = "";
- info_.is_opened = false;
- fd_.close();
- need_sync_ = false;
- };
if (need_sync) {
sync();
} else {
flush();
}
+
+ fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure();
+ fd_.close();
+ path_.clear();
+ info_.is_opened = false;
+ need_sync_ = false;
return Status::OK();
}
+void Binlog::close(Promise<> promise) {
+ TRY_STATUS_PROMISE(promise, close());
+ promise.set_value({});
+}
+
void Binlog::change_key(DbKey new_db_key) {
db_key_ = std::move(new_db_key);
aes_ctr_key_salt_ = BufferSlice();
@@ -280,18 +314,24 @@ Status Binlog::close_and_destroy() {
destroy(path).ignore();
return close_status;
}
+
Status Binlog::destroy(Slice path) {
+ unlink(PSLICE() << path << ".new").ignore(); // delete regenerated version first to avoid it becoming main version
unlink(PSLICE() << path).ignore();
- unlink(PSLICE() << path << ".new").ignore();
return Status::OK();
}
void Binlog::do_event(BinlogEvent &&event) {
- fd_events_++;
- fd_size_ += event.raw_event_.size();
+ auto event_size = event.raw_event_.size();
if (state_ == State::Run || state_ == State::Reindex) {
- VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ") << event;
+ auto validate_status = event.validate();
+ if (validate_status.is_error()) {
+ LOG(FATAL) << "Failed to validate binlog event " << validate_status << " "
+ << format::as_hex_dump<4>(Slice(event.raw_event_.as_slice().truncate(28)));
+ }
+ VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ")
+ << event.public_to_string();
switch (encryption_type_) {
case EncryptionType::None: {
buffer_writer_.append(event.raw_event_.clone());
@@ -311,16 +351,18 @@ void Binlog::do_event(BinlogEvent &&event) {
BufferSlice key;
if (aes_ctr_key_salt_.as_slice() == encryption_event.key_salt_.as_slice()) {
- key = BufferSlice(Slice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw)));
+ key = BufferSlice(as_slice(aes_ctr_key_));
} else if (!db_key_.is_empty()) {
key = encryption_event.generate_key(db_key_);
}
- if (encryption_event.generate_hash(key.as_slice()).as_slice() != encryption_event.key_hash_.as_slice()) {
+ if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() !=
+ encryption_event.key_hash_.as_slice()) {
CHECK(state_ == State::Load);
if (!old_db_key_.is_empty()) {
key = encryption_event.generate_key(old_db_key_);
- if (encryption_event.generate_hash(key.as_slice()).as_slice() != encryption_event.key_hash_.as_slice()) {
+ if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() !=
+ encryption_event.key_hash_.as_slice()) {
info_.wrong_password = true;
}
} else {
@@ -344,13 +386,31 @@ void Binlog::do_event(BinlogEvent &&event) {
update_write_encryption();
//LOG(INFO) << format::cond(state_ == State::Run, "Run", "Reindex") << ": init encryption";
}
- return;
}
}
if (state_ != State::Reindex) {
- processor_->add_event(std::move(event));
+ auto status = processor_->add_event(std::move(event));
+ if (status.is_error()) {
+ auto old_size = detail::file_size(path_);
+ auto data = debug_get_binlog_data(fd_size_, old_size);
+ if (state_ == State::Load) {
+ fd_.seek(fd_size_).ensure();
+ fd_.truncate_to_current_position(fd_size_).ensure();
+
+ if (data.empty()) {
+ return;
+ }
+ }
+
+ LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << fd_size_
+ << " in state " << static_cast<int32>(state_) << " due to error: " << status << " after reading "
+ << data;
+ }
}
+
+ fd_events_++;
+ fd_size_ += event_size;
}
void Binlog::sync() {
@@ -396,7 +456,9 @@ void Binlog::update_read_encryption() {
CHECK(binlog_reader_ptr_);
switch (encryption_type_) {
case EncryptionType::None: {
- binlog_reader_ptr_->set_input(&buffer_reader_);
+ auto r_file_size = fd_.get_size();
+ r_file_size.ensure();
+ binlog_reader_ptr_->set_input(&buffer_reader_, false, r_file_size.ok());
byte_flow_flag_ = false;
break;
}
@@ -407,7 +469,9 @@ void Binlog::update_read_encryption() {
byte_flow_sink_ = ByteFlowSink();
byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_;
byte_flow_flag_ = true;
- binlog_reader_ptr_->set_input(byte_flow_sink_.get_output());
+ auto r_file_size = fd_.get_size();
+ r_file_size.ensure();
+ binlog_reader_ptr_->set_input(byte_flow_sink_.get_output(), true, r_file_size.ok());
break;
}
}
@@ -439,67 +503,70 @@ Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callb
buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
fd_.set_input_writer(&buffer_writer_);
- detail::BinlogReader reader;
+ detail::BinlogReader reader{nullptr};
binlog_reader_ptr_ = &reader;
update_read_encryption();
- bool ready_flag = false;
- fd_.update_flags(Fd::Flag::Read);
+ fd_.get_poll_info().add_flags(PollFlags::Read());
info_.wrong_password = false;
while (true) {
BinlogEvent event;
auto r_need_size = reader.read_next(&event);
if (r_need_size.is_error()) {
+ if (r_need_size.error().code() == -2) {
+ auto old_size = detail::file_size(path_);
+ auto offset = reader.offset();
+ auto data = debug_get_binlog_data(offset, old_size);
+ fd_.seek(offset).ensure();
+ fd_.truncate_to_current_position(offset).ensure();
+ if (data.empty()) {
+ break;
+ }
+ LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << offset
+ << " due to error: " << r_need_size.error() << " after reading " << data;
+ }
LOG(ERROR) << r_need_size.error();
break;
}
auto need_size = r_need_size.move_as_ok();
- // LOG(ERROR) << "need size = " << need_size;
+ // LOG(ERROR) << "Need size = " << need_size;
if (need_size == 0) {
- if (IGNORE_ERASE_HACK && event.type_ == BinlogEvent::ServiceTypes::Empty &&
- (event.flags_ & BinlogEvent::Flags::Rewrite) != 0) {
- // skip erase
- } else {
- if (debug_callback) {
- debug_callback(event);
- }
- do_add_event(std::move(event));
- if (info_.wrong_password) {
- return Status::OK();
- }
+ if (debug_callback) {
+ debug_callback(event);
}
- ready_flag = false;
- } else {
- // TODO(now): fix bug
- if (ready_flag) {
- break;
+ do_add_event(std::move(event));
+ if (info_.wrong_password) {
+ return Status::OK();
}
+ } else {
TRY_STATUS(fd_.flush_read(max(need_size, static_cast<size_t>(4096))));
buffer_reader_.sync_with_writer();
if (byte_flow_flag_) {
byte_flow_source_.wakeup();
}
- ready_flag = true;
+ if (reader.input()->size() < need_size) {
+ break;
+ }
}
}
auto offset = processor_->offset();
processor_->for_each([&](BinlogEvent &event) {
- VLOG(binlog) << "Replay binlog event: " << event;
+ VLOG(binlog) << "Replay binlog event: " << event.public_to_string();
if (callback) {
callback(event);
}
});
- auto fd_size = fd_.get_size();
+ TRY_RESULT(fd_size, fd_.get_size());
if (offset != fd_size) {
LOG(ERROR) << "Truncate " << tag("path", path_) << tag("old_size", fd_size) << tag("new_size", offset);
fd_.seek(offset).ensure();
fd_.truncate_to_current_position(offset).ensure();
db_key_used_ = false; // force reindex
}
- CHECK(IGNORE_ERASE_HACK || fd_size_ == offset) << fd_size << " " << fd_size_ << " " << offset;
+ LOG_CHECK(fd_size_ == offset) << fd_size << " " << fd_size_ << " " << offset;
binlog_reader_ptr_ = nullptr;
state_ = State::Run;
@@ -515,19 +582,11 @@ Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callb
return Status::OK();
}
-static int64 file_size(CSlice path) {
- auto r_stat = stat(path);
- if (r_stat.is_error()) {
- return 0;
- }
- return r_stat.ok().size_;
-}
-
void Binlog::update_encryption(Slice key, Slice iv) {
- MutableSlice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw)).copy_from(key);
+ as_slice(aes_ctr_key_).copy_from(key);
UInt128 aes_ctr_iv;
- MutableSlice(aes_ctr_iv.raw, sizeof(aes_ctr_iv.raw)).copy_from(iv);
- aes_ctr_state_.init(aes_ctr_key_, aes_ctr_iv);
+ as_slice(aes_ctr_iv).copy_from(iv);
+ aes_ctr_state_.init(as_slice(aes_ctr_key_), as_slice(aes_ctr_iv));
}
void Binlog::reset_encryption() {
@@ -550,15 +609,16 @@ void Binlog::reset_encryption() {
BufferSlice key;
if (aes_ctr_key_salt_.as_slice() == event.key_salt_.as_slice()) {
- key = BufferSlice(Slice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw)));
+ key = BufferSlice(as_slice(aes_ctr_key_));
} else {
key = event.generate_key(db_key_);
}
- event.key_hash_ = event.generate_hash(key.as_slice());
+ event.key_hash_ = EncryptionEvent::generate_hash(key.as_slice());
do_event(BinlogEvent(
- BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event))));
+ BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event)),
+ BinlogDebugInfo{__FILE__, __LINE__}));
}
void Binlog::do_reindex() {
@@ -571,7 +631,7 @@ void Binlog::do_reindex() {
};
auto start_time = Clocks::monotonic();
- auto start_size = file_size(path_);
+ auto start_size = detail::file_size(path_);
auto start_events = fd_events_;
string new_path = path_ + ".new";
@@ -581,7 +641,7 @@ void Binlog::do_reindex() {
LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error();
return;
}
- fd_.close();
+ auto old_fd = std::move(fd_); // can't close fd_ now, because it will release file lock
fd_ = BufferedFdBase<FileFd>(r_opened_file.move_as_ok());
buffer_writer_ = ChainBufferWriter();
@@ -594,27 +654,49 @@ void Binlog::do_reindex() {
fd_events_ = 0;
reset_encryption();
processor_->for_each([&](BinlogEvent &event) {
+ event.realloc();
do_event(std::move(event)); // NB: no move is actually happens
});
- need_sync_ = true; // must sync creation of the file
+ need_sync_ = start_size != 0; // must sync creation of the file if it is non-empty
sync();
// finish_reindex
auto status = unlink(path_);
LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status;
+ old_fd.close(); // now we can close old file and release the system lock
status = rename(new_path, path_);
+ FileFd::remove_local_lock(new_path); // now we can release local lock for temporary file
LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status;
auto finish_time = Clocks::monotonic();
auto finish_size = fd_size_;
auto finish_events = fd_events_;
- CHECK(fd_size_ == file_size(path_));
+ for (int left_tries = 10; left_tries > 0; left_tries--) {
+ auto r_stat = stat(path_);
+ if (r_stat.is_error()) {
+ if (left_tries != 1) {
+ usleep_for(200000 / left_tries);
+ continue;
+ }
+ LOG(FATAL) << "Failed to rename binlog of size " << fd_size_ << " to " << path_ << ": " << r_stat.error()
+ << ". Temp file size is " << detail::file_size(new_path) << ", new size " << detail::file_size(path_);
+ }
+ LOG_CHECK(fd_size_ == r_stat.ok().size_) << fd_size_ << ' ' << r_stat.ok().size_ << ' '
+ << detail::file_size(new_path) << ' ' << fd_events_ << ' ' << path_;
+ break;
+ }
- // TODO: print warning only if time or ratio is suspicious
- double ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1);
- LOG(INFO) << "regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time))
- << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size))
- << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events);
+ auto ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1);
+
+ [&](Slice msg) {
+ if (start_size > (10 << 20) || finish_time - start_time > 1) {
+ LOG(WARNING) << "Slow " << msg;
+ } else {
+ LOG(INFO) << msg;
+ }
+ }(PSLICE() << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time))
+ << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size))
+ << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events));
buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
@@ -626,4 +708,58 @@ void Binlog::do_reindex() {
update_write_encryption();
}
+string Binlog::debug_get_binlog_data(int64 begin_offset, int64 end_offset) {
+ if (begin_offset > end_offset) {
+ return "Begin offset is bigger than end_offset";
+ }
+ if (begin_offset == end_offset) {
+ return string();
+ }
+
+ static int64 MAX_DATA_LENGTH = 512;
+ if (end_offset - begin_offset > MAX_DATA_LENGTH) {
+ end_offset = begin_offset + MAX_DATA_LENGTH;
+ }
+
+ auto r_fd = FileFd::open(path_, FileFd::Flags::Read);
+ if (r_fd.is_error()) {
+ return PSTRING() << "Failed to open binlog: " << r_fd.error();
+ }
+ auto fd = r_fd.move_as_ok();
+
+ fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ignore();
+ SCOPE_EXIT {
+ fd_.lock(FileFd::LockFlags::Write, path_, 1).ensure();
+ };
+ auto expected_data_length = narrow_cast<size_t>(end_offset - begin_offset);
+ string data(expected_data_length, '\0');
+ auto r_data_size = fd.pread(data, begin_offset);
+ if (r_data_size.is_error()) {
+ return PSTRING() << "Failed to read binlog: " << r_data_size.error();
+ }
+
+ if (r_data_size.ok() < expected_data_length) {
+ data.resize(r_data_size.ok());
+ data = PSTRING() << format::as_hex_dump<4>(Slice(data)) << " | with " << expected_data_length - r_data_size.ok()
+ << " missed bytes";
+ } else {
+ if (encryption_type_ == EncryptionType::AesCtr) {
+ bool is_zero = true;
+ for (auto &c : data) {
+ if (c != '\0') {
+ is_zero = false;
+ }
+ }
+ // very often we have '\0' bytes written to disk instead of a real log event
+ // this is clearly impossible content for a real encrypted log event, so just ignore it
+ if (is_zero) {
+ return string();
+ }
+ }
+
+ data = PSTRING() << format::as_hex_dump<4>(Slice(data));
+ }
+ return data;
+}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h
index cdda868b4e..88dbacf58f 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -15,16 +15,23 @@
#include "td/utils/ByteFlow.h"
#include "td/utils/common.h"
#include "td/utils/crypto.h"
+#include "td/utils/logging.h"
#include "td/utils/port/FileFd.h"
+#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
+#include "td/utils/StorerBase.h"
+#include "td/utils/UInt.h"
#include <functional>
namespace td {
+
+extern int32 VERBOSITY_NAME(binlog);
+
struct BinlogInfo {
- bool was_created;
- uint64 last_id;
+ bool was_created{false};
+ uint64 last_id{0};
bool is_encrypted{false};
bool wrong_password{false};
bool is_opened{false};
@@ -34,12 +41,11 @@ namespace detail {
class BinlogReader;
class BinlogEventsProcessor;
class BinlogEventsBuffer;
-}; // namespace detail
+} // namespace detail
class Binlog {
public:
enum Error : int { WrongPassword = -1 };
- static bool IGNORE_ERASE_HACK;
Binlog();
Binlog(const Binlog &other) = delete;
Binlog &operator=(const Binlog &other) = delete;
@@ -67,8 +73,28 @@ class Binlog {
return fd_.empty();
}
- void add_raw_event(BufferSlice &&raw_event) {
- add_event(BinlogEvent(std::move(raw_event)));
+ uint64 add(int32 type, const Storer &storer) {
+ auto log_event_id = next_id();
+ add_raw_event(BinlogEvent::create_raw(log_event_id, type, 0, storer), {});
+ return log_event_id;
+ }
+
+ uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer) {
+ auto seq_no = next_id();
+ add_raw_event(BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer), {});
+ return seq_no;
+ }
+
+ uint64 erase(uint64 log_event_id) {
+ auto seq_no = next_id();
+ add_raw_event(BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
+ EmptyStorer()),
+ {});
+ return seq_no;
+ }
+
+ void add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) {
+ add_event(BinlogEvent(std::move(raw_event), info));
}
void add_event(BinlogEvent &&event);
@@ -81,6 +107,7 @@ class Binlog {
void change_key(DbKey new_db_key);
Status close(bool need_sync = true) TD_WARN_UNUSED_RESULT;
+ void close(Promise<> promise);
Status close_and_destroy() TD_WARN_UNUSED_RESULT;
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
@@ -96,7 +123,7 @@ class Binlog {
BufferedFdBase<FileFd> fd_;
ChainBufferWriter buffer_writer_;
ChainBufferReader buffer_reader_;
- detail::BinlogReader *binlog_reader_ptr_;
+ detail::BinlogReader *binlog_reader_ptr_ = nullptr;
BinlogInfo info_;
DbKey db_key_;
@@ -118,17 +145,15 @@ class Binlog {
uint64 fd_events_{0};
string path_;
std::vector<BinlogEvent> pending_events_;
- std::unique_ptr<detail::BinlogEventsProcessor> processor_;
- std::unique_ptr<detail::BinlogEventsBuffer> events_buffer_;
+ unique_ptr<detail::BinlogEventsProcessor> processor_;
+ unique_ptr<detail::BinlogEventsBuffer> events_buffer_;
bool in_flush_events_buffer_{false};
uint64 last_id_{0};
double need_flush_since_ = 0;
bool need_sync_{false};
enum class State { Empty, Load, Reindex, Run } state_{State::Empty};
- static constexpr uint32 MAX_EVENT_SIZE = 65536;
-
- Result<FileFd> open_binlog(CSlice path, int32 flags);
+ static Result<FileFd> open_binlog(const string &path, int32 flags);
size_t flush_events_buffer(bool force);
void do_add_event(BinlogEvent &&event);
void do_event(BinlogEvent &&event);
@@ -139,5 +164,8 @@ class Binlog {
void reset_encryption();
void update_read_encryption();
void update_write_encryption();
+
+ string debug_get_binlog_data(int64 begin_offset, int64 end_offset);
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp
index e4584e920e..09f9fc40e3 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp
@@ -1,38 +1,78 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/BinlogEvent.h"
+#include "td/utils/crypto.h"
+#include "td/utils/logging.h"
+#include "td/utils/misc.h"
#include "td/utils/tl_parsers.h"
+#include "td/utils/tl_storers.h"
namespace td {
-int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8;
Status BinlogEvent::init(BufferSlice &&raw_event, bool check_crc) {
TlParser parser(raw_event.as_slice());
size_ = parser.fetch_int();
- CHECK(size_ == raw_event.size());
+ LOG_CHECK(size_ == raw_event.size()) << size_ << " " << raw_event.size() << debug_info_;
id_ = parser.fetch_long();
type_ = parser.fetch_int();
flags_ = parser.fetch_int();
extra_ = parser.fetch_long();
- CHECK(size_ >= MIN_EVENT_SIZE);
- auto slice_data = parser.fetch_string_raw<Slice>(size_ - MIN_EVENT_SIZE);
+ CHECK(size_ >= MIN_SIZE);
+ auto slice_data = parser.fetch_string_raw<Slice>(size_ - MIN_SIZE);
data_ = MutableSlice(const_cast<char *>(slice_data.begin()), slice_data.size());
crc32_ = static_cast<uint32>(parser.fetch_int());
if (check_crc) {
- CHECK(size_ >= EVENT_TAIL_SIZE);
- auto calculated_crc = crc32(raw_event.as_slice().truncate(size_ - EVENT_TAIL_SIZE));
+ auto calculated_crc = crc32(raw_event.as_slice().substr(0, size_ - TAIL_SIZE));
if (calculated_crc != crc32_) {
return Status::Error(PSLICE() << "crc mismatch " << tag("actual", format::as_hex(calculated_crc))
- << tag("expected", format::as_hex(crc32_)));
+ << tag("expected", format::as_hex(crc32_)) << public_to_string());
}
}
raw_event_ = std::move(raw_event);
return Status::OK();
}
+Status BinlogEvent::validate() const {
+ BinlogEvent event;
+ if (raw_event_.size() < 4) {
+ return Status::Error("Too small event");
+ }
+ uint32 size = TlParser(raw_event_.as_slice().substr(0, 4)).fetch_int();
+ if (size_ != size) {
+ return Status::Error(PSLICE() << "Size of event changed: " << tag("was", size_) << tag("now", size));
+ }
+ return event.init(raw_event_.clone(), true);
+}
+
+BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) {
+ auto raw_event = BufferSlice{storer.size() + MIN_SIZE};
+
+ TlStorerUnsafe tl_storer(raw_event.as_slice().ubegin());
+ tl_storer.store_int(narrow_cast<int32>(raw_event.size()));
+ tl_storer.store_long(id);
+ tl_storer.store_int(type);
+ tl_storer.store_int(flags);
+ tl_storer.store_long(0);
+
+ CHECK(tl_storer.get_buf() == raw_event.as_slice().ubegin() + HEADER_SIZE);
+ tl_storer.store_storer(storer);
+
+ CHECK(tl_storer.get_buf() == raw_event.as_slice().uend() - TAIL_SIZE);
+ tl_storer.store_int(crc32(raw_event.as_slice().truncate(raw_event.size() - TAIL_SIZE)));
+
+ return raw_event;
+}
+
+void BinlogEvent::realloc() {
+ auto data_offset = data_.begin() - raw_event_.as_slice().begin();
+ auto data_size = data_.size();
+ raw_event_ = raw_event_.copy();
+ data_ = raw_event_.as_slice().substr(data_offset, data_size);
+}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h
index 1874543dff..8fab08da98 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,17 +8,16 @@
#include "td/utils/buffer.h"
#include "td/utils/common.h"
-#include "td/utils/crypto.h"
#include "td/utils/format.h"
-#include "td/utils/logging.h"
-#include "td/utils/misc.h"
#include "td/utils/Slice.h"
+#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/Storer.h"
+#include "td/utils/StorerBase.h"
#include "td/utils/StringBuilder.h"
-#include "td/utils/tl_storers.h"
namespace td {
+
struct EmptyStorerImpl {
EmptyStorerImpl() {
}
@@ -33,15 +32,27 @@ inline auto EmptyStorer() {
return create_default_storer(impl);
}
-static constexpr size_t MAX_EVENT_SIZE = 1 << 24;
-static constexpr size_t EVENT_HEADER_SIZE = 4 + 8 + 4 + 4 + 8;
-static constexpr size_t EVENT_TAIL_SIZE = 4;
-static constexpr size_t MIN_EVENT_SIZE = EVENT_HEADER_SIZE + EVENT_TAIL_SIZE;
+struct BinlogDebugInfo {
+ BinlogDebugInfo() = default;
+ BinlogDebugInfo(const char *file, int line) : file(file), line(line) {
+ }
+ const char *file{""};
+ int line{0};
+};
-extern int32 VERBOSITY_NAME(binlog);
+inline StringBuilder &operator<<(StringBuilder &sb, const BinlogDebugInfo &info) {
+ if (info.line == 0) {
+ return sb;
+ }
+ return sb << "[" << info.file << ":" << info.line << "]";
+}
-// TODO: smaller BinlogEvent
struct BinlogEvent {
+ static constexpr size_t MAX_SIZE = 1 << 24;
+ static constexpr size_t HEADER_SIZE = 4 + 8 + 4 + 4 + 8;
+ static constexpr size_t TAIL_SIZE = 4;
+ static constexpr size_t MIN_SIZE = HEADER_SIZE + TAIL_SIZE;
+
int64 offset_;
uint32 size_;
@@ -54,6 +65,8 @@ struct BinlogEvent {
BufferSlice raw_event_;
+ BinlogDebugInfo debug_info_;
+
enum ServiceTypes { Header = -1, Empty = -2, AesCtrEncryption = -3, NoEncryption = -4 };
enum Flags { Rewrite = 1, Partial = 2 };
@@ -65,6 +78,7 @@ struct BinlogEvent {
}
BinlogEvent clone() const {
BinlogEvent result;
+ result.debug_info_ = BinlogDebugInfo{__FILE__, __LINE__};
result.init(raw_event_.clone()).ensure();
return result;
}
@@ -74,36 +88,32 @@ struct BinlogEvent {
}
BinlogEvent() = default;
- explicit BinlogEvent(BufferSlice &&raw_event) {
+ //explicit BinlogEvent(BufferSlice &&raw_event) {
+ //init(std::move(raw_event), false).ensure();
+ //}
+ BinlogEvent(BufferSlice &&raw_event, BinlogDebugInfo info) {
+ debug_info_ = info;
init(std::move(raw_event), false).ensure();
}
+
Status init(BufferSlice &&raw_event, bool check_crc = true) TD_WARN_UNUSED_RESULT;
static BufferSlice create_raw(uint64 id, int32 type, int32 flags, const Storer &storer);
+
+ std::string public_to_string() const {
+ return PSTRING() << "LogEvent[" << tag("id", format::as_hex(id_)) << tag("type", type_) << tag("flags", flags_)
+ << tag("data", data_.size()) << "]" << debug_info_;
+ }
+
+ Status validate() const;
+
+ void realloc();
};
inline StringBuilder &operator<<(StringBuilder &sb, const BinlogEvent &event) {
return sb << "LogEvent[" << tag("id", format::as_hex(event.id_)) << tag("type", event.type_)
- << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]";
+ << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]"
+ << event.debug_info_;
}
-// Implementation
-inline BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) {
- auto raw_event = BufferSlice{storer.size() + MIN_EVENT_SIZE};
-
- TlStorerUnsafe tl_storer(raw_event.as_slice().begin());
- tl_storer.store_int(narrow_cast<int32>(raw_event.size()));
- tl_storer.store_long(id);
- tl_storer.store_int(type);
- tl_storer.store_int(flags);
- tl_storer.store_long(0);
-
- CHECK(tl_storer.get_buf() == raw_event.as_slice().begin() + EVENT_HEADER_SIZE);
- tl_storer.store_storer(storer);
-
- CHECK(tl_storer.get_buf() == raw_event.as_slice().end() - EVENT_TAIL_SIZE);
- tl_storer.store_int(::td::crc32(raw_event.as_slice().truncate(raw_event.size() - EVENT_TAIL_SIZE)));
-
- return raw_event;
-}
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h
index 1224ac4ac2..66a65e060d 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h
@@ -1,45 +1,32 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
-#include "td/actor/PromiseFuture.h"
-
#include "td/db/binlog/BinlogEvent.h"
+#include "td/db/binlog/BinlogInterface.h"
#include "td/utils/common.h"
-#include "td/utils/Storer.h"
+#include "td/utils/Promise.h"
+#include "td/utils/StorerBase.h"
namespace td {
-class BinlogHelper {
- public:
- template <class BinlogT, class StorerT>
- static uint64 add(const BinlogT &binlog_ptr, int32 type, const StorerT &storer, Promise<> promise = Promise<>()) {
- auto logevent_id = binlog_ptr->next_id();
- binlog_ptr->add_raw_event(logevent_id, BinlogEvent::create_raw(logevent_id, type, 0, storer), std::move(promise));
- return logevent_id;
- }
- template <class BinlogT, class StorerT>
- static uint64 rewrite(const BinlogT &binlog_ptr, uint64 logevent_id, int32 type, const StorerT &storer,
- Promise<> promise = Promise<>()) {
- auto seq_no = binlog_ptr->next_id();
- binlog_ptr->add_raw_event(seq_no, BinlogEvent::create_raw(logevent_id, type, BinlogEvent::Flags::Rewrite, storer),
- std::move(promise));
- return seq_no;
- }
+inline uint64 binlog_add(BinlogInterface *binlog_ptr, int32 type, const Storer &storer,
+ Promise<> promise = Promise<>()) {
+ return binlog_ptr->add(type, storer, std::move(promise));
+}
+
+inline uint64 binlog_rewrite(BinlogInterface *binlog_ptr, uint64 log_event_id, int32 type, const Storer &storer,
+ Promise<> promise = Promise<>()) {
+ return binlog_ptr->rewrite(log_event_id, type, storer, std::move(promise));
+}
+
+inline uint64 binlog_erase(BinlogInterface *binlog_ptr, uint64 log_event_id, Promise<> promise = Promise<>()) {
+ return binlog_ptr->erase(log_event_id, std::move(promise));
+}
- template <class BinlogT>
- static uint64 erase(const BinlogT &binlog_ptr, uint64 logevent_id, Promise<> promise = Promise<>()) {
- auto seq_no = binlog_ptr->next_id();
- binlog_ptr->add_raw_event(seq_no,
- BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty,
- BinlogEvent::Flags::Rewrite, EmptyStorer()),
- std::move(promise));
- return seq_no;
- }
-};
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h
index 2360f3c480..b3ec7b119f 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h
@@ -1,20 +1,21 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
-#include "td/actor/PromiseFuture.h"
-
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/DbKey.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
+#include "td/utils/Promise.h"
+#include "td/utils/StorerBase.h"
namespace td {
+
class BinlogInterface {
public:
BinlogInterface() = default;
@@ -30,15 +31,41 @@ class BinlogInterface {
void close_and_destroy(Promise<> promise = {}) {
close_and_destroy_impl(std::move(promise));
}
+ void add_raw_event(BinlogDebugInfo info, uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
+ add_raw_event_impl(id, std::move(raw_event), std::move(promise), info);
+ }
void add_raw_event(uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
- add_raw_event_impl(id, std::move(raw_event), std::move(promise));
+ add_raw_event_impl(id, std::move(raw_event), std::move(promise), {});
}
void lazy_sync(Promise<> promise = Promise<>()) {
- add_raw_event_impl(next_id(), BufferSlice(), std::move(promise));
+ add_raw_event_impl(next_id(), BufferSlice(), std::move(promise), {});
}
+
+ uint64 add(int32 type, const Storer &storer, Promise<> promise = Promise<>()) {
+ auto log_event_id = next_id();
+ add_raw_event_impl(log_event_id, BinlogEvent::create_raw(log_event_id, type, 0, storer), std::move(promise), {});
+ return log_event_id;
+ }
+
+ uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) {
+ auto seq_no = next_id();
+ add_raw_event_impl(seq_no, BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer),
+ std::move(promise), {});
+ return seq_no;
+ }
+
+ uint64 erase(uint64 log_event_id, Promise<> promise = Promise<>()) {
+ auto seq_no = next_id();
+ add_raw_event_impl(seq_no,
+ BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty,
+ BinlogEvent::Flags::Rewrite, EmptyStorer()),
+ std::move(promise), {});
+ return seq_no;
+ }
+
virtual void force_sync(Promise<> promise) = 0;
virtual void force_flush() = 0;
- virtual void change_key(DbKey db_key, Promise<> promise = Promise<>()) = 0;
+ virtual void change_key(DbKey db_key, Promise<> promise) = 0;
virtual uint64 next_id() = 0;
virtual uint64 next_id(int32 shift) = 0;
@@ -46,6 +73,7 @@ class BinlogInterface {
protected:
virtual void close_impl(Promise<> promise) = 0;
virtual void close_and_destroy_impl(Promise<> promise) = 0;
- virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) = 0;
+ virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0;
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp
index aaf07f2967..16a4e2c010 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,37 +8,41 @@
#include "td/utils/logging.h"
#include "td/utils/OrderedEventsProcessor.h"
+#include "td/utils/SliceBuilder.h"
#include "td/utils/Time.h"
#include <map>
namespace td {
namespace detail {
-class BinlogActor : public Actor {
+class BinlogActor final : public Actor {
public:
- BinlogActor(std::unique_ptr<Binlog> binlog, uint64 seq_no) : binlog_(std::move(binlog)), processor_(seq_no) {
+ BinlogActor(unique_ptr<Binlog> binlog, uint64 seq_no) : binlog_(std::move(binlog)), processor_(seq_no) {
}
void close(Promise<> promise) {
binlog_->close().ensure();
- promise.set_value(Unit());
- LOG(INFO) << "close: done";
+ LOG(INFO) << "Finished to close binlog";
stop();
+
+ promise.set_value(Unit()); // setting promise can complete closing and destroy the current actor context
}
void close_and_destroy(Promise<> promise) {
binlog_->close_and_destroy().ensure();
- promise.set_value(Unit());
- LOG(INFO) << "close_and_destroy: done";
+ LOG(INFO) << "Finished to destroy binlog";
stop();
+
+ promise.set_value(Unit()); // setting promise can complete closing and destroy the current actor context
}
struct Event {
BufferSlice raw_event;
Promise<> sync_promise;
+ BinlogDebugInfo debug_info;
};
- void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise) {
- processor_.add(seq_no, Event{std::move(raw_event), std::move(promise)}, [&](uint64 id, Event &&event) {
+ void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) {
+ processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 id, Event &&event) {
if (!event.raw_event.empty()) {
- do_add_raw_event(std::move(event.raw_event));
+ do_add_raw_event(std::move(event.raw_event), event.debug_info);
}
do_lazy_sync(std::move(event.sync_promise));
});
@@ -67,7 +71,7 @@ class BinlogActor : public Actor {
}
private:
- std::unique_ptr<Binlog> binlog_;
+ unique_ptr<Binlog> binlog_;
OrderedEventsProcessor<Event> processor_;
@@ -78,7 +82,7 @@ class BinlogActor : public Actor {
bool flush_flag_ = false;
double wakeup_at_ = 0;
- static constexpr int32 FLUSH_TIMEOUT = 1; // 1s
+ static constexpr double FLUSH_TIMEOUT = 0.001; // 1ms
void wakeup_after(double after) {
auto now = Time::now_cached();
@@ -92,8 +96,8 @@ class BinlogActor : public Actor {
}
}
- void do_add_raw_event(BufferSlice &&raw_event) {
- binlog_->add_raw_event(std::move(raw_event));
+ void do_add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) {
+ binlog_->add_raw_event(std::move(raw_event), info);
}
void try_flush() {
@@ -138,7 +142,7 @@ class BinlogActor : public Actor {
}
}
- void timeout_expired() override {
+ void timeout_expired() final {
bool need_sync = lazy_sync_flag_ || force_sync_flag_;
lazy_sync_flag_ = false;
force_sync_flag_ = false;
@@ -148,10 +152,7 @@ class BinlogActor : public Actor {
if (need_sync) {
binlog_->sync();
// LOG(ERROR) << "BINLOG SYNC";
- for (auto &promise : sync_promises_) {
- promise.set_value(Unit());
- }
- sync_promises_.clear();
+ set_promises(sync_promises_);
} else if (need_flush) {
try_flush();
// LOG(ERROR) << "BINLOG FLUSH";
@@ -162,24 +163,24 @@ class BinlogActor : public Actor {
ConcurrentBinlog::ConcurrentBinlog() = default;
ConcurrentBinlog::~ConcurrentBinlog() = default;
-ConcurrentBinlog::ConcurrentBinlog(std::unique_ptr<Binlog> binlog, int scheduler_id) {
+ConcurrentBinlog::ConcurrentBinlog(unique_ptr<Binlog> binlog, int scheduler_id) {
init_impl(std::move(binlog), scheduler_id);
}
Result<BinlogInfo> ConcurrentBinlog::init(string path, const Callback &callback, DbKey db_key, DbKey old_db_key,
int scheduler_id) {
- auto binlog = std::make_unique<Binlog>();
+ auto binlog = make_unique<Binlog>();
TRY_STATUS(binlog->init(std::move(path), callback, std::move(db_key), std::move(old_db_key)));
auto info = binlog->get_info();
init_impl(std::move(binlog), scheduler_id);
return info;
}
-void ConcurrentBinlog::init_impl(std::unique_ptr<Binlog> binlog, int32 scheduler_id) {
+void ConcurrentBinlog::init_impl(unique_ptr<Binlog> binlog, int32 scheduler_id) {
path_ = binlog->get_path().str();
last_id_ = binlog->peek_next_id();
- binlog_actor_ =
- create_actor_on_scheduler<detail::BinlogActor>("Binlog " + path_, scheduler_id, std::move(binlog), last_id_);
+ binlog_actor_ = create_actor_on_scheduler<detail::BinlogActor>(PSLICE() << "Binlog " << path_, scheduler_id,
+ std::move(binlog), last_id_);
}
void ConcurrentBinlog::close_impl(Promise<> promise) {
@@ -188,8 +189,8 @@ void ConcurrentBinlog::close_impl(Promise<> promise) {
void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) {
send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise));
}
-void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) {
- send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise));
+void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) {
+ send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise), info);
}
void ConcurrentBinlog::force_sync(Promise<> promise) {
send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise));
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h
index ce77a78a84..ad6c9bc5c3 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h
@@ -1,19 +1,20 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
-#include "td/actor/actor.h"
-#include "td/actor/PromiseFuture.h"
-
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogInterface.h"
+#include "td/db/DbKey.h"
+
+#include "td/actor/actor.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
+#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
@@ -26,28 +27,28 @@ namespace detail {
class BinlogActor;
} // namespace detail
-class ConcurrentBinlog : public BinlogInterface {
+class ConcurrentBinlog final : public BinlogInterface {
public:
using Callback = std::function<void(const BinlogEvent &)>;
Result<BinlogInfo> init(string path, const Callback &callback, DbKey db_key = DbKey::empty(),
DbKey old_db_key = DbKey::empty(), int scheduler_id = -1) TD_WARN_UNUSED_RESULT;
ConcurrentBinlog();
- explicit ConcurrentBinlog(std::unique_ptr<Binlog> binlog, int scheduler_id = -1);
+ explicit ConcurrentBinlog(unique_ptr<Binlog> binlog, int scheduler_id = -1);
ConcurrentBinlog(const ConcurrentBinlog &other) = delete;
ConcurrentBinlog &operator=(const ConcurrentBinlog &other) = delete;
ConcurrentBinlog(ConcurrentBinlog &&other) = delete;
ConcurrentBinlog &operator=(ConcurrentBinlog &&other) = delete;
- ~ConcurrentBinlog() override;
+ ~ConcurrentBinlog() final;
- void force_sync(Promise<> promise) override;
- void force_flush() override;
- void change_key(DbKey db_key, Promise<> promise) override;
+ void force_sync(Promise<> promise) final;
+ void force_flush() final;
+ void change_key(DbKey db_key, Promise<> promise) final;
- uint64 next_id() override {
+ uint64 next_id() final {
return last_id_.fetch_add(1, std::memory_order_relaxed);
}
- uint64 next_id(int32 shift) override {
+ uint64 next_id(int32 shift) final {
return last_id_.fetch_add(shift, std::memory_order_relaxed);
}
@@ -56,13 +57,14 @@ class ConcurrentBinlog : public BinlogInterface {
}
private:
- void init_impl(std::unique_ptr<Binlog> binlog, int scheduler_id);
- void close_impl(Promise<> promise) override;
- void close_and_destroy_impl(Promise<> promise) override;
- void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) override;
+ void init_impl(unique_ptr<Binlog> binlog, int scheduler_id);
+ void close_impl(Promise<> promise) final;
+ void close_and_destroy_impl(Promise<> promise) final;
+ void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final;
ActorOwn<detail::BinlogActor> binlog_actor_;
string path_;
- std::atomic<uint64> last_id_;
+ std::atomic<uint64> last_id_{0};
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp
index a8b8bf9e1b..f3984062fc 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp
@@ -1,52 +1,156 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/Binlog.h"
+#include "td/db/DbKey.h"
+
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
+#include "td/utils/misc.h"
+#include "td/utils/port/Stat.h"
+#include "td/utils/Slice.h"
+#include "td/utils/SliceBuilder.h"
+#include "td/utils/StringBuilder.h"
+#include "td/utils/tl_parsers.h"
-#include <cstdio>
#include <map>
+struct Trie {
+ Trie() {
+ nodes_.resize(1);
+ }
+
+ void add(td::Slice value) {
+ do_add(0, PSLICE() << value << '\0');
+ }
+
+ void dump() {
+ if (nodes_[0].sum == 0) { // division by zero
+ return;
+ }
+ LOG(PLAIN) << "TOTAL: " << nodes_[0].sum;
+ do_dump("", 0);
+ }
+
+ private:
+ struct FullNode {
+ int next[256] = {};
+ int sum = 0;
+ };
+ td::vector<FullNode> nodes_;
+
+ void do_add(int id, td::Slice value) {
+ nodes_[id].sum++;
+ if (value.empty()) {
+ return;
+ }
+
+ auto c = static_cast<td::uint8>(value[0]);
+ auto next_id = nodes_[id].next[c];
+ if (next_id == 0) {
+ next_id = static_cast<int>(nodes_.size());
+ nodes_.emplace_back();
+ nodes_[id].next[c] = next_id;
+ }
+ do_add(next_id, value.substr(1));
+ }
+
+ void do_dump(td::string path, int v) {
+ bool is_word_end = !path.empty() && path.back() == '\0';
+
+ bool need_stop = false;
+ int next_count = 0;
+ for (int c = 0; c < 256; c++) {
+ if (nodes_[v].next[c] != 0) {
+ need_stop |= c >= 128 || !(td::is_alpha(static_cast<char>(c)) || c == '.' || c == '_');
+ next_count++;
+ }
+ }
+ need_stop |= next_count == 0 || (next_count >= 2 && nodes_[v].sum <= nodes_[0].sum / 100);
+
+ if (is_word_end || need_stop) {
+ if (is_word_end) {
+ path.pop_back();
+ } else if (next_count != 1 || nodes_[v].next[0] == 0) {
+ path.push_back('*');
+ }
+ LOG(PLAIN) << nodes_[v].sum << " " << td::StringBuilder::FixedDouble(nodes_[v].sum * 100.0 / nodes_[0].sum, 2)
+ << "% [" << td::format::escaped(path) << "]";
+ return;
+ }
+ for (int c = 0; c < 256; c++) {
+ auto next_id = nodes_[v].next[c];
+ if (next_id == 0) {
+ continue;
+ }
+ do_dump(path + static_cast<char>(c), next_id);
+ }
+ }
+};
+
+enum Magic { ConfigPmcMagic = 0x1f18, BinlogPmcMagic = 0x4327 };
+
int main(int argc, char *argv[]) {
if (argc < 2) {
- std::fprintf(stderr, "Usage: binlog_dump <binlog_file_name>\n");
+ LOG(PLAIN) << "Usage: binlog_dump <binlog_file_name>";
+ return 1;
+ }
+ td::string binlog_file_name = argv[1];
+ auto r_stat = td::stat(binlog_file_name);
+ if (r_stat.is_error() || r_stat.ok().size_ == 0 || !r_stat.ok().is_reg_) {
+ LOG(PLAIN) << "Wrong binlog file name specified";
+ LOG(PLAIN) << "Usage: binlog_dump <binlog_file_name>";
return 1;
}
struct Info {
std::size_t full_size = 0;
std::size_t compressed_size = 0;
+ Trie trie;
+ Trie compressed_trie;
};
std::map<td::uint64, Info> info;
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
td::Binlog binlog;
binlog
- .init(argv[1],
- [&](auto &event) {
- info[0].compressed_size += event.raw_event_.size();
- info[event.type_].compressed_size += event.raw_event_.size();
- },
- td::DbKey::empty(), td::DbKey::empty(), -1,
- [&](auto &event) mutable {
- info[0].full_size += event.raw_event_.size();
- info[event.type_].full_size += event.raw_event_.size();
- LOG(PLAIN) << "LogEvent[" << td::tag("id", td::format::as_hex(event.id_)) << td::tag("type", event.type_)
- << td::tag("flags", event.flags_) << td::tag("data", td::format::escaped(event.data_))
- << "]\n";
- })
+ .init(
+ binlog_file_name,
+ [&](auto &event) {
+ info[0].compressed_size += event.raw_event_.size();
+ info[event.type_].compressed_size += event.raw_event_.size();
+ if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) {
+ auto key = td::TlParser(event.data_).fetch_string<td::Slice>();
+ info[event.type_].compressed_trie.add(key);
+ }
+ },
+ td::DbKey::raw_key("cucumber"), td::DbKey::empty(), -1,
+ [&](auto &event) mutable {
+ info[0].full_size += event.raw_event_.size();
+ info[event.type_].full_size += event.raw_event_.size();
+ if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) {
+ auto key = td::TlParser(event.data_).fetch_string<td::Slice>();
+ info[event.type_].trie.add(key);
+ }
+ LOG(PLAIN) << "LogEvent[" << td::tag("id", td::format::as_hex(event.id_)) << td::tag("type", event.type_)
+ << td::tag("flags", event.flags_) << td::tag("size", event.data_.size())
+ << td::tag("data", td::format::escaped(event.data_)) << "]\n";
+ })
.ensure();
for (auto &it : info) {
- LOG(ERROR) << td::tag("handler", td::format::as_hex(it.first))
+ LOG(PLAIN) << td::tag("handler", td::format::as_hex(it.first))
<< td::tag("full_size", td::format::as_size(it.second.full_size))
<< td::tag("compressed_size", td::format::as_size(it.second.compressed_size));
+ it.second.trie.dump();
+ if (it.second.full_size != it.second.compressed_size) {
+ it.second.compressed_trie.dump();
+ }
}
return 0;
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp
index b7ddc98ff1..cb4e4817c5 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,6 +10,7 @@
namespace td {
namespace detail {
+
void BinlogEventsBuffer::add_event(BinlogEvent &&event) {
total_events_++;
if ((event.flags_ & BinlogEvent::Flags::Partial) == 0) {
@@ -26,14 +27,17 @@ void BinlogEventsBuffer::add_event(BinlogEvent &&event) {
size_ += event.size_;
events_.push_back(std::move(event));
}
+
bool BinlogEventsBuffer::need_flush() const {
return total_events_ > 5000 || ids_.size() > 100;
}
+
void BinlogEventsBuffer::clear() {
ids_.clear();
events_.clear();
total_events_ = 0;
size_ = 0;
}
+
} // namespace detail
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h
index dcd6d7c1b3..5b7cab8212 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h
@@ -1,16 +1,18 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
+
#include "td/db/binlog/BinlogEvent.h"
#include "td/utils/common.h"
namespace td {
namespace detail {
+
class BinlogEventsBuffer {
public:
void add_event(BinlogEvent &&event);
@@ -23,7 +25,8 @@ class BinlogEventsBuffer {
auto &event = events_[i];
if (i + 1 != ids_.size() && (event.flags_ & BinlogEvent::Flags::Partial) == 0) {
callback(BinlogEvent(BinlogEvent::create_raw(event.id_, event.type_, event.flags_ | BinlogEvent::Flags::Partial,
- create_storer(event.data_))));
+ create_storer(event.data_)),
+ BinlogDebugInfo{__FILE__, __LINE__}));
} else {
callback(std::move(event));
}
@@ -43,5 +46,6 @@ class BinlogEventsBuffer {
void do_event(BinlogEvent &&event);
void clear();
};
+
} // namespace detail
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp
index 50ad91bff8..7b2832b39a 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp
@@ -1,25 +1,26 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/detail/BinlogEventsProcessor.h"
-#include "td/utils/logging.h"
+#include "td/utils/SliceBuilder.h"
+#include "td/utils/Status.h"
#include <algorithm>
namespace td {
namespace detail {
-void BinlogEventsProcessor::do_event(BinlogEvent &&event) {
+
+Status BinlogEventsProcessor::do_event(BinlogEvent &&event) {
offset_ = event.offset_;
auto fixed_id = event.id_ * 2;
if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !ids_.empty() && ids_.back() >= fixed_id) {
auto it = std::lower_bound(ids_.begin(), ids_.end(), fixed_id);
if (it == ids_.end() || *it != fixed_id) {
- LOG(FATAL) << "Ignore rewrite logevent";
- return;
+ return Status::Error(PSLICE() << "Ignore rewrite log event " << event.public_to_string());
}
auto pos = it - ids_.begin();
total_raw_events_size_ -= static_cast<int64>(events_[pos].raw_event_.size());
@@ -32,10 +33,14 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) {
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
events_[pos] = std::move(event);
}
- } else if (event.type_ == BinlogEvent::ServiceTypes::Empty) {
- // just skip this event
+ } else if (event.type_ < 0) {
+ // just skip service events
} else {
- CHECK(ids_.empty() || ids_.back() < fixed_id);
+ if (!(ids_.empty() || ids_.back() < fixed_id)) {
+ return Status::Error(PSLICE() << offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " "
+ << event.public_to_string() << " " << total_events_ << " "
+ << total_raw_events_size_);
+ }
last_id_ = event.id_;
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
total_events_++;
@@ -46,6 +51,7 @@ void BinlogEventsProcessor::do_event(BinlogEvent &&event) {
if (total_events_ > 10 && empty_events_ * 4 > total_events_ * 3) {
compactify();
}
+ return Status::OK();
}
void BinlogEventsProcessor::compactify() {
@@ -66,5 +72,6 @@ void BinlogEventsProcessor::compactify() {
empty_events_ = 0;
CHECK(ids_.size() == events_.size());
}
+
} // namespace detail
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h
index 645f8c50ab..8b276aef03 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h
@@ -1,25 +1,31 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
+
#include "td/db/binlog/BinlogEvent.h"
#include "td/utils/common.h"
+#include "td/utils/logging.h"
+#include "td/utils/Status.h"
namespace td {
namespace detail {
+
class BinlogEventsProcessor {
public:
- void add_event(BinlogEvent &&event) {
- do_event(std::move(event));
+ Status add_event(BinlogEvent &&event) TD_WARN_UNUSED_RESULT {
+ return do_event(std::move(event));
}
template <class CallbackT>
void for_each(CallbackT &&callback) {
for (size_t i = 0; i < ids_.size(); i++) {
+ LOG_CHECK(i == 0 || ids_[i - 1] < ids_[i]) << ids_[i - 1] << " " << events_[i - 1].public_to_string() << " "
+ << ids_[i] << " " << events_[i].public_to_string();
if ((ids_[i] & 1) == 0) {
callback(events_[i]);
}
@@ -46,8 +52,9 @@ class BinlogEventsProcessor {
int64 offset_{0};
int64 total_raw_events_size_{0};
- void do_event(BinlogEvent &&event);
+ Status do_event(BinlogEvent &&event);
void compactify();
};
+
} // namespace detail
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp
index 6ee1b45af2..83147562e8 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,29 +10,50 @@
#include "td/utils/common.h"
#include "td/utils/logging.h"
+#include "td/utils/misc.h"
#include "td/utils/port/path.h"
+#include "td/utils/port/Stat.h"
+
+#include <atomic>
namespace td {
namespace detail {
-Status RawSqliteDb::last_error(sqlite3 *db) {
- return Status::Error(Slice(sqlite3_errmsg(db)));
+
+static std::atomic<bool> was_database_destroyed{false};
+
+Status RawSqliteDb::last_error(tdsqlite3 *db, CSlice path) {
+ return Status::Error(PSLICE() << Slice(tdsqlite3_errmsg(db)) << " for database \"" << path << '"');
}
+
Status RawSqliteDb::destroy(Slice path) {
- with_db_path(path, [](auto path) { unlink(path).ignore(); });
- return Status::OK();
+ Status error;
+ with_db_path(path, [&](auto path) {
+ unlink(path).ignore();
+ if (!ends_with(path, "-shm") && !stat(path).is_error()) {
+ error = Status::Error(PSLICE() << "Failed to delete file \"" << path << '"');
+ }
+ });
+ return error;
}
+
Status RawSqliteDb::last_error() {
//If database was corrupted, try to delete it.
- auto code = sqlite3_errcode(db_);
+ auto code = tdsqlite3_errcode(db_);
if (code == SQLITE_CORRUPT) {
+ was_database_destroyed.store(true, std::memory_order_relaxed);
destroy(path_).ignore();
}
- return last_error(db_);
+ return last_error(db_, path());
+}
+
+bool RawSqliteDb::was_any_database_destroyed() {
+ return was_database_destroyed.load(std::memory_order_relaxed);
}
+
RawSqliteDb::~RawSqliteDb() {
- auto rc = sqlite3_close(db_);
- LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_);
+ auto rc = tdsqlite3_close(db_);
+ LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_, path());
}
} // namespace detail
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h
index 801f3b192a..c2227d1fa3 100644
--- a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h
+++ b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h
@@ -1,22 +1,24 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
-#include "td/utils/logging.h"
+#include "td/utils/optional.h"
#include "td/utils/Slice.h"
+#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
-struct sqlite3;
+struct tdsqlite3;
namespace td {
namespace detail {
+
class RawSqliteDb {
public:
- RawSqliteDb(sqlite3 *db, std::string path) : db_(db), path_(std::move(path)) {
+ RawSqliteDb(tdsqlite3 *db, std::string path) : db_(db), path_(std::move(path)) {
}
RawSqliteDb(const RawSqliteDb &) = delete;
RawSqliteDb(RawSqliteDb &&) = delete;
@@ -28,12 +30,12 @@ class RawSqliteDb {
static void with_db_path(Slice main_path, F &&f) {
f(PSLICE() << main_path);
f(PSLICE() << main_path << "-journal");
- f(PSLICE() << main_path << "-shm");
f(PSLICE() << main_path << "-wal");
+ f(PSLICE() << main_path << "-shm");
}
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
- sqlite3 *db() {
+ tdsqlite3 *db() {
return db_;
}
CSlice path() const {
@@ -41,11 +43,36 @@ class RawSqliteDb {
}
Status last_error();
- static Status last_error(sqlite3 *db);
+ static Status last_error(tdsqlite3 *db, CSlice path);
+
+ static bool was_any_database_destroyed();
+
+ bool on_begin() {
+ begin_cnt_++;
+ return begin_cnt_ == 1;
+ }
+ Result<bool> on_commit() {
+ if (begin_cnt_ == 0) {
+ return Status::Error("No matching begin for commit");
+ }
+ begin_cnt_--;
+ return begin_cnt_ == 0;
+ }
+
+ void set_cipher_version(int32 cipher_version) {
+ cipher_version_ = cipher_version;
+ }
+
+ optional<int32> get_cipher_version() const {
+ return cipher_version_.copy();
+ }
private:
- sqlite3 *db_;
+ tdsqlite3 *db_;
std::string path_;
+ size_t begin_cnt_{0};
+ optional<int32> cipher_version_;
};
-}; // namespace detail
+
+} // namespace detail
} // namespace td