diff options
author | aunsane <aunsane@gmail.com> | 2018-04-27 21:33:17 +0300 |
---|---|---|
committer | aunsane <aunsane@gmail.com> | 2018-04-27 21:33:17 +0300 |
commit | e1ec72eab6d00b3ba38e5932bc88920f103b6e4a (patch) | |
tree | 999de2725a83e30fbbf6576200525d4ef0c5fe38 /protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp | |
parent | b9ce1d4d98525490ca1a38e2d9fd4f3369adb3e0 (diff) |
Telegram: initial commit
- tdlib moved to telegram dir
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp')
-rw-r--r-- | protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp | 535 |
1 file changed, 535 insertions, 0 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp new file mode 100644 index 0000000000..9185fe8858 --- /dev/null +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp @@ -0,0 +1,535 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/impl2/ActorLocker.h" +#include "td/actor/impl2/Scheduler.h" + +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/port/thread.h" +#include "td/utils/Slice.h" +#include "td/utils/StringBuilder.h" +#include "td/utils/tests.h" +#include "td/utils/Time.h" + +#include <array> +#include <atomic> +#include <deque> +#include <memory> + +using td::actor2::ActorLocker; +using td::actor2::ActorSignals; +using td::actor2::ActorState; +using td::actor2::SchedulerId; + +TEST(Actor2, signals) { + ActorSignals signals; + signals.add_signal(ActorSignals::Wakeup); + signals.add_signal(ActorSignals::Cpu); + signals.add_signal(ActorSignals::Kill); + signals.clear_signal(ActorSignals::Cpu); + + bool was_kill = false; + bool was_wakeup = false; + while (!signals.empty()) { + auto s = signals.first_signal(); + if (s == ActorSignals::Kill) { + was_kill = true; + } else if (s == ActorSignals::Wakeup) { + was_wakeup = true; + } else { + UNREACHABLE(); + } + signals.clear_signal(s); + } + CHECK(was_kill && was_wakeup); +} + +TEST(Actors2, flags) { + ActorState::Flags flags; + CHECK(!flags.is_locked()); + flags.set_locked(true); + CHECK(flags.is_locked()); + flags.set_locked(false); + CHECK(!flags.is_locked()); + flags.set_pause(true); + + flags.set_scheduler_id(SchedulerId{123}); + + auto signals = flags.get_signals(); + CHECK(signals.empty()); + signals.add_signal(ActorSignals::Cpu); + 
signals.add_signal(ActorSignals::Kill); + CHECK(signals.has_signal(ActorSignals::Cpu)); + CHECK(signals.has_signal(ActorSignals::Kill)); + flags.set_signals(signals); + CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw(); + + auto wakeup = ActorSignals{}; + wakeup.add_signal(ActorSignals::Wakeup); + + flags.add_signals(wakeup); + signals.add_signal(ActorSignals::Wakeup); + CHECK(flags.get_signals().raw() == signals.raw()); + + flags.clear_signals(); + CHECK(flags.get_signals().empty()); + + CHECK(flags.get_scheduler_id().value() == 123); + CHECK(flags.is_pause()); +} + +TEST(Actor2, locker) { + ActorState state; + + ActorSignals kill_signal; + kill_signal.add_signal(ActorSignals::Kill); + + ActorSignals wakeup_signal; + kill_signal.add_signal(ActorSignals::Wakeup); + + ActorSignals cpu_signal; + kill_signal.add_signal(ActorSignals::Cpu); + + { + ActorLocker lockerA(&state); + ActorLocker lockerB(&state); + ActorLocker lockerC(&state); + + CHECK(lockerA.try_lock()); + CHECK(lockerA.own_lock()); + auto flagsA = lockerA.flags(); + CHECK(lockerA.try_unlock(flagsA)); + CHECK(!lockerA.own_lock()); + + CHECK(lockerA.try_lock()); + CHECK(!lockerB.try_lock()); + CHECK(!lockerC.try_lock()); + + CHECK(lockerB.try_add_signals(kill_signal)); + CHECK(!lockerC.try_add_signals(wakeup_signal)); + CHECK(lockerC.try_add_signals(wakeup_signal)); + CHECK(!lockerC.add_signals(cpu_signal)); + CHECK(!lockerA.flags().has_signals()); + CHECK(!lockerA.try_unlock(lockerA.flags())); + { + auto flags = lockerA.flags(); + auto signals = flags.get_signals(); + bool was_kill = false; + bool was_wakeup = false; + bool was_cpu = false; + while (!signals.empty()) { + auto s = signals.first_signal(); + if (s == ActorSignals::Kill) { + was_kill = true; + } else if (s == ActorSignals::Wakeup) { + was_wakeup = true; + } else if (s == ActorSignals::Cpu) { + was_cpu = true; + } else { + UNREACHABLE(); + } + signals.clear_signal(s); + } + CHECK(was_kill && 
was_wakeup && was_cpu); + flags.clear_signals(); + CHECK(lockerA.try_unlock(flags)); + } + } + + { + ActorLocker lockerB(&state); + CHECK(lockerB.try_lock()); + CHECK(lockerB.try_unlock(lockerB.flags())); + CHECK(lockerB.add_signals(kill_signal)); + CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); + auto flags = lockerB.flags(); + flags.clear_signals(); + ActorLocker lockerA(&state); + CHECK(!lockerA.add_signals(kill_signal)); + CHECK(!lockerB.try_unlock(flags)); + CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal! + CHECK(!lockerB.try_unlock(flags)); + CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); + CHECK(lockerB.try_unlock(flags)); + } + + { + ActorLocker lockerA(&state); + CHECK(lockerA.try_lock()); + auto flags = lockerA.flags(); + flags.set_pause(true); + CHECK(lockerA.try_unlock(flags)); + //We have to lock, though we can't execute. + CHECK(lockerA.add_signals(wakeup_signal)); + } +} + +#if !TD_THREAD_UNSUPPORTED +TEST(Actor2, locker_stress) { + ActorState state; + + constexpr size_t threads_n = 5; + auto stage = [&](std::atomic<int> &value, int need) { + value.fetch_add(1, std::memory_order_release); + while (value.load(std::memory_order_acquire) < need) { + td::this_thread::yield(); + } + }; + + struct Node { + std::atomic<td::uint32> request{0}; + td::uint32 response = 0; + char pad[64]; + }; + std::array<Node, threads_n> nodes; + auto do_work = [&]() { + for (auto &node : nodes) { + auto query = node.request.load(std::memory_order_acquire); + if (query) { + node.response = query * query; + node.request.store(0, std::memory_order_relaxed); + } + } + }; + + std::atomic<int> begin{0}; + std::atomic<int> ready{0}; + std::atomic<int> check{0}; + std::atomic<int> finish{0}; + std::vector<td::thread> threads; + for (size_t i = 0; i < threads_n; i++) { + threads.push_back(td::thread([&, id = i] { + for (size_t i = 1; i < 1000000; i++) { + ActorLocker locker(&state); + auto need = 
static_cast<int>(threads_n * i); + auto query = static_cast<td::uint32>(id + need); + stage(begin, need); + nodes[id].request = 0; + nodes[id].response = 0; + stage(ready, need); + if (locker.try_lock()) { + nodes[id].response = query * query; + } else { + auto cpu = ActorSignals::one(ActorSignals::Cpu); + nodes[id].request.store(query, std::memory_order_release); + locker.add_signals(cpu); + } + while (locker.own_lock()) { + auto flags = locker.flags(); + auto signals = flags.get_signals(); + if (!signals.empty()) { + do_work(); + } + flags.clear_signals(); + locker.try_unlock(flags); + } + + stage(check, need); + if (id == 0) { + CHECK(locker.add_signals(ActorSignals{})); + CHECK(!locker.flags().has_signals()); + CHECK(locker.try_unlock(locker.flags())); + for (size_t thread_id = 0; thread_id < threads_n; thread_id++) { + CHECK(nodes[thread_id].response == + static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need)) + << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " " + << nodes[thread_id].request.load(); + } + } + } + })); + } + for (auto &thread : threads) { + thread.join(); + } +} + +namespace { +const size_t BUF_SIZE = 1024 * 1024; +char buf[BUF_SIZE]; +td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); +} // namespace + +TEST(Actor2, executor_simple) { + using namespace td; + using namespace td::actor2; + struct Dispatcher : public SchedulerDispatcher { + void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override { + queue.push_back(std::move(ptr)); + } + void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override { + UNREACHABLE(); + } + SchedulerId get_scheduler_id() const override { + return SchedulerId{0}; + } + std::deque<ActorInfoPtr> queue; + }; + Dispatcher dispatcher; + + class TestActor : public Actor { + public: + void close() { + stop(); + } + + private: + void start_up() override { + sb << "StartUp"; + } + void tear_down() 
override { + sb << "TearDown"; + } + }; + ActorInfoCreator actor_info_creator; + auto actor = actor_info_creator.create( + std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor")); + dispatcher.add_to_queue(actor, SchedulerId{0}, false); + + { + ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); + CHECK(executor.can_send()); + CHECK(executor.can_send_immediate()); + CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice(); + sb.clear(); + executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); + CHECK(sb.as_cslice() == "A") << sb.as_cslice(); + sb.clear(); + auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; }); + big_message.set_big(); + executor.send(std::move(big_message)); + CHECK(sb.as_cslice() == "") << sb.as_cslice(); + executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); + CHECK(sb.as_cslice() == "") << sb.as_cslice(); + } + CHECK(dispatcher.queue.size() == 1); + { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); } + CHECK(dispatcher.queue.size() == 1); + dispatcher.queue.clear(); + CHECK(sb.as_cslice() == "bigA") << sb.as_cslice(); + sb.clear(); + { + ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); + executor.send( + ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); + } + CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice(); + sb.clear(); + CHECK(!actor->has_actor()); + { + ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); + executor.send( + ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); + } + CHECK(dispatcher.queue.empty()); + CHECK(sb.as_cslice() == ""); +} + +using namespace td::actor2; +using td::uint32; +static std::atomic<int> cnt; +class Worker : public Actor { + public: + void query(uint32 x, ActorInfoPtr master); + void close() { + stop(); + } 
+}; +class Master : public Actor { + public: + void on_result(uint32 x, uint32 y) { + loop(); + } + + private: + uint32 l = 0; + uint32 r = 100000; + ActorInfoPtr worker; + void start_up() override { + worker = detail::create_actor<Worker>(ActorOptions().with_name("Master")); + loop(); + } + void loop() override { + l++; + if (l == r) { + if (!--cnt) { + SchedulerContext::get()->stop(); + } + detail::send_closure(*worker, &Worker::close); + stop(); + return; + } + detail::send_lambda(*worker, + [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); }); + } +}; + +void Worker::query(uint32 x, ActorInfoPtr master) { + auto y = x; + for (int i = 0; i < 100; i++) { + y = y * y; + } + detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); }); +} + +TEST(Actor2, scheduler_simple) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 2}; + scheduler.start(); + scheduler.run_in_context([] { + cnt = 10; + for (int i = 0; i < 10; i++) { + detail::create_actor<Master>(ActorOptions().with_name("Master")); + } + }); + while (scheduler.run(1000)) { + } + Scheduler::close_scheduler_group(*group_info); +} + +TEST(Actor2, actor_id_simple) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 2}; + sb.clear(); + scheduler.start(); + + scheduler.run_in_context([] { + class A : public Actor { + public: + A(int value) : value_(value) { + sb << "A" << value_; + } + void hello() { + sb << "hello"; + } + ~A() { + sb << "~A"; + if (--cnt <= 0) { + SchedulerContext::get()->stop(); + } + } + + private: + int value_; + }; + cnt = 1; + auto id = create_actor<A>("A", 123); + CHECK(sb.as_cslice() == "A123"); + sb.clear(); + send_closure(id, &A::hello); + }); + while (scheduler.run(1000)) { + } + CHECK(sb.as_cslice() == "hello~A"); + Scheduler::close_scheduler_group(*group_info); + sb.clear(); +} + 
+TEST(Actor2, actor_creation) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 1}; + scheduler.start(); + + scheduler.run_in_context([]() mutable { + class B; + class A : public Actor { + public: + void f() { + check(); + stop(); + } + + private: + void start_up() override { + check(); + create_actor<B>("Simple", actor_id(this)).release(); + } + + void check() { + auto &context = *SchedulerContext::get(); + CHECK(context.has_poll()); + context.get_poll(); + } + + void tear_down() override { + if (--cnt <= 0) { + SchedulerContext::get()->stop(); + } + } + }; + + class B : public Actor { + public: + B(ActorId<A> a) : a_(a) { + } + + private: + void start_up() override { + auto &context = *SchedulerContext::get(); + CHECK(!context.has_poll()); + send_closure(a_, &A::f); + stop(); + } + void tear_down() override { + if (--cnt <= 0) { + SchedulerContext::get()->stop(); + } + } + ActorId<A> a_; + }; + cnt = 2; + create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release(); + }); + while (scheduler.run(1000)) { + } + scheduler.stop(); + Scheduler::close_scheduler_group(*group_info); +} + +TEST(Actor2, actor_timeout_simple) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 2}; + sb.clear(); + scheduler.start(); + + scheduler.run_in_context([] { + class A : public Actor { + public: + void start_up() override { + set_timeout(); + } + void alarm() override { + double diff = td::Time::now() - expected_timeout_; + CHECK(-0.001 < diff && diff < 0.1) << diff; + if (cnt_-- > 0) { + set_timeout(); + } else { + stop(); + } + } + + void tear_down() override { + SchedulerContext::get()->stop(); + } + + private: + double expected_timeout_; + int cnt_ = 5; + void set_timeout() { + auto wakeup_timestamp = td::Timestamp::in(0.1); + expected_timeout_ = wakeup_timestamp.at(); + alarm_timestamp() = wakeup_timestamp; + } + }; + 
create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release(); + }); + while (scheduler.run(1000)) { + } + Scheduler::close_scheduler_group(*group_info); + sb.clear(); +} +#endif //!TD_THREAD_UNSUPPORTED |