diff options
Diffstat (limited to 'protocols/Telegram/tdlib/td/test/tqueue.cpp')
-rw-r--r-- | protocols/Telegram/tdlib/td/test/tqueue.cpp | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/protocols/Telegram/tdlib/td/test/tqueue.cpp b/protocols/Telegram/tdlib/td/test/tqueue.cpp new file mode 100644 index 0000000000..b1bf94a09e --- /dev/null +++ b/protocols/Telegram/tdlib/td/test/tqueue.cpp @@ -0,0 +1,250 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/binlog/Binlog.h" +#include "td/db/binlog/BinlogEvent.h" +#include "td/db/binlog/BinlogHelper.h" +#include "td/db/TQueue.h" + +#include "td/utils/buffer.h" +#include "td/utils/common.h" +#include "td/utils/int_types.h" +#include "td/utils/logging.h" +#include "td/utils/Random.h" +#include "td/utils/Slice.h" +#include "td/utils/SliceBuilder.h" +#include "td/utils/Span.h" +#include "td/utils/tests.h" +#include "td/utils/Time.h" + +#include <memory> +#include <utility> + +TEST(TQueue, hands) { + td::TQueue::Event events[100]; + auto events_span = td::MutableSpan<td::TQueue::Event>(events, 100); + + auto tqueue = td::TQueue::create(); + auto qid = 12; + ASSERT_EQ(true, tqueue->get_head(qid).empty()); + ASSERT_EQ(true, tqueue->get_tail(qid).empty()); + tqueue->push(qid, "hello", 1, 0, td::TQueue::EventId()); + auto head = tqueue->get_head(qid); + auto tail = tqueue->get_tail(qid); + ASSERT_EQ(head.next().ok(), tail); + ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); + ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); + ASSERT_EQ(1u, tqueue->get(qid, tail, false, 0, events_span).move_as_ok()); + ASSERT_EQ(1u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); + ASSERT_EQ(0u, tqueue->get(qid, tail, true, 0, events_span).move_as_ok()); + ASSERT_EQ(0u, tqueue->get(qid, head, true, 0, events_span).move_as_ok()); +} + +class TestTQueue { + public: + using EventId = td::TQueue::EventId; + + static td::CSlice binlog_path() { + return td::CSlice("tqueue_binlog"); + } + + TestTQueue() { + baseline_ = td::TQueue::create(); + + memory_ = td::TQueue::create(); + auto memory_storage = td::make_unique<td::TQueueMemoryStorage>(); + memory_storage_ = memory_storage.get(); + memory_->set_callback(std::move(memory_storage)); + + binlog_ = td::TQueue::create(); + auto tqueue_binlog = td::make_unique<td::TQueueBinlog<td::Binlog>>(); + td::Binlog::destroy(binlog_path()).ensure(); + auto binlog = std::make_shared<td::Binlog>(); + binlog->init(binlog_path().str(), [&](const td::BinlogEvent &event) { UNREACHABLE(); }).ensure(); + tqueue_binlog->set_binlog(std::move(binlog)); + binlog_->set_callback(std::move(tqueue_binlog)); + } + + void restart(td::Random::Xorshift128plus &rnd, td::int32 now) { + if (rnd.fast(0, 10) == 0) { + baseline_->run_gc(now); + } + + memory_->extract_callback().release(); + auto memory_storage = td::unique_ptr<td::TQueueMemoryStorage>(memory_storage_); + memory_ = td::TQueue::create(); + memory_storage->replay(*memory_); + memory_->set_callback(std::move(memory_storage)); + if (rnd.fast(0, 10) == 0) { + memory_->run_gc(now); + } + + if (rnd.fast(0, 30) != 0) { + return; + } + + LOG(INFO) << "Restart binlog"; + binlog_ = td::TQueue::create(); + auto tqueue_binlog = td::make_unique<td::TQueueBinlog<td::Binlog>>(); + auto binlog = std::make_shared<td::Binlog>(); + binlog + ->init(binlog_path().str(), + [&](const td::BinlogEvent &event) { tqueue_binlog->replay(event, *binlog_).ignore(); }) + .ensure(); + tqueue_binlog->set_binlog(std::move(binlog)); + binlog_->set_callback(std::move(tqueue_binlog)); + if (rnd.fast(0, 2) == 0) { + binlog_->run_gc(now); + } + } + + EventId push(td::TQueue::QueueId queue_id, const td::string &data, td::int32 expires_at, EventId new_id = EventId()) { + auto a_id = baseline_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); + auto b_id = memory_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); + auto c_id = binlog_->push(queue_id, data, expires_at, 0, new_id).move_as_ok(); + ASSERT_EQ(a_id, b_id); + ASSERT_EQ(a_id, c_id); + return a_id; + } + + void check_head_tail(td::TQueue::QueueId qid) { + //ASSERT_EQ(baseline_->get_head(qid), memory_->get_head(qid)); + //ASSERT_EQ(baseline_->get_head(qid), binlog_->get_head(qid)); + ASSERT_EQ(baseline_->get_tail(qid), memory_->get_tail(qid)); + ASSERT_EQ(baseline_->get_tail(qid), binlog_->get_tail(qid)); + } + + void check_get(td::TQueue::QueueId qid, td::Random::Xorshift128plus &rnd, td::int32 now) { + td::TQueue::Event a[10]; + td::MutableSpan<td::TQueue::Event> a_span(a, 10); + td::TQueue::Event b[10]; + td::MutableSpan<td::TQueue::Event> b_span(b, 10); + td::TQueue::Event c[10]; + td::MutableSpan<td::TQueue::Event> c_span(c, 10); + + auto a_from = baseline_->get_head(qid); + //auto b_from = memory_->get_head(qid); + //auto c_from = binlog_->get_head(qid); + //ASSERT_EQ(a_from, b_from); + //ASSERT_EQ(a_from, c_from); + + auto tmp = a_from.advance(rnd.fast(-10, 10)); + if (tmp.is_ok()) { + a_from = tmp.move_as_ok(); + } + baseline_->get(qid, a_from, true, now, a_span).move_as_ok(); + memory_->get(qid, a_from, true, now, b_span).move_as_ok(); + binlog_->get(qid, a_from, true, now, c_span).move_as_ok(); + ASSERT_EQ(a_span.size(), b_span.size()); + ASSERT_EQ(a_span.size(), c_span.size()); + for (size_t i = 0; i < a_span.size(); i++) { + ASSERT_EQ(a_span[i].id, b_span[i].id); + ASSERT_EQ(a_span[i].id, c_span[i].id); + ASSERT_EQ(a_span[i].data, b_span[i].data); + ASSERT_EQ(a_span[i].data, c_span[i].data); + } + } + + private: + td::unique_ptr<td::TQueue> baseline_; + td::unique_ptr<td::TQueue> memory_; + td::unique_ptr<td::TQueue> binlog_; + td::TQueueMemoryStorage *memory_storage_{nullptr}; +}; + +TEST(TQueue, random) { + using EventId = td::TQueue::EventId; + td::Random::Xorshift128plus rnd(123); + auto next_queue_id = [&rnd] { + return rnd.fast(1, 10); + }; + auto next_first_id = [&rnd] { + if (rnd.fast(0, 3) == 0) { + return EventId::from_int32(EventId::MAX_ID - 20).move_as_ok(); + } + return EventId::from_int32(rnd.fast(1000000000, 1500000000)).move_as_ok(); + }; + + TestTQueue q; + td::int32 now = 1000; + auto push_event = [&] { + auto data = PSTRING() << rnd(); + if (rnd.fast(0, 10000) == 0) { + data = td::string(1 << 19, '\0'); + } + q.push(next_queue_id(), data, now + rnd.fast(-10, 10) * 10 + 5, next_first_id()); + }; + auto inc_now = [&] { + now += 10; + }; + auto check_head_tail = [&] { + q.check_head_tail(next_queue_id()); + }; + auto restart = [&] { + q.restart(rnd, now); + }; + auto get = [&] { + q.check_get(next_queue_id(), rnd, now); + }; + td::RandomSteps steps({{push_event, 100}, {check_head_tail, 10}, {get, 40}, {inc_now, 5}, {restart, 1}}); + for (int i = 0; i < 100000; i++) { + steps.step(rnd); + } +} + +TEST(TQueue, memory_leak) { + return; + auto tqueue = td::TQueue::create(); + auto tqueue_binlog = td::make_unique<td::TQueueBinlog<td::Binlog>>(); + std::string binlog_path = "test_tqueue.binlog"; + td::Binlog::destroy(binlog_path).ensure(); + auto binlog = std::make_shared<td::Binlog>(); + binlog->init(binlog_path, [&](const td::BinlogEvent &event) { UNREACHABLE(); }).ensure(); + tqueue_binlog->set_binlog(std::move(binlog)); + tqueue->set_callback(std::move(tqueue_binlog)); + + td::int32 now = 0; + std::vector<td::TQueue::EventId> ids; + td::Random::Xorshift128plus rnd(123); + int i = 0; + while (true) { + auto id = tqueue->push(1, "a", now + 600000, 0, {}).move_as_ok(); + ids.push_back(id); + if (ids.size() > static_cast<std::size_t>(rnd()) % 100000) { + auto it = static_cast<std::size_t>(rnd()) % ids.size(); + std::swap(ids.back(), ids[it]); + tqueue->forget(1, ids.back()); + ids.pop_back(); + } + now++; + if (i++ % 100000 == 0) { + LOG(ERROR) << td::BufferAllocator::get_buffer_mem() << " " << tqueue->get_size(1) << " " + << td::BufferAllocator::get_buffer_slice_size(); + } + } +} + +TEST(TQueue, clear) { + auto tqueue = td::TQueue::create(); + + auto start_time = td::Time::now(); + td::int32 now = 0; + td::vector<td::TQueue::EventId> ids; + td::Random::Xorshift128plus rnd(123); + for (size_t i = 0; i < 1000000; i++) { + tqueue->push(1, td::string(td::Random::fast(100, 500), 'a'), now + 600000, 0, {}).ensure(); + } + auto tail_id = tqueue->get_tail(1); + auto clear_start_time = td::Time::now(); + size_t keep_count = td::Random::fast(0, 2); + tqueue->clear(1, keep_count); + auto finish_time = td::Time::now(); + LOG(INFO) << "Added TQueue events in " << clear_start_time - start_time << " seconds and cleared them in " + << finish_time - clear_start_time << " seconds"; + CHECK(tqueue->get_size(1) == keep_count); + CHECK(tqueue->get_head(1).advance(keep_count).ok() == tail_id); + CHECK(tqueue->get_tail(1) == tail_id); +} |