diff options
Diffstat (limited to 'protocols/Telegram/tdlib/td/tddb')
33 files changed, 444 insertions, 283 deletions
diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h index 9ed4d92240..6b6e68422d 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/BinlogKeyValue.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -82,7 +82,7 @@ class BinlogKeyValue final : public KeyValueSyncInterface { name, [&](const BinlogEvent &binlog_event) { Event event; - event.parse(TlParser(binlog_event.data_)); + event.parse(TlParser(binlog_event.get_data())); map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_)); }, std::move(db_key), DbKey::empty(), scheduler_id)); @@ -103,7 +103,7 @@ class BinlogKeyValue final : public KeyValueSyncInterface { void external_init_handle(const BinlogEvent &binlog_event) { Event event; - event.parse(TlParser(binlog_event.data_)); + event.parse(TlParser(binlog_event.get_data())); map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_)); } @@ -120,7 +120,7 @@ class BinlogKeyValue final : public KeyValueSyncInterface { SeqNo set(string key, string value) final { auto lock = rw_mutex_.lock_write().move_as_ok(); - uint64 old_id = 0; + uint64 old_event_id = 0; auto it_ok = map_.emplace(key, std::make_pair(value, 0)); if (!it_ok.second) { if (it_ok.first->second.first == value) { @@ -128,25 +128,25 @@ class BinlogKeyValue final : public KeyValueSyncInterface { } VLOG(binlog) << "Change value of key " << key << " from " << hex_encode(it_ok.first->second.first) << " to " << hex_encode(value); - old_id = it_ok.first->second.second; + old_event_id = it_ok.first->second.second; it_ok.first->second.first = value; } else { VLOG(binlog) << "Set value of key " << key << " to " << hex_encode(value); } bool rewrite = false; - uint64 id; - auto seq_no = binlog_->next_id(); - if (old_id != 0) { + uint64 event_id; + auto seq_no = binlog_->next_event_id(); + if (old_event_id != 0) { rewrite = true; - id = old_id; + event_id = old_event_id; } else { - id = seq_no; - it_ok.first->second.second = id; + event_id = seq_no; + it_ok.first->second.second = event_id; } lock.reset(); add_event(seq_no, - BinlogEvent::create_raw(id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value})); + BinlogEvent::create_raw(event_id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value})); return seq_no; } @@ -157,15 +157,32 @@ class BinlogKeyValue final : public KeyValueSyncInterface { return 0; } VLOG(binlog) << "Remove value of key " << key << ", which is " << hex_encode(it->second.first); - uint64 id = it->second.second; + uint64 event_id = it->second.second; map_.erase(it); - auto seq_no = binlog_->next_id(); + auto seq_no = binlog_->next_event_id(); lock.reset(); - add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, + add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer())); return seq_no; } + SeqNo erase_batch(vector<string> keys) final { + auto lock = rw_mutex_.lock_write().move_as_ok(); + vector<uint64> log_event_ids; + for (auto &key : keys) { + auto it = map_.find(key); + if (it != map_.end()) { + log_event_ids.push_back(it->second.second); + map_.erase(it); + } + } + if (log_event_ids.empty()) { + return 0; + } + VLOG(binlog) << "Remove value of keys " << keys; + return binlog_->erase_batch(std::move(log_event_ids)); + } + void add_event(uint64 seq_no, BufferSlice &&event) { binlog_->add_raw_event(BinlogDebugInfo{__FILE__, __LINE__}, seq_no, std::move(event)); } @@ -215,18 +232,18 @@ class BinlogKeyValue final : public KeyValueSyncInterface { void erase_by_prefix(Slice prefix) final { auto lock = rw_mutex_.lock_write().move_as_ok(); - vector<uint64> ids; + vector<uint64> event_ids; table_remove_if(map_, [&](const auto &it) { if (begins_with(it.first, prefix)) { - ids.push_back(it.second.second); + event_ids.push_back(it.second.second); return true; } return false; }); - auto seq_no = binlog_->next_id(narrow_cast<int32>(ids.size())); + auto seq_no = binlog_->next_event_id(narrow_cast<int32>(event_ids.size())); lock.reset(); - for (auto id : ids) { - add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, + for (auto event_id : event_ids) { + add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer())); seq_no++; } diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h b/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h index 084f6283de..a7d596c1eb 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/DbKey.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h b/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h index 4007d0bfd6..111882fb29 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/KeyValueSyncInterface.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -40,6 +40,8 @@ class KeyValueSyncInterface { virtual SeqNo erase(const string &key) = 0; + virtual SeqNo erase_batch(vector<string> keys) = 0; + virtual void erase_by_prefix(Slice prefix) = 0; virtual void force_sync(Promise<> &&promise) = 0; diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h index 0ec10b682a..9870ac9391 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SeqKeyValue.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -43,6 +43,23 @@ class SeqKeyValue { return next_seq_no(); } + SeqNo erase_batch(vector<string> keys) { + size_t count = 0; + for (auto &key : keys) { + auto it = map_.find(key); + if (it != map_.end()) { + map_.erase(it); + count++; + } + } + if (count == 0) { + return 0; + } + SeqNo result = current_id_ + 1; + current_id_ += count; + return result; + } + SeqNo seq_no() const { return current_id_ + 1; } diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp index a3c13d7120..9326cd580d 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h index a8bae2b3c2..c9ca33dc8e 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteConnectionSafe.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp index 62c5284f2c..a53b6a6411 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h index da4d3c6dfc..6e242e9ebd 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteDb.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp index 97e2e6fe94..42b6d8bce0 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -88,6 +88,12 @@ void SqliteKeyValue::erase(Slice key) { erase_stmt_.reset(); } +void SqliteKeyValue::erase_batch(vector<string> keys) { + for (auto &key : keys) { + erase(key); + } +} + void SqliteKeyValue::erase_by_prefix(Slice prefix) { auto next = next_prefix(prefix); if (next.empty()) { diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h index bc617ff05f..914825da8a 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValue.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -47,6 +47,8 @@ class SqliteKeyValue { void erase(Slice key); + void erase_batch(vector<string> keys); + Status begin_read_transaction() TD_WARN_UNUSED_RESULT { return db_.begin_read_transaction(); } diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp index a0a35a1e54..71327598ec 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h index e5bc29b1e8..1e198a580c 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h index b61a96e193..9608514671 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp index ca638aa1df..16f9643da8 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -183,7 +183,7 @@ Status SqliteStatement::step() { auto rc = sqlite3_step(stmt_.get()); VLOG(sqlite) << "Finish step with response " << (rc == SQLITE_ROW ? "ROW" : (rc == SQLITE_DONE ? "DONE" : "ERROR")); if (rc == SQLITE_ROW) { - state_ = State::GotRow; + state_ = State::HaveRow; return Status::OK(); } diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h index 16ef022ef2..5a59976748 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/SqliteStatement.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -26,10 +26,10 @@ extern int VERBOSITY_NAME(sqlite); class SqliteStatement { public: SqliteStatement() = default; - SqliteStatement(const SqliteStatement &other) = delete; - SqliteStatement &operator=(const SqliteStatement &other) = delete; - SqliteStatement(SqliteStatement &&other) = default; - SqliteStatement &operator=(SqliteStatement &&other) = default; + SqliteStatement(const SqliteStatement &) = delete; + SqliteStatement &operator=(const SqliteStatement &) = delete; + SqliteStatement(SqliteStatement &&) = default; + SqliteStatement &operator=(SqliteStatement &&) = default; ~SqliteStatement(); Status bind_blob(int id, Slice blob) TD_WARN_UNUSED_RESULT; @@ -51,7 +51,7 @@ class SqliteStatement { return state_ != State::Finish; } bool has_row() const { - return state_ == State::GotRow; + return state_ == State::HaveRow; } bool empty() const { return !stmt_; @@ -76,7 +76,7 @@ class SqliteStatement { void operator()(sqlite3_stmt *stmt); }; - enum class State { Start, GotRow, Finish }; + enum class State { Start, HaveRow, Finish }; State state_ = State::Start; std::unique_ptr<sqlite3_stmt, StmtDeleter> stmt_; diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp index c94d7defaa..65c509bd91 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -218,15 +218,15 @@ class TQueueImpl final : public TQueue { pop(q, queue_id, it, q.tail_id); } - void clear(QueueId queue_id, size_t keep_count) final { + std::map<EventId, RawEvent> clear(QueueId queue_id, size_t keep_count) final { auto queue_it = queues_.find(queue_id); if (queue_it == queues_.end()) { - return; + return {}; } auto &q = queue_it->second; auto size = get_size(q); if (size <= keep_count) { - return; + return {}; } auto start_time = Time::now(); @@ -236,15 +236,60 @@ class TQueueImpl final : public TQueue { for (size_t i = 0; i < keep_count; i++) { --end_it; } - for (auto it = q.events.begin(); it != end_it;) { - pop(q, queue_id, it, q.tail_id); + if (keep_count == 0) { + --end_it; + auto &event = end_it->second; + if (callback_ == nullptr || event.log_event_id == 0) { + ++end_it; + } else if (!event.data.empty()) { + clear_event_data(q, event); + callback_->push(queue_id, event); + } + } + + auto collect_deleted_event_ids_time = 0.0; + if (callback_ != nullptr) { + vector<uint64> deleted_log_event_ids; + deleted_log_event_ids.reserve(size - keep_count); + for (auto it = q.events.begin(); it != end_it; ++it) { + auto &event = it->second; + if (event.log_event_id != 0) { + deleted_log_event_ids.push_back(event.log_event_id); + } + } + collect_deleted_event_ids_time = Time::now() - start_time; + callback_->pop_batch(std::move(deleted_log_event_ids)); + } + auto callback_clear_time = Time::now() - start_time; + + std::map<EventId, RawEvent> deleted_events; + if (keep_count > size / 2) { + for (auto it = q.events.begin(); it != end_it;) { + q.total_event_length -= it->second.data.size(); + bool is_inserted = deleted_events.emplace(it->first, std::move(it->second)).second; + CHECK(is_inserted); + it = q.events.erase(it); + } + } else { + q.total_event_length = 0; + for (auto it = end_it; it != q.events.end();) { + q.total_event_length += it->second.data.size(); + bool is_inserted = deleted_events.emplace(it->first, std::move(it->second)).second; + CHECK(is_inserted); + it = q.events.erase(it); + } + std::swap(deleted_events, q.events); } auto clear_time = Time::now() - start_time; - if (clear_time > 0.1) { + if (clear_time > 0.02) { LOG(WARNING) << "Cleared " << (size - keep_count) << " TQueue events with total size " - << (total_event_length - q.total_event_length) << " in " << clear_time << " seconds"; + << (total_event_length - q.total_event_length) << " in " << clear_time - callback_clear_time + << " seconds, collected their identifiers in " << collect_deleted_event_ids_time + << " seconds, and deleted them from callback in " + << callback_clear_time - collect_deleted_event_ids_time << " seconds"; } + return deleted_events; } Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now, @@ -501,9 +546,14 @@ void TQueueBinlog<BinlogT>::pop(uint64 log_event_id) { } template <class BinlogT> +void TQueueBinlog<BinlogT>::pop_batch(std::vector<uint64> log_event_ids) { + binlog_->erase_batch(std::move(log_event_ids)); +} + +template <class BinlogT> Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) const { TQueueLogEvent event; - TlParser parser(binlog_event.data_); + TlParser parser(binlog_event.get_data()); int32 has_extra = binlog_event.type_ - BINLOG_EVENT_TYPE; if (has_extra != 0 && has_extra != 1) { return Status::Error("Wrong magic"); @@ -555,4 +605,9 @@ void TQueueMemoryStorage::close(Promise<> promise) { promise.set_value({}); } +void TQueue::StorageCallback::pop_batch(std::vector<uint64> log_event_ids) { + for (auto id : log_event_ids) { + pop(id); + } +} } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h index 117726851b..c47d7a725f 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/TQueue.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -61,9 +61,9 @@ class TQueue { struct RawEvent { uint64 log_event_id{0}; EventId event_id; + int32 expires_at{0}; string data; int64 extra{0}; - int32 expires_at{0}; }; using QueueId = int64; @@ -83,6 +83,7 @@ class TQueue { virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0; virtual void pop(uint64 log_event_id) = 0; virtual void close(Promise<> promise) = 0; + virtual void pop_batch(std::vector<uint64> log_event_ids); }; static unique_ptr<TQueue> create(); @@ -104,7 +105,7 @@ class TQueue { virtual void forget(QueueId queue_id, EventId event_id) = 0; - virtual void clear(QueueId queue_id, size_t keep_count) = 0; + virtual std::map<EventId, RawEvent> clear(QueueId queue_id, size_t keep_count) = 0; virtual EventId get_head(QueueId queue_id) const = 0; virtual EventId get_tail(QueueId queue_id) const = 0; @@ -128,6 +129,7 @@ class TQueueBinlog final : public TQueue::StorageCallback { public: uint64 push(QueueId queue_id, const RawEvent &event) final; void pop(uint64 log_event_id) final; + void pop_batch(std::vector<uint64> log_event_ids) final; Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT; void set_binlog(std::shared_ptr<BinlogT> binlog) { diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h b/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h index 5663bcdfff..ebb4132892 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/TsSeqKeyValue.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -45,6 +45,11 @@ class TsSeqKeyValue { return kv_.erase(key); } + SeqNo erase_batch(vector<string> keys) { + auto lock = rw_mutex_.lock_write().move_as_ok(); + return kv_.erase_batch(std::move(keys)); + } + std::pair<SeqNo, RwMutex::WriteLock> erase_and_lock(const string &key) { auto lock = rw_mutex_.lock_write().move_as_ok(); return std::make_pair(kv_.erase(key), std::move(lock)); diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp index 5b14eed69d..adbe97684c 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -51,24 +51,24 @@ struct AesCtrEncryptionEvent { return 2; } - BufferSlice key_salt_; - BufferSlice iv_; - BufferSlice key_hash_; + string key_salt_; + string iv_; + string key_hash_; - BufferSlice generate_key(const DbKey &db_key) const { + string generate_key(const DbKey &db_key) const { CHECK(!db_key.is_empty()); - BufferSlice key(key_size()); + string key(key_size(), '\0'); size_t iteration_count = kdf_iteration_count(); if (db_key.is_raw_key()) { iteration_count = kdf_fast_iteration_count(); } - pbkdf2_sha256(db_key.data(), key_salt_.as_slice(), narrow_cast<int>(iteration_count), key.as_slice()); + pbkdf2_sha256(db_key.data(), key_salt_, narrow_cast<int>(iteration_count), key); return key; } - static BufferSlice generate_hash(Slice key) { - BufferSlice hash(hash_size()); - hmac_sha256(key, "cucumbers everywhere", hash.as_slice()); + static string generate_hash(Slice key) { + string hash(hash_size(), '\0'); + hmac_sha256(key, "cucumbers everywhere", hash); return hash; } @@ -139,7 +139,9 @@ class BinlogReader { } event->debug_info_ = BinlogDebugInfo{__FILE__, __LINE__}; - TRY_STATUS(event->init(input_->cut_head(size_).move_as_buffer_slice())); + auto buffer_slice = input_->cut_head(size_).move_as_buffer_slice(); + event->init(buffer_slice.as_slice().str()); + TRY_STATUS(event->validate()); offset_ += size_; event->offset_ = offset_; state_ = State::ReadLength; @@ -208,15 +210,15 @@ Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey o close().ignore(); return status; } - info_.last_id = processor_->last_id(); - last_id_ = processor_->last_id(); + info_.last_event_id = processor_->last_event_id(); + last_event_id_ = processor_->last_event_id(); if (info_.wrong_password) { close().ignore(); - return Status::Error(Error::WrongPassword, "Wrong password"); + return Status::Error(static_cast<int>(Error::WrongPassword), "Wrong password"); } if ((!db_key_.is_empty() && !db_key_used_) || (db_key_.is_empty() && encryption_type_ != EncryptionType::None)) { - aes_ctr_key_salt_ = BufferSlice(); + aes_ctr_key_salt_ = string(); do_reindex(); } @@ -304,7 +306,7 @@ void Binlog::close(Promise<> promise) { void Binlog::change_key(DbKey new_db_key) { db_key_ = std::move(new_db_key); - aes_ctr_key_salt_ = BufferSlice(); + aes_ctr_key_salt_ = string(); do_reindex(); } @@ -328,41 +330,30 @@ void Binlog::do_event(BinlogEvent &&event) { auto validate_status = event.validate(); if (validate_status.is_error()) { LOG(FATAL) << "Failed to validate binlog event " << validate_status << " " - << format::as_hex_dump<4>(Slice(event.raw_event_.as_slice().truncate(28))); + << format::as_hex_dump<4>(as_slice(event.raw_event_).truncate(28)); } VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ") << event.public_to_string(); - switch (encryption_type_) { - case EncryptionType::None: { - buffer_writer_.append(event.raw_event_.clone()); - break; - } - case EncryptionType::AesCtr: { - buffer_writer_.append(event.raw_event_.as_slice()); - break; - } - } + buffer_writer_.append(as_slice(event.raw_event_)); } if (event.type_ < 0) { if (event.type_ == BinlogEvent::ServiceTypes::AesCtrEncryption) { detail::AesCtrEncryptionEvent encryption_event; - encryption_event.parse(TlParser(event.data_)); + encryption_event.parse(TlParser(event.get_data())); - BufferSlice key; - if (aes_ctr_key_salt_.as_slice() == encryption_event.key_salt_.as_slice()) { - key = BufferSlice(as_slice(aes_ctr_key_)); + string key; + if (aes_ctr_key_salt_ == encryption_event.key_salt_) { + key = as_slice(aes_ctr_key_).str(); } else if (!db_key_.is_empty()) { key = encryption_event.generate_key(db_key_); } - if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() != - encryption_event.key_hash_.as_slice()) { + if (detail::AesCtrEncryptionEvent::generate_hash(key) != encryption_event.key_hash_) { CHECK(state_ == State::Load); if (!old_db_key_.is_empty()) { key = encryption_event.generate_key(old_db_key_); - if (detail::AesCtrEncryptionEvent::generate_hash(key.as_slice()).as_slice() != - encryption_event.key_hash_.as_slice()) { + if (detail::AesCtrEncryptionEvent::generate_hash(key) != encryption_event.key_hash_) { info_.wrong_password = true; } } else { @@ -374,8 +365,8 @@ void Binlog::do_event(BinlogEvent &&event) { encryption_type_ = EncryptionType::AesCtr; - aes_ctr_key_salt_ = encryption_event.key_salt_.copy(); - update_encryption(key.as_slice(), encryption_event.iv_.as_slice()); + aes_ctr_key_salt_ = encryption_event.key_salt_; + update_encryption(key, encryption_event.iv_); if (state_ == State::Load) { update_read_encryption(); @@ -439,6 +430,17 @@ void Binlog::flush() { } need_flush_since_ = 0; LOG_IF(FATAL, fd_.need_flush_write()) << "Failed to flush binlog"; + + if (state_ == State::Run && Time::now() > next_buffer_flush_time_) { + VLOG(binlog) << "Flush write buffer"; + buffer_writer_ = ChainBufferWriter(); + buffer_reader_ = buffer_writer_.extract_reader(); + if (encryption_type_ == EncryptionType::AesCtr) { + aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state(); + } + update_write_encryption(); + next_buffer_flush_time_ = Time::now() + 1.0; + } } void Binlog::lazy_flush() { @@ -552,6 +554,7 @@ Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callb } auto offset = processor_->offset(); + CHECK(offset >= 0); processor_->for_each([&](BinlogEvent &event) { VLOG(binlog) << "Replay binlog event: " << event.public_to_string(); if (callback) { @@ -583,9 +586,9 @@ Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callb } void Binlog::update_encryption(Slice key, Slice iv) { - as_slice(aes_ctr_key_).copy_from(key); + as_mutable_slice(aes_ctr_key_).copy_from(key); UInt128 aes_ctr_iv; - as_slice(aes_ctr_iv).copy_from(iv); + as_mutable_slice(aes_ctr_iv).copy_from(iv); aes_ctr_state_.init(as_slice(aes_ctr_key_), as_slice(aes_ctr_iv)); } @@ -599,22 +602,22 @@ void Binlog::reset_encryption() { EncryptionEvent event; if (aes_ctr_key_salt_.empty()) { - event.key_salt_ = BufferSlice(EncryptionEvent::default_salt_size()); - Random::secure_bytes(event.key_salt_.as_slice()); + event.key_salt_.resize(EncryptionEvent::default_salt_size()); + Random::secure_bytes(event.key_salt_); } else { - event.key_salt_ = aes_ctr_key_salt_.clone(); + event.key_salt_ = aes_ctr_key_salt_; } - event.iv_ = BufferSlice(EncryptionEvent::iv_size()); - Random::secure_bytes(event.iv_.as_slice()); + event.iv_.resize(EncryptionEvent::iv_size()); + Random::secure_bytes(event.iv_); - BufferSlice key; - if (aes_ctr_key_salt_.as_slice() == event.key_salt_.as_slice()) { - key = BufferSlice(as_slice(aes_ctr_key_)); + string key; + if (aes_ctr_key_salt_ == event.key_salt_) { + key = as_slice(aes_ctr_key_).str(); } else { key = event.generate_key(db_key_); } - event.key_hash_ = EncryptionEvent::generate_hash(key.as_slice()); + event.key_hash_ = EncryptionEvent::generate_hash(key); do_event(BinlogEvent( BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event)), @@ -654,11 +657,16 @@ void Binlog::do_reindex() { fd_events_ = 0; reset_encryption(); processor_->for_each([&](BinlogEvent &event) { - event.realloc(); do_event(std::move(event)); // NB: no move is actually happens }); - need_sync_ = start_size != 0; // must sync creation of the file if it is non-empty - sync(); + { + flush(); + if (start_size != 0) { // must sync creation of the file if it is non-empty + auto status = fd_.sync_barrier(); + LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status; + } + need_sync_ = false; + } // finish_reindex auto status = unlink(path_); diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h index 88dbacf58f..4b13b8a94f 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/Binlog.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -31,7 +31,7 @@ extern int32 VERBOSITY_NAME(binlog); struct BinlogInfo { bool was_created{false}; - uint64 last_id{0}; + uint64 last_event_id{0}; bool is_encrypted{false}; bool wrong_password{false}; bool is_opened{false}; @@ -45,28 +45,28 @@ class BinlogEventsBuffer; class Binlog { public: - enum Error : int { WrongPassword = -1 }; + enum class Error : int { WrongPassword = -1037284 }; Binlog(); - Binlog(const Binlog &other) = delete; - Binlog &operator=(const Binlog &other) = delete; - Binlog(Binlog &&other) = delete; - Binlog &operator=(Binlog &&other) = delete; + Binlog(const Binlog &) = delete; + Binlog &operator=(const Binlog &) = delete; + Binlog(Binlog &&) = delete; + Binlog &operator=(Binlog &&) = delete; ~Binlog(); using Callback = std::function<void(const BinlogEvent &)>; Status init(string path, const Callback &callback, DbKey db_key = DbKey::empty(), DbKey old_db_key = DbKey::empty(), int32 dummy = -1, const Callback &debug_callback = Callback()) TD_WARN_UNUSED_RESULT; - uint64 next_id() { - return ++last_id_; + uint64 next_event_id() { + return ++last_event_id_; } - uint64 next_id(int32 shift) { - auto res = last_id_ + 1; - last_id_ += shift; + uint64 next_event_id(int32 shift) { + auto res = last_event_id_ + 1; + last_event_id_ += shift; return res; } - uint64 peek_next_id() const { - return last_id_ + 1; + uint64 peek_next_event_id() const { + return last_event_id_ + 1; } bool empty() const { @@ -74,22 +74,33 @@ class Binlog { } uint64 add(int32 type, const Storer &storer) { - auto log_event_id = next_id(); - add_raw_event(BinlogEvent::create_raw(log_event_id, type, 0, storer), {}); - return log_event_id; + auto event_id = next_event_id(); + add_raw_event(BinlogEvent::create_raw(event_id, type, 0, storer), {}); + return event_id; } - uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer) { - auto seq_no = next_id(); - add_raw_event(BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer), {}); + uint64 rewrite(uint64 event_id, int32 type, const Storer &storer) { + auto seq_no = next_event_id(); + add_raw_event(BinlogEvent::create_raw(event_id, type, BinlogEvent::Flags::Rewrite, storer), {}); return seq_no; } - uint64 erase(uint64 log_event_id) { - auto seq_no = next_id(); - add_raw_event(BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, - EmptyStorer()), - {}); + uint64 erase(uint64 event_id) { + auto seq_no = next_event_id(); + add_raw_event( + BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer()), + {}); + return seq_no; + } + + uint64 erase_batch(vector<uint64> event_ids) { + if (event_ids.empty()) { + return 0; + } + auto seq_no = next_event_id(0); + for (auto event_id : event_ids) { + erase(event_id); + } return seq_no; } @@ -132,7 +143,7 @@ class Binlog { enum class EncryptionType { None, AesCtr } encryption_type_ = EncryptionType::None; // AesCtrEncryption - BufferSlice aes_ctr_key_salt_; + string aes_ctr_key_salt_; UInt256 aes_ctr_key_; AesCtrState aes_ctr_state_; @@ -144,12 +155,13 @@ class Binlog { int64 fd_size_{0}; uint64 fd_events_{0}; string path_; - std::vector<BinlogEvent> pending_events_; + vector<BinlogEvent> pending_events_; unique_ptr<detail::BinlogEventsProcessor> processor_; unique_ptr<detail::BinlogEventsBuffer> events_buffer_; bool in_flush_events_buffer_{false}; - uint64 last_id_{0}; + uint64 last_event_id_{0}; double need_flush_since_ = 0; + double next_buffer_flush_time_ = 0; bool need_sync_{false}; enum class State { Empty, Load, Reindex, Run } state_{State::Empty}; diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp index 09f9fc40e3..c74f7378bd 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -14,45 +14,49 @@ namespace td { -Status BinlogEvent::init(BufferSlice &&raw_event, bool check_crc) { - TlParser parser(raw_event.as_slice()); - size_ = parser.fetch_int(); - LOG_CHECK(size_ == raw_event.size()) << size_ << " " << raw_event.size() << debug_info_; - id_ = parser.fetch_long(); +void BinlogEvent::init(string raw_event) { + TlParser parser(as_slice(raw_event)); + size_ = static_cast<uint32>(parser.fetch_int()); + LOG_CHECK(size_ == raw_event.size()) << size_ << ' ' << raw_event.size() << debug_info_; + id_ = static_cast<uint64>(parser.fetch_long()); type_ = parser.fetch_int(); flags_ = parser.fetch_int(); - extra_ = parser.fetch_long(); + extra_ = static_cast<uint64>(parser.fetch_long()); CHECK(size_ >= MIN_SIZE); - auto slice_data = parser.fetch_string_raw<Slice>(size_ - MIN_SIZE); - data_ = MutableSlice(const_cast<char *>(slice_data.begin()), slice_data.size()); + parser.fetch_string_raw<Slice>(size_ - MIN_SIZE); // skip data crc32_ = static_cast<uint32>(parser.fetch_int()); - if (check_crc) { - auto calculated_crc = crc32(raw_event.as_slice().substr(0, size_ - TAIL_SIZE)); - if (calculated_crc != crc32_) { - return Status::Error(PSLICE() << "crc mismatch " << tag("actual", format::as_hex(calculated_crc)) - << tag("expected", format::as_hex(crc32_)) << public_to_string()); - } - } raw_event_ = std::move(raw_event); - return Status::OK(); +} + +Slice BinlogEvent::get_data() const { + CHECK(raw_event_.size() >= MIN_SIZE); + return Slice(as_slice(raw_event_).data() + HEADER_SIZE, raw_event_.size() - MIN_SIZE); } Status BinlogEvent::validate() const { - BinlogEvent event; - if (raw_event_.size() < 4) { + if (raw_event_.size() < MIN_SIZE) { return Status::Error("Too small event"); } - uint32 size = TlParser(raw_event_.as_slice().substr(0, 4)).fetch_int(); - if (size_ != size) { - return Status::Error(PSLICE() << "Size of event changed: " << tag("was", size_) << tag("now", size)); + TlParser parser(as_slice(raw_event_)); + auto size = static_cast<uint32>(parser.fetch_int()); + if (size_ != size || size_ != raw_event_.size()) { + return Status::Error(PSLICE() << "Size of event changed: " << tag("was", size_) << tag("now", size) + << tag("real size", raw_event_.size())); } - return event.init(raw_event_.clone(), true); + parser.fetch_string_raw<Slice>(size_ - TAIL_SIZE - sizeof(int)); // skip + auto stored_crc32 = static_cast<uint32>(parser.fetch_int()); + auto calculated_crc = crc32(Slice(as_slice(raw_event_).data(), size_ - TAIL_SIZE)); + if (calculated_crc != crc32_ || calculated_crc != stored_crc32) { + return Status::Error(PSLICE() << "CRC mismatch " << tag("actual", format::as_hex(calculated_crc)) + << tag("expected", format::as_hex(crc32_)) << public_to_string()); + } + return Status::OK(); } BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) { auto raw_event = BufferSlice{storer.size() + MIN_SIZE}; - TlStorerUnsafe tl_storer(raw_event.as_slice().ubegin()); + TlStorerUnsafe tl_storer(raw_event.as_mutable_slice().ubegin()); tl_storer.store_int(narrow_cast<int32>(raw_event.size())); tl_storer.store_long(id); tl_storer.store_int(type); @@ -68,11 +72,4 @@ BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const St return raw_event; } -void BinlogEvent::realloc() { - auto data_offset = data_.begin() - raw_event_.as_slice().begin(); - auto data_size = data_.size(); - raw_event_ = raw_event_.copy(); - data_ = raw_event_.as_slice().substr(data_offset, data_size); -} - } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h index 8fab08da98..ca93bb1937 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogEvent.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -53,66 +53,62 @@ struct BinlogEvent { static constexpr size_t TAIL_SIZE = 4; static constexpr size_t MIN_SIZE = HEADER_SIZE + TAIL_SIZE; - int64 offset_; + int64 offset_ = -1; - uint32 size_; - uint64 id_; - int32 type_; // type can be merged with flags - int32 flags_; - uint64 extra_; - MutableSlice data_; - uint32 crc32_; + uint32 size_ = 0; + uint64 id_ = 0; + int32 type_ = 0; // type can be merged with flags + int32 flags_ = 0; + uint64 extra_ = 0; + uint32 crc32_ = 0; - BufferSlice raw_event_; + string raw_event_; BinlogDebugInfo debug_info_; enum ServiceTypes { Header = -1, Empty = -2, AesCtrEncryption = -3, NoEncryption = -4 }; enum Flags { Rewrite = 1, Partial = 2 }; - void clear() { - raw_event_ = BufferSlice(); - } - bool empty() const { + Slice get_data() const; + + bool is_empty() const { return raw_event_.empty(); } + BinlogEvent clone() const { BinlogEvent result; result.debug_info_ = BinlogDebugInfo{__FILE__, __LINE__}; - result.init(raw_event_.clone()).ensure(); + result.init(raw_event_); + result.validate().ensure(); return result; } BufferSlice data_as_buffer_slice() const { - return raw_event_.from_slice(data_); + return BufferSlice(get_data()); } BinlogEvent() = default; - //explicit BinlogEvent(BufferSlice &&raw_event) { - //init(std::move(raw_event), false).ensure(); - //} + BinlogEvent(BufferSlice &&raw_event, BinlogDebugInfo info) { debug_info_ = info; - init(std::move(raw_event), false).ensure(); + init(raw_event.as_slice().str()); } - Status init(BufferSlice &&raw_event, bool check_crc = true) TD_WARN_UNUSED_RESULT; - static BufferSlice create_raw(uint64 id, int32 type, int32 flags, const Storer &storer); - std::string public_to_string() const { + string public_to_string() const { return PSTRING() << "LogEvent[" << tag("id", format::as_hex(id_)) << tag("type", type_) << tag("flags", flags_) - << tag("data", data_.size()) << "]" << debug_info_; + << tag("data", get_data().size()) << "]" << debug_info_; } - Status validate() const; + void init(string raw_event); - void realloc(); + Status validate() const TD_WARN_UNUSED_RESULT; }; inline StringBuilder &operator<<(StringBuilder &sb, const BinlogEvent &event) { return sb << "LogEvent[" << tag("id", format::as_hex(event.id_)) << tag("type", event.type_) - << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]" + << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.get_data())) << "]" << event.debug_info_; } diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h index 66a65e060d..9348c26129 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogHelper.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h index b3ec7b119f..d46297f944 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/BinlogInterface.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -31,35 +31,46 @@ class BinlogInterface { void close_and_destroy(Promise<> promise = {}) { close_and_destroy_impl(std::move(promise)); } - void add_raw_event(BinlogDebugInfo info, uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { - add_raw_event_impl(id, std::move(raw_event), std::move(promise), info); + void add_raw_event(BinlogDebugInfo info, uint64 event_id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { + add_raw_event_impl(event_id, std::move(raw_event), std::move(promise), info); } - void add_raw_event(uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { - add_raw_event_impl(id, std::move(raw_event), std::move(promise), {}); + void add_raw_event(uint64 event_id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { + add_raw_event_impl(event_id, std::move(raw_event), std::move(promise), {}); } void lazy_sync(Promise<> promise = Promise<>()) { - add_raw_event_impl(next_id(), BufferSlice(), std::move(promise), {}); + add_raw_event_impl(next_event_id(), BufferSlice(), std::move(promise), {}); } uint64 add(int32 type, const Storer &storer, Promise<> promise = Promise<>()) { - auto log_event_id = next_id(); - add_raw_event_impl(log_event_id, BinlogEvent::create_raw(log_event_id, type, 0, storer), std::move(promise), {}); - return log_event_id; + auto event_id = next_event_id(); + add_raw_event_impl(event_id, BinlogEvent::create_raw(event_id, type, 0, storer), std::move(promise), {}); + return event_id; } - uint64 rewrite(uint64 log_event_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) { - auto seq_no = next_id(); - add_raw_event_impl(seq_no, BinlogEvent::create_raw(log_event_id, type, BinlogEvent::Flags::Rewrite, storer), + uint64 rewrite(uint64 event_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) { + auto seq_no = next_event_id(); + add_raw_event_impl(seq_no, BinlogEvent::create_raw(event_id, type, BinlogEvent::Flags::Rewrite, storer), std::move(promise), {}); return seq_no; } - uint64 erase(uint64 log_event_id, Promise<> promise = Promise<>()) { - auto seq_no = next_id(); - add_raw_event_impl(seq_no, - BinlogEvent::create_raw(log_event_id, BinlogEvent::ServiceTypes::Empty, - BinlogEvent::Flags::Rewrite, EmptyStorer()), - std::move(promise), {}); + uint64 erase(uint64 event_id, Promise<> promise = Promise<>()) { + auto seq_no = next_event_id(); + add_raw_event_impl( + seq_no, + BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer()), + std::move(promise), {}); + return seq_no; + } + + virtual uint64 erase_batch(vector<uint64> event_ids) { + if (event_ids.empty()) { + return 0; + } + uint64 seq_no = next_event_id(0); + for (auto event_id : event_ids) { + erase(event_id); + } return seq_no; } @@ -67,13 +78,13 @@ class BinlogInterface { virtual void force_flush() = 0; virtual void change_key(DbKey db_key, Promise<> promise) = 0; - virtual uint64 next_id() = 0; - virtual uint64 next_id(int32 shift) = 0; + virtual uint64 next_event_id() = 0; + virtual uint64 next_event_id(int32 shift) = 0; protected: virtual void close_impl(Promise<> promise) = 0; virtual void close_and_destroy_impl(Promise<> promise) = 0; - virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0; + virtual void add_raw_event_impl(uint64 seq_no, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0; }; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp index 16a4e2c010..437a4bbbfa 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -7,6 +7,7 @@ #include "td/db/binlog/ConcurrentBinlog.h" #include "td/utils/logging.h" +#include "td/utils/misc.h" #include "td/utils/OrderedEventsProcessor.h" #include "td/utils/SliceBuilder.h" #include "td/utils/Time.h" @@ -39,8 +40,18 @@ class BinlogActor final : public Actor { Promise<> sync_promise; BinlogDebugInfo debug_info; }; + + void erase_batch(uint64 seq_no, std::vector<uint64> event_ids) { + for (auto event_id : event_ids) { + auto event = BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, + EmptyStorer()); + add_raw_event(seq_no, std::move(event), Promise<Unit>(), BinlogDebugInfo{__FILE__, __LINE__}); + seq_no++; + } + } + void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) { - processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 id, Event &&event) { + processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 event_id, Event &&event) { if (!event.raw_event.empty()) { do_add_raw_event(std::move(event.raw_event), event.debug_info); } @@ -178,9 +189,9 @@ Result<BinlogInfo> ConcurrentBinlog::init(string path, const Callback &callback, void ConcurrentBinlog::init_impl(unique_ptr<Binlog> binlog, int32 scheduler_id) { path_ = binlog->get_path().str(); - last_id_ = binlog->peek_next_id(); + last_event_id_ = binlog->peek_next_event_id(); binlog_actor_ = create_actor_on_scheduler<detail::BinlogActor>(PSLICE() << "Binlog " << path_, scheduler_id, - std::move(binlog), last_id_); + std::move(binlog), last_event_id_); } void ConcurrentBinlog::close_impl(Promise<> promise) { @@ -189,8 +200,10 @@ void ConcurrentBinlog::close_impl(Promise<> promise) { void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) { send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise)); } -void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) { - send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise), info); +void ConcurrentBinlog::add_raw_event_impl(uint64 event_id, BufferSlice &&raw_event, Promise<> promise, + BinlogDebugInfo info) { + send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, event_id, std::move(raw_event), std::move(promise), + info); } void ConcurrentBinlog::force_sync(Promise<> promise) { send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise)); @@ -201,4 +214,15 @@ void ConcurrentBinlog::force_flush() { void ConcurrentBinlog::change_key(DbKey db_key, Promise<> promise) { send_closure(binlog_actor_, &detail::BinlogActor::change_key, std::move(db_key), std::move(promise)); } + +uint64 ConcurrentBinlog::erase_batch(vector<uint64> event_ids) { + auto shift = narrow_cast<int32>(event_ids.size()); + if (shift == 0) { + return 0; + } + auto seq_no = next_event_id(shift); + send_closure(binlog_actor_, &detail::BinlogActor::erase_batch, seq_no, std::move(event_ids)); + return seq_no; +} + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h index ad6c9bc5c3..f850e2ad46 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -35,36 +35,38 @@ class ConcurrentBinlog final : public BinlogInterface { ConcurrentBinlog(); explicit ConcurrentBinlog(unique_ptr<Binlog> binlog, int scheduler_id = -1); - ConcurrentBinlog(const ConcurrentBinlog &other) = delete; - ConcurrentBinlog &operator=(const ConcurrentBinlog &other) = delete; - ConcurrentBinlog(ConcurrentBinlog &&other) = delete; - ConcurrentBinlog &operator=(ConcurrentBinlog &&other) = delete; + ConcurrentBinlog(const ConcurrentBinlog &) = delete; + ConcurrentBinlog &operator=(const ConcurrentBinlog &) = delete; + ConcurrentBinlog(ConcurrentBinlog &&) = delete; + ConcurrentBinlog &operator=(ConcurrentBinlog &&) = delete; ~ConcurrentBinlog() final; void force_sync(Promise<> promise) final; void force_flush() final; void change_key(DbKey db_key, Promise<> promise) final; - uint64 next_id() final { - return last_id_.fetch_add(1, std::memory_order_relaxed); + uint64 next_event_id() final { + return last_event_id_.fetch_add(1, std::memory_order_relaxed); } - uint64 next_id(int32 shift) final { - return last_id_.fetch_add(shift, std::memory_order_relaxed); + uint64 next_event_id(int32 shift) final { + return last_event_id_.fetch_add(shift, std::memory_order_relaxed); } CSlice get_path() const { return path_; } + uint64 erase_batch(vector<uint64> event_ids) final; + private: void init_impl(unique_ptr<Binlog> binlog, int scheduler_id); void close_impl(Promise<> promise) final; void close_and_destroy_impl(Promise<> promise) final; - void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final; + void add_raw_event_impl(uint64 event_id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final; ActorOwn<detail::BinlogActor> binlog_actor_; string path_; - std::atomic<uint64> last_id_{0}; + std::atomic<uint64> last_event_id_{0}; }; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp index f3984062fc..fd2498f256 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -44,20 +44,20 @@ struct Trie { }; td::vector<FullNode> nodes_; - void do_add(int id, td::Slice value) { - nodes_[id].sum++; + void do_add(int event_id, td::Slice value) { + nodes_[event_id].sum++; if (value.empty()) { return; } auto c = static_cast<td::uint8>(value[0]); - auto next_id = nodes_[id].next[c]; - if (next_id == 0) { - next_id = static_cast<int>(nodes_.size()); + auto next_event_id = nodes_[event_id].next[c]; + if (next_event_id == 0) { + next_event_id = static_cast<int>(nodes_.size()); nodes_.emplace_back(); - nodes_[id].next[c] = next_id; + nodes_[event_id].next[c] = next_event_id; } - do_add(next_id, value.substr(1)); + do_add(next_event_id, value.substr(1)); } void do_dump(td::string path, int v) { @@ -84,11 +84,11 @@ struct Trie { return; } for (int c = 0; c < 256; c++) { - auto next_id = nodes_[v].next[c]; - if (next_id == 0) { + auto next_event_id = nodes_[v].next[c]; + if (next_event_id == 0) { continue; } - do_dump(path + static_cast<char>(c), next_id); + do_dump(path + static_cast<char>(c), next_event_id); } } }; @@ -125,7 +125,7 @@ int main(int argc, char *argv[]) { info[0].compressed_size += event.raw_event_.size(); info[event.type_].compressed_size += event.raw_event_.size(); if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) { - auto key = td::TlParser(event.data_).fetch_string<td::Slice>(); + auto key = td::TlParser(event.get_data()).fetch_string<td::Slice>(); info[event.type_].compressed_trie.add(key); } }, @@ -134,12 +134,13 @@ int main(int argc, char *argv[]) { info[0].full_size += event.raw_event_.size(); info[event.type_].full_size += event.raw_event_.size(); if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) { - auto key = td::TlParser(event.data_).fetch_string<td::Slice>(); + auto key = td::TlParser(event.get_data()).fetch_string<td::Slice>(); info[event.type_].trie.add(key); } - LOG(PLAIN) << "LogEvent[" << td::tag("id", td::format::as_hex(event.id_)) << td::tag("type", event.type_) - << td::tag("flags", event.flags_) << td::tag("size", event.data_.size()) - << td::tag("data", td::format::escaped(event.data_)) << "]\n"; + LOG(PLAIN) << "LogEvent[" << td::tag("event_id", td::format::as_hex(event.id_)) + << td::tag("type", event.type_) << td::tag("flags", event.flags_) + << td::tag("size", event.get_data().size()) + << td::tag("data", td::format::escaped(event.get_data())) << "]\n"; }) .ensure(); diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp index cb4e4817c5..e6030c6f15 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h index 5b7cab8212..37be436c0b 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -25,7 +25,7 @@ class BinlogEventsBuffer { auto &event = events_[i]; if (i + 1 != ids_.size() && (event.flags_ & BinlogEvent::Flags::Partial) == 0) { callback(BinlogEvent(BinlogEvent::create_raw(event.id_, event.type_, event.flags_ | BinlogEvent::Flags::Partial, - create_storer(event.data_)), + create_storer(event.get_data())), BinlogDebugInfo{__FILE__, __LINE__})); } else { callback(std::move(event)); diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp index 7b2832b39a..6c07d292e9 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -16,18 +16,18 @@ namespace detail { Status BinlogEventsProcessor::do_event(BinlogEvent &&event) { offset_ = event.offset_; - auto fixed_id = event.id_ * 2; - if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !ids_.empty() && ids_.back() >= fixed_id) { - auto it = std::lower_bound(ids_.begin(), ids_.end(), fixed_id); - if (it == ids_.end() || *it != fixed_id) { + auto fixed_event_id = event.id_ * 2; + if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !event_ids_.empty() && event_ids_.back() >= fixed_event_id) { + auto it = std::lower_bound(event_ids_.begin(), event_ids_.end(), fixed_event_id); + if (it == event_ids_.end() || *it != fixed_event_id) { return Status::Error(PSLICE() << "Ignore rewrite log event " << event.public_to_string()); } - auto pos = it - ids_.begin(); + auto pos = it - event_ids_.begin(); total_raw_events_size_ -= static_cast<int64>(events_[pos].raw_event_.size()); if (event.type_ == BinlogEvent::ServiceTypes::Empty) { *it += 1; empty_events_++; - events_[pos].clear(); + events_[pos] = {}; } else { event.flags_ &= ~BinlogEvent::Flags::Rewrite; total_raw_events_size_ += static_cast<int64>(event.raw_event_.size()); @@ -36,15 +36,15 @@ Status BinlogEventsProcessor::do_event(BinlogEvent &&event) { } else if (event.type_ < 0) { // just skip service events } else { - if (!(ids_.empty() || ids_.back() < fixed_id)) { - return Status::Error(PSLICE() << offset_ << " " << ids_.size() << " " << ids_.back() << " " << fixed_id << " " - << event.public_to_string() << " " << total_events_ << " " + if (!(event_ids_.empty() || event_ids_.back() < fixed_event_id)) { + return Status::Error(PSLICE() << offset_ << ' ' << event_ids_.size() << ' ' << event_ids_.back() << ' ' + << fixed_event_id << ' ' << event.public_to_string() << ' ' << total_events_ << ' ' << total_raw_events_size_); } - last_id_ = event.id_; + last_event_id_ = event.id_; total_raw_events_size_ += static_cast<int64>(event.raw_event_.size()); total_events_++; - ids_.push_back(fixed_id); + event_ids_.push_back(fixed_event_id); events_.emplace_back(std::move(event)); } @@ -55,22 +55,25 @@ Status BinlogEventsProcessor::do_event(BinlogEvent &&event) { } void BinlogEventsProcessor::compactify() { - CHECK(ids_.size() == events_.size()); - auto ids_from = ids_.begin(); - auto ids_to = ids_from; + CHECK(event_ids_.size() == events_.size()); + auto event_ids_from = event_ids_.begin(); + auto event_ids_to = event_ids_from; auto events_from = events_.begin(); auto events_to = events_from; - for (; ids_from != ids_.end(); ids_from++, events_from++) { - if ((*ids_from & 1) == 0) { - *ids_to++ = *ids_from; - *events_to++ = std::move(*events_from); + for (; event_ids_from != event_ids_.end(); event_ids_from++, events_from++) { + if ((*event_ids_from & 1) == 0) { + *event_ids_to++ = *event_ids_from; + if (events_to != events_from) { + *events_to = std::move(*events_from); + } + events_to++; } } - ids_.erase(ids_to, ids_.end()); + event_ids_.erase(event_ids_to, event_ids_.end()); events_.erase(events_to, events_.end()); - total_events_ = ids_.size(); + total_events_ = event_ids_.size(); empty_events_ = 0; - CHECK(ids_.size() == events_.size()); + CHECK(event_ids_.size() == events_.size()); } } // namespace detail diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h index 8b276aef03..b358548a24 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -23,17 +23,18 @@ class BinlogEventsProcessor { template <class CallbackT> void for_each(CallbackT &&callback) { - for (size_t i = 0; i < ids_.size(); i++) { - LOG_CHECK(i == 0 || ids_[i - 1] < ids_[i]) << ids_[i - 1] << " " << events_[i - 1].public_to_string() << " " - << ids_[i] << " " << events_[i].public_to_string(); - if ((ids_[i] & 1) == 0) { + for (size_t i = 0; i < event_ids_.size(); i++) { + LOG_CHECK(i == 0 || event_ids_[i - 1] < event_ids_[i]) + << event_ids_[i - 1] << " " << events_[i - 1].public_to_string() << " " << event_ids_[i] << " " + << events_[i].public_to_string(); + if ((event_ids_[i] & 1) == 0) { callback(events_[i]); } } } - uint64 last_id() const { - return last_id_; + uint64 last_event_id() const { + return last_event_id_; } int64 offset() const { return offset_; @@ -43,12 +44,12 @@ class BinlogEventsProcessor { } private: - // holds (id * 2 + was_deleted) - std::vector<uint64> ids_; + // holds (event_id * 2 + was_deleted) + std::vector<uint64> event_ids_; std::vector<BinlogEvent> events_; size_t total_events_{0}; size_t empty_events_{0}; - uint64 last_id_{0}; + uint64 last_event_id_{0}; int64 offset_{0}; int64 total_raw_events_size_{0}; diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp index eb6389eebc..0d4408d476 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp +++ b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) diff --git a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h index ad67aa949a..1ff955f38e 100644 --- a/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h +++ b/protocols/Telegram/tdlib/td/tddb/td/db/detail/RawSqliteDb.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |