diff options
author | George Hazan <ghazan@miranda.im> | 2022-11-30 17:48:47 +0300 |
---|---|---|
committer | George Hazan <ghazan@miranda.im> | 2022-11-30 17:48:47 +0300 |
commit | 0ece30dc7c0e34b4c5911969b8fa99c33c6d023c (patch) | |
tree | 671325d3fec09b999411e4e3ab84ef8259261818 /protocols/Telegram/tdlib/td/tdactor/test | |
parent | 46c53ffc6809c67e4607e99951a2846c382b63b2 (diff) |
Telegram: update for TDLIB
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor/test')
5 files changed, 545 insertions, 920 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp index f4267f2818..0720f0ed6f 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp @@ -1,38 +1,39 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - -#include "td/actor/Timeout.h" +#include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" +#include "td/actor/MultiTimeout.h" -using namespace td; +#include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/Random.h" +#include "td/utils/tests.h" TEST(MultiTimeout, bug) { - ConcurrentScheduler sched; - int threads_n = 0; - sched.init(threads_n); + td::ConcurrentScheduler sched(0, 0); sched.start(); - std::unique_ptr<MultiTimeout> multi_timeout; + td::unique_ptr<td::MultiTimeout> multi_timeout; struct Data { - MultiTimeout *multi_timeout; + td::MultiTimeout *multi_timeout; }; Data data; { - auto guard = sched.get_current_guard(); - multi_timeout = std::make_unique<MultiTimeout>(); + auto guard = sched.get_main_guard(); + multi_timeout = td::make_unique<td::MultiTimeout>("MultiTimeout"); data.multi_timeout = multi_timeout.get(); - multi_timeout->set_callback([](void *void_data, int64 key) { + multi_timeout->set_callback([](void *void_data, td::int64 key) { auto &data = *static_cast<Data *>(void_data); if (key == 1) { data.multi_timeout->cancel_timeout(key + 1); data.multi_timeout->set_timeout_in(key + 2, 1); } else { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }); multi_timeout->set_callback_data(&data); @@ -45,3 +46,67 @@ TEST(MultiTimeout, bug) { } sched.finish(); } + +class TimeoutManager final : public td::Actor { + static td::int32 count; + + public: + TimeoutManager() { + count++; + + test_timeout_.set_callback(on_test_timeout_callback); + test_timeout_.set_callback_data(static_cast<void *>(this)); + } + TimeoutManager(const TimeoutManager &) = delete; + TimeoutManager &operator=(const TimeoutManager &) = delete; + TimeoutManager(TimeoutManager &&) = delete; + TimeoutManager &operator=(TimeoutManager &&) = delete; + ~TimeoutManager() final { + count--; + LOG(INFO) << "Destroy TimeoutManager"; + } + + static void on_test_timeout_callback(void *timeout_manager_ptr, td::int64 id) { + CHECK(count >= 0); + if (count == 0) { + LOG(ERROR) << "Receive timeout after manager was closed"; + return; + } + + auto manager = static_cast<TimeoutManager *>(timeout_manager_ptr); + send_closure_later(manager->actor_id(manager), &TimeoutManager::test_timeout); + } + + void test_timeout() { + CHECK(count > 0); + // we must yield scheduler, so run_main breaks immediately, if timeouts are handled immediately + td::Scheduler::instance()->yield(); + } + + td::MultiTimeout test_timeout_{"TestTimeout"}; +}; + +td::int32 TimeoutManager::count; + +TEST(MultiTimeout, Destroy) { + td::ConcurrentScheduler sched(0, 0); + + auto timeout_manager = sched.create_actor_unsafe<TimeoutManager>(0, "TimeoutManager"); + TimeoutManager *manager = timeout_manager.get().get_actor_unsafe(); + sched.start(); + int cnt = 100; + while (sched.run_main(cnt == 100 || cnt <= 0 ? 0.001 : 10)) { + auto guard = sched.get_main_guard(); + cnt--; + if (cnt > 0) { + for (int i = 0; i < 2; i++) { + manager->test_timeout_.set_timeout_in(td::Random::fast(0, 1000000000), td::Random::fast(2, 5) / 1000.0); + } + } else if (cnt == 0) { + timeout_manager.reset(); + } else if (cnt == -10) { + td::Scheduler::instance()->finish(); + } + } + sched.finish(); +} diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp deleted file mode 100644 index 9185fe8858..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp +++ /dev/null @@ -1,535 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include "td/actor/impl2/ActorLocker.h" -#include "td/actor/impl2/Scheduler.h" - -#include "td/utils/format.h" -#include "td/utils/logging.h" -#include "td/utils/port/thread.h" -#include "td/utils/Slice.h" -#include "td/utils/StringBuilder.h" -#include "td/utils/tests.h" -#include "td/utils/Time.h" - -#include <array> -#include <atomic> -#include <deque> -#include <memory> - -using td::actor2::ActorLocker; -using td::actor2::ActorSignals; -using td::actor2::ActorState; -using td::actor2::SchedulerId; - -TEST(Actor2, signals) { - ActorSignals signals; - signals.add_signal(ActorSignals::Wakeup); - signals.add_signal(ActorSignals::Cpu); - signals.add_signal(ActorSignals::Kill); - signals.clear_signal(ActorSignals::Cpu); - - bool was_kill = false; - bool was_wakeup = false; - while (!signals.empty()) { - auto s = signals.first_signal(); - if (s == ActorSignals::Kill) { - was_kill = true; - } else if (s == ActorSignals::Wakeup) { - was_wakeup = true; - } else { - UNREACHABLE(); - } - signals.clear_signal(s); - } - CHECK(was_kill && was_wakeup); -} - -TEST(Actors2, flags) { - ActorState::Flags flags; - CHECK(!flags.is_locked()); - flags.set_locked(true); - CHECK(flags.is_locked()); - flags.set_locked(false); - CHECK(!flags.is_locked()); - flags.set_pause(true); - - flags.set_scheduler_id(SchedulerId{123}); - - auto signals = flags.get_signals(); - CHECK(signals.empty()); - signals.add_signal(ActorSignals::Cpu); - signals.add_signal(ActorSignals::Kill); - CHECK(signals.has_signal(ActorSignals::Cpu)); - CHECK(signals.has_signal(ActorSignals::Kill)); - flags.set_signals(signals); - CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw(); - - auto wakeup = ActorSignals{}; - wakeup.add_signal(ActorSignals::Wakeup); - - flags.add_signals(wakeup); - signals.add_signal(ActorSignals::Wakeup); - CHECK(flags.get_signals().raw() == signals.raw()); - - flags.clear_signals(); - CHECK(flags.get_signals().empty()); - - CHECK(flags.get_scheduler_id().value() == 123); - CHECK(flags.is_pause()); -} - -TEST(Actor2, locker) { - ActorState state; - - ActorSignals kill_signal; - kill_signal.add_signal(ActorSignals::Kill); - - ActorSignals wakeup_signal; - kill_signal.add_signal(ActorSignals::Wakeup); - - ActorSignals cpu_signal; - kill_signal.add_signal(ActorSignals::Cpu); - - { - ActorLocker lockerA(&state); - ActorLocker lockerB(&state); - ActorLocker lockerC(&state); - - CHECK(lockerA.try_lock()); - CHECK(lockerA.own_lock()); - auto flagsA = lockerA.flags(); - CHECK(lockerA.try_unlock(flagsA)); - CHECK(!lockerA.own_lock()); - - CHECK(lockerA.try_lock()); - CHECK(!lockerB.try_lock()); - CHECK(!lockerC.try_lock()); - - CHECK(lockerB.try_add_signals(kill_signal)); - CHECK(!lockerC.try_add_signals(wakeup_signal)); - CHECK(lockerC.try_add_signals(wakeup_signal)); - CHECK(!lockerC.add_signals(cpu_signal)); - CHECK(!lockerA.flags().has_signals()); - CHECK(!lockerA.try_unlock(lockerA.flags())); - { - auto flags = lockerA.flags(); - auto signals = flags.get_signals(); - bool was_kill = false; - bool was_wakeup = false; - bool was_cpu = false; - while (!signals.empty()) { - auto s = signals.first_signal(); - if (s == ActorSignals::Kill) { - was_kill = true; - } else if (s == ActorSignals::Wakeup) { - was_wakeup = true; - } else if (s == ActorSignals::Cpu) { - was_cpu = true; - } else { - UNREACHABLE(); - } - signals.clear_signal(s); - } - CHECK(was_kill && was_wakeup && was_cpu); - flags.clear_signals(); - CHECK(lockerA.try_unlock(flags)); - } - } - - { - ActorLocker lockerB(&state); - CHECK(lockerB.try_lock()); - CHECK(lockerB.try_unlock(lockerB.flags())); - CHECK(lockerB.add_signals(kill_signal)); - CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); - auto flags = lockerB.flags(); - flags.clear_signals(); - ActorLocker lockerA(&state); - CHECK(!lockerA.add_signals(kill_signal)); - CHECK(!lockerB.try_unlock(flags)); - CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal! - CHECK(!lockerB.try_unlock(flags)); - CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); - CHECK(lockerB.try_unlock(flags)); - } - - { - ActorLocker lockerA(&state); - CHECK(lockerA.try_lock()); - auto flags = lockerA.flags(); - flags.set_pause(true); - CHECK(lockerA.try_unlock(flags)); - //We have to lock, though we can't execute. - CHECK(lockerA.add_signals(wakeup_signal)); - } -} - -#if !TD_THREAD_UNSUPPORTED -TEST(Actor2, locker_stress) { - ActorState state; - - constexpr size_t threads_n = 5; - auto stage = [&](std::atomic<int> &value, int need) { - value.fetch_add(1, std::memory_order_release); - while (value.load(std::memory_order_acquire) < need) { - td::this_thread::yield(); - } - }; - - struct Node { - std::atomic<td::uint32> request{0}; - td::uint32 response = 0; - char pad[64]; - }; - std::array<Node, threads_n> nodes; - auto do_work = [&]() { - for (auto &node : nodes) { - auto query = node.request.load(std::memory_order_acquire); - if (query) { - node.response = query * query; - node.request.store(0, std::memory_order_relaxed); - } - } - }; - - std::atomic<int> begin{0}; - std::atomic<int> ready{0}; - std::atomic<int> check{0}; - std::atomic<int> finish{0}; - std::vector<td::thread> threads; - for (size_t i = 0; i < threads_n; i++) { - threads.push_back(td::thread([&, id = i] { - for (size_t i = 1; i < 1000000; i++) { - ActorLocker locker(&state); - auto need = static_cast<int>(threads_n * i); - auto query = static_cast<td::uint32>(id + need); - stage(begin, need); - nodes[id].request = 0; - nodes[id].response = 0; - stage(ready, need); - if (locker.try_lock()) { - nodes[id].response = query * query; - } else { - auto cpu = ActorSignals::one(ActorSignals::Cpu); - nodes[id].request.store(query, std::memory_order_release); - locker.add_signals(cpu); - } - while (locker.own_lock()) { - auto flags = locker.flags(); - auto signals = flags.get_signals(); - if (!signals.empty()) { - do_work(); - } - flags.clear_signals(); - locker.try_unlock(flags); - } - - stage(check, need); - if (id == 0) { - CHECK(locker.add_signals(ActorSignals{})); - CHECK(!locker.flags().has_signals()); - CHECK(locker.try_unlock(locker.flags())); - for (size_t thread_id = 0; thread_id < threads_n; thread_id++) { - CHECK(nodes[thread_id].response == - static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need)) - << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " " - << nodes[thread_id].request.load(); - } - } - } - })); - } - for (auto &thread : threads) { - thread.join(); - } -} - -namespace { -const size_t BUF_SIZE = 1024 * 1024; -char buf[BUF_SIZE]; -td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); -} // namespace - -TEST(Actor2, executor_simple) { - using namespace td; - using namespace td::actor2; - struct Dispatcher : public SchedulerDispatcher { - void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override { - queue.push_back(std::move(ptr)); - } - void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override { - UNREACHABLE(); - } - SchedulerId get_scheduler_id() const override { - return SchedulerId{0}; - } - std::deque<ActorInfoPtr> queue; - }; - Dispatcher dispatcher; - - class TestActor : public Actor { - public: - void close() { - stop(); - } - - private: - void start_up() override { - sb << "StartUp"; - } - void tear_down() override { - sb << "TearDown"; - } - }; - ActorInfoCreator actor_info_creator; - auto actor = actor_info_creator.create( - std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor")); - dispatcher.add_to_queue(actor, SchedulerId{0}, false); - - { - ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); - CHECK(executor.can_send()); - CHECK(executor.can_send_immediate()); - CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice(); - sb.clear(); - executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); - CHECK(sb.as_cslice() == "A") << sb.as_cslice(); - sb.clear(); - auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; }); - big_message.set_big(); - executor.send(std::move(big_message)); - CHECK(sb.as_cslice() == "") << sb.as_cslice(); - executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); - CHECK(sb.as_cslice() == "") << sb.as_cslice(); - } - CHECK(dispatcher.queue.size() == 1); - { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); } - CHECK(dispatcher.queue.size() == 1); - dispatcher.queue.clear(); - CHECK(sb.as_cslice() == "bigA") << sb.as_cslice(); - sb.clear(); - { - ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); - executor.send( - ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); - } - CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice(); - sb.clear(); - CHECK(!actor->has_actor()); - { - ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); - executor.send( - ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); - } - CHECK(dispatcher.queue.empty()); - CHECK(sb.as_cslice() == ""); -} - -using namespace td::actor2; -using td::uint32; -static std::atomic<int> cnt; -class Worker : public Actor { - public: - void query(uint32 x, ActorInfoPtr master); - void close() { - stop(); - } -}; -class Master : public Actor { - public: - void on_result(uint32 x, uint32 y) { - loop(); - } - - private: - uint32 l = 0; - uint32 r = 100000; - ActorInfoPtr worker; - void start_up() override { - worker = detail::create_actor<Worker>(ActorOptions().with_name("Master")); - loop(); - } - void loop() override { - l++; - if (l == r) { - if (!--cnt) { - SchedulerContext::get()->stop(); - } - detail::send_closure(*worker, &Worker::close); - stop(); - return; - } - detail::send_lambda(*worker, - [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); }); - } -}; - -void Worker::query(uint32 x, ActorInfoPtr master) { - auto y = x; - for (int i = 0; i < 100; i++) { - y = y * y; - } - detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); }); -} - -TEST(Actor2, scheduler_simple) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 2}; - scheduler.start(); - scheduler.run_in_context([] { - cnt = 10; - for (int i = 0; i < 10; i++) { - detail::create_actor<Master>(ActorOptions().with_name("Master")); - } - }); - while (scheduler.run(1000)) { - } - Scheduler::close_scheduler_group(*group_info); -} - -TEST(Actor2, actor_id_simple) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 2}; - sb.clear(); - scheduler.start(); - - scheduler.run_in_context([] { - class A : public Actor { - public: - A(int value) : value_(value) { - sb << "A" << value_; - } - void hello() { - sb << "hello"; - } - ~A() { - sb << "~A"; - if (--cnt <= 0) { - SchedulerContext::get()->stop(); - } - } - - private: - int value_; - }; - cnt = 1; - auto id = create_actor<A>("A", 123); - CHECK(sb.as_cslice() == "A123"); - sb.clear(); - send_closure(id, &A::hello); - }); - while (scheduler.run(1000)) { - } - CHECK(sb.as_cslice() == "hello~A"); - Scheduler::close_scheduler_group(*group_info); - sb.clear(); -} - -TEST(Actor2, actor_creation) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 1}; - scheduler.start(); - - scheduler.run_in_context([]() mutable { - class B; - class A : public Actor { - public: - void f() { - check(); - stop(); - } - - private: - void start_up() override { - check(); - create_actor<B>("Simple", actor_id(this)).release(); - } - - void check() { - auto &context = *SchedulerContext::get(); - CHECK(context.has_poll()); - context.get_poll(); - } - - void tear_down() override { - if (--cnt <= 0) { - SchedulerContext::get()->stop(); - } - } - }; - - class B : public Actor { - public: - B(ActorId<A> a) : a_(a) { - } - - private: - void start_up() override { - auto &context = *SchedulerContext::get(); - CHECK(!context.has_poll()); - send_closure(a_, &A::f); - stop(); - } - void tear_down() override { - if (--cnt <= 0) { - SchedulerContext::get()->stop(); - } - } - ActorId<A> a_; - }; - cnt = 2; - create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release(); - }); - while (scheduler.run(1000)) { - } - scheduler.stop(); - Scheduler::close_scheduler_group(*group_info); -} - -TEST(Actor2, actor_timeout_simple) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 2}; - sb.clear(); - scheduler.start(); - - scheduler.run_in_context([] { - class A : public Actor { - public: - void start_up() override { - set_timeout(); - } - void alarm() override { - double diff = td::Time::now() - expected_timeout_; - CHECK(-0.001 < diff && diff < 0.1) << diff; - if (cnt_-- > 0) { - set_timeout(); - } else { - stop(); - } - } - - void tear_down() override { - SchedulerContext::get()->stop(); - } - - private: - double expected_timeout_; - int cnt_ = 5; - void set_timeout() { - auto wakeup_timestamp = td::Timestamp::in(0.1); - expected_timeout_ = wakeup_timestamp.at(); - alarm_timestamp() = wakeup_timestamp; - } - }; - create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release(); - }); - while (scheduler.run(1000)) { - } - Scheduler::close_scheduler_group(*group_info); - sb.clear(); -} -#endif //!TD_THREAD_UNSUPPORTED diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp index ffceacc595..628b74a94c 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp @@ -1,35 +1,32 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" #include "td/actor/PromiseFuture.h" +#include "td/utils/common.h" #include "td/utils/logging.h" #include "td/utils/Random.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/tests.h" #include <limits> #include <map> +#include <memory> #include <utility> -using namespace td; - -REGISTER_TESTS(actors_main); - -namespace { - template <class ContainerT> static typename ContainerT::value_type &rand_elem(ContainerT &cont) { CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max())); - return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)]; + return cont[td::Random::fast(0, static_cast<int>(cont.size()) - 1)]; } -static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) { - uint32 res = 1; +static td::uint32 fast_pow_mod_uint32(td::uint32 x, td::uint32 p) { + td::uint32 res = 1; while (p) { if (p & 1) { res *= x; @@ -40,25 +37,25 @@ static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) { return res; } -static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) { - uint32 res = 1; - for (uint32 i = 0; i < p; i++) { +static td::uint32 slow_pow_mod_uint32(td::uint32 x, td::uint32 p) { + td::uint32 res = 1; + for (td::uint32 i = 0; i < p; i++) { res *= x; } return res; } -struct Query { - uint32 query_id; - uint32 result; - std::vector<int> todo; - Query() = default; - Query(const Query &) = delete; - Query &operator=(const Query &) = delete; - Query(Query &&) = default; - Query &operator=(Query &&) = default; - ~Query() { - CHECK(todo.empty()) << "Query lost"; +struct ActorQuery { + td::uint32 query_id{}; + td::uint32 result{}; + td::vector<int> todo; + ActorQuery() = default; + ActorQuery(const ActorQuery &) = delete; + ActorQuery &operator=(const ActorQuery &) = delete; + ActorQuery(ActorQuery &&) = default; + ActorQuery &operator=(ActorQuery &&) = default; + ~ActorQuery() { + LOG_CHECK(todo.empty()) << "ActorQuery lost"; } int next_pow() { CHECK(!todo.empty()); @@ -71,25 +68,25 @@ struct Query { } }; -static uint32 fast_calc(Query &q) { - uint32 result = q.result; +static td::uint32 fast_calc(ActorQuery &q) { + td::uint32 result = q.result; for (auto x : q.todo) { result = fast_pow_mod_uint32(result, x); } return result; } -class Worker final : public Actor { +class Worker final : public td::Actor { public: explicit Worker(int threads_n) : threads_n_(threads_n) { } - void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) { - uint32 result = slow_pow_mod_uint32(x, p); + void query(td::PromiseActor<td::uint32> &&promise, td::uint32 x, td::uint32 p) { + td::uint32 result = slow_pow_mod_uint32(x, p); promise.set_value(std::move(result)); (void)threads_n_; - // if (threads_n_ > 1 && Random::fast(0, 9) == 0) { - // migrate(Random::fast(2, threads_n)); + // if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) { + // migrate(td::Random::fast(2, threads_n)); //} } @@ -97,7 +94,7 @@ class Worker final : public Actor { int threads_n_; }; -class QueryActor final : public Actor { +class QueryActor final : public td::Actor { public: class Callback { public: @@ -107,44 +104,46 @@ class QueryActor final : public Actor { Callback(Callback &&) = delete; Callback &operator=(Callback &&) = delete; virtual ~Callback() = default; - virtual void on_result(Query &&query) = 0; + virtual void on_result(ActorQuery &&query) = 0; virtual void on_closed() = 0; }; explicit QueryActor(int threads_n) : threads_n_(threads_n) { } - void set_callback(std::unique_ptr<Callback> callback) { + void set_callback(td::unique_ptr<Callback> callback) { callback_ = std::move(callback); } - void set_workers(std::vector<ActorId<Worker>> workers) { + void set_workers(td::vector<td::ActorId<Worker>> workers) { workers_ = std::move(workers); } - void query(Query &&query) { - uint32 x = query.result; - uint32 p = query.next_pow(); - if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) { + void query(ActorQuery &&query) { + td::uint32 x = query.result; + td::uint32 p = query.next_pow(); + if (td::Random::fast(0, 3) && (p <= 1000 || workers_.empty())) { query.result = slow_pow_mod_uint32(x, p); callback_->on_result(std::move(query)); } else { - auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p); + auto future = td::Random::fast(0, 3) == 0 + ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p) + : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p); if (future.is_ready()) { query.result = future.move_as_ok(); callback_->on_result(std::move(query)); } else { - future.set_event(EventCreator::raw(actor_id(), query.query_id)); + future.set_event(td::EventCreator::raw(actor_id(), query.query_id)); auto query_id = query.query_id; - pending_.insert(std::make_pair(query_id, std::make_pair(std::move(future), std::move(query)))); + pending_.emplace(query_id, std::make_pair(std::move(future), std::move(query))); } } - if (threads_n_ > 1 && Random::fast(0, 9) == 0) { - migrate(Random::fast(2, threads_n_)); + if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) { + migrate(td::Random::fast(2, threads_n_)); } } - void raw_event(const Event::Raw &event) override { - uint32 id = event.u32; + void raw_event(const td::Event::Raw &event) final { + td::uint32 id = event.u32; auto it = pending_.find(id); auto future = std::move(it->second.first); auto query = std::move(it->second.second); @@ -159,44 +158,44 @@ class QueryActor final : public Actor { stop(); } - void on_start_migrate(int32 sched_id) override { + void on_start_migrate(td::int32 sched_id) final { for (auto &it : pending_) { start_migrate(it.second.first, sched_id); } } - void on_finish_migrate() override { + void on_finish_migrate() final { for (auto &it : pending_) { finish_migrate(it.second.first); } } private: - unique_ptr<Callback> callback_; - std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_; - std::vector<ActorId<Worker>> workers_; + td::unique_ptr<Callback> callback_; + std::map<td::uint32, std::pair<td::FutureActor<td::uint32>, ActorQuery>> pending_; + td::vector<td::ActorId<Worker>> workers_; int threads_n_; }; -class MainQueryActor final : public Actor { - class QueryActorCallback : public QueryActor::Callback { +class MainQueryActor final : public td::Actor { + class QueryActorCallback final : public QueryActor::Callback { public: - void on_result(Query &&query) override { + void on_result(ActorQuery &&query) final { if (query.ready()) { send_closure(parent_id_, &MainQueryActor::on_result, std::move(query)); } else { send_closure(next_solver_, &QueryActor::query, std::move(query)); } } - void on_closed() override { + void on_closed() final { send_closure(parent_id_, &MainQueryActor::on_closed); } - QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver) + QueryActorCallback(td::ActorId<MainQueryActor> parent_id, td::ActorId<QueryActor> next_solver) : parent_id_(parent_id), next_solver_(next_solver) { } private: - ActorId<MainQueryActor> parent_id_; - ActorId<QueryActor> next_solver_; + td::ActorId<MainQueryActor> parent_id_; + td::ActorId<QueryActor> next_solver_; }; const int ACTORS_CNT = 10; @@ -206,39 +205,39 @@ class MainQueryActor final : public Actor { explicit MainQueryActor(int threads_n) : threads_n_(threads_n) { } - void start_up() override { + void start_up() final { actors_.resize(ACTORS_CNT); for (auto &actor : actors_) { - auto actor_ptr = make_unique<QueryActor>(threads_n_); - actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0) + auto actor_ptr = td::make_unique<QueryActor>(threads_n_); + actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0) .release(); } workers_.resize(WORKERS_CNT); for (auto &worker : workers_) { - auto actor_ptr = make_unique<Worker>(threads_n_); - worker = - register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release(); + auto actor_ptr = td::make_unique<Worker>(threads_n_); + worker = register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0) + .release(); } for (int i = 0; i < ACTORS_CNT; i++) { ref_cnt_++; send_closure(actors_[i], &QueryActor::set_callback, - make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT])); + td::make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT])); send_closure(actors_[i], &QueryActor::set_workers, workers_); } yield(); } - void on_result(Query &&query) { + void on_result(ActorQuery &&query) { CHECK(query.ready()); CHECK(query.result == expected_[query.query_id]); in_cnt_++; wakeup(); } - Query create_query() { - Query q; + ActorQuery create_query() { + ActorQuery q; q.query_id = (query_id_ += 2); q.result = q.query_id; q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000}; @@ -249,14 +248,14 @@ class MainQueryActor final : public Actor { void on_closed() { ref_cnt_--; if (ref_cnt_ == 0) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } } - void wakeup() override { - int cnt = 100000; + void wakeup() final { + int cnt = 10000; while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) { - if (Random::fast(0, 1)) { + if (td::Random::fast_bool()) { send_closure(rand_elem(actors_), &QueryActor::query, create_query()); } else { send_closure_later(rand_elem(actors_), &QueryActor::query, create_query()); @@ -273,9 +272,9 @@ class MainQueryActor final : public Actor { } private: - std::map<uint32, uint32> expected_; - std::vector<ActorId<QueryActor>> actors_; - std::vector<ActorId<Worker>> workers_; + std::map<td::uint32, td::uint32> expected_; + td::vector<td::ActorId<QueryActor>> actors_; + td::vector<td::ActorId<Worker>> workers_; int out_cnt_ = 0; int in_cnt_ = 0; int query_id_ = 1; @@ -283,101 +282,104 @@ class MainQueryActor final : public Actor { int threads_n_; }; -class SimpleActor final : public Actor { +class SimpleActor final : public td::Actor { public: - explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) { + explicit SimpleActor(td::int32 threads_n) : threads_n_(threads_n) { } - void start_up() override { - auto actor_ptr = make_unique<Worker>(threads_n_); + void start_up() final { + auto actor_ptr = td::make_unique<Worker>(threads_n_); worker_ = - register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release(); + register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0).release(); yield(); } - void wakeup() override { - if (q_ == 100000) { - Scheduler::instance()->finish(); + void wakeup() final { + if (q_ == 10000) { + td::Scheduler::instance()->finish(); stop(); return; } q_++; - p_ = Random::fast(0, 1) ? 1 : 10000; - auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_); + p_ = td::Random::fast_bool() ? 1 : 10000; + auto future = td::Random::fast(0, 3) == 0 + ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_) + : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_); if (future.is_ready()) { auto result = future.move_as_ok(); CHECK(result == fast_pow_mod_uint32(q_, p_)); yield(); } else { - future.set_event(EventCreator::raw(actor_id(), nullptr)); + future.set_event(td::EventCreator::raw(actor_id(), nullptr)); future_ = std::move(future); } - // if (threads_n_ > 1 && Random::fast(0, 2) == 0) { - // migrate(Random::fast(1, threads_n)); + // if (threads_n_ > 1 && td::Random::fast(0, 2) == 0) { + // migrate(td::Random::fast(1, threads_n)); //} } - void raw_event(const Event::Raw &event) override { + void raw_event(const td::Event::Raw &event) final { auto result = future_.move_as_ok(); CHECK(result == fast_pow_mod_uint32(q_, p_)); yield(); } - void on_start_migrate(int32 sched_id) override { + void on_start_migrate(td::int32 sched_id) final { start_migrate(future_, sched_id); } - void on_finish_migrate() override { + void on_finish_migrate() final { finish_migrate(future_); } private: - int32 threads_n_; - ActorId<Worker> worker_; - FutureActor<uint32> future_; - uint32 q_ = 1; - uint32 p_; + td::int32 threads_n_; + td::ActorId<Worker> worker_; + td::FutureActor<td::uint32> future_; + td::uint32 q_ = 1; + td::uint32 p_ = 0; }; -} // namespace -class SendToDead : public Actor { +class SendToDead final : public td::Actor { public: - class Parent : public Actor { + class Parent final : public td::Actor { public: - explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) { + explicit Parent(td::ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) { } - void start_up() override { - set_timeout_in(Random::fast_uint32() % 3 * 0.001); + void start_up() final { + set_timeout_in(td::Random::fast_uint32() % 3 * 0.001); if (ttl_ != 0) { - child_ = create_actor_on_scheduler<Parent>( - "Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(), ttl_ - 1); + child_ = td::create_actor_on_scheduler<Parent>( + "Child", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), actor_shared(this), + ttl_ - 1); } } - void timeout_expired() override { + void timeout_expired() final { stop(); } private: - ActorOwn<Parent> child_; - ActorShared<> parent_; + td::ActorOwn<Parent> child_; + td::ActorShared<> parent_; int ttl_; }; - void start_up() override { + void start_up() final { for (int i = 0; i < 2000; i++) { - create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(), - create_reference(), 4) + td::create_actor_on_scheduler<Parent>( + "Parent", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), create_reference(), 4) .release(); } } - ActorShared<> create_reference() { + td::ActorShared<> create_reference() { ref_cnt_++; - return actor_shared(); + return actor_shared(this); } - void hangup_shared() override { + + void hangup_shared() final { ref_cnt_--; if (ref_cnt_ == 0) { ttl_--; if (ttl_ <= 0) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } else { start_up(); @@ -385,18 +387,17 @@ class SendToDead : public Actor { } } - uint32 ttl_{50}; - uint32 ref_cnt_{0}; + td::uint32 ttl_{50}; + td::uint32 ref_cnt_{0}; }; TEST(Actors, send_to_dead) { //TODO: fix CHECK(storage_count_.load() == 0) return; - ConcurrentScheduler sched; int threads_n = 5; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); - sched.create_actor_unsafe<SendToDead>(0, "manager").release(); + sched.create_actor_unsafe<SendToDead>(0, "SendToDead").release(); sched.start(); while (sched.run_main(10)) { // empty @@ -405,11 +406,8 @@ TEST(Actors, send_to_dead) { } TEST(Actors, main_simple) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - - ConcurrentScheduler sched; int threads_n = 3; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); sched.create_actor_unsafe<SimpleActor>(threads_n > 1 ? 1 : 0, "simple", threads_n).release(); sched.start(); @@ -420,13 +418,10 @@ TEST(Actors, main_simple) { } TEST(Actors, main) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - - ConcurrentScheduler sched; int threads_n = 9; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); - sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "manager", threads_n).release(); + sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "MainQuery", threads_n).release(); sched.start(); while (sched.run_main(10)) { // empty @@ -434,27 +429,76 @@ TEST(Actors, main) { sched.finish(); } -class DoAfterStop : public Actor { +class DoAfterStop final : public td::Actor { public: - void loop() override { - ptr = std::make_unique<int>(10); + void loop() final { + ptr = td::make_unique<int>(10); stop(); CHECK(*ptr == 10); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } private: - std::unique_ptr<int> ptr; + td::unique_ptr<int> ptr; }; TEST(Actors, do_after_stop) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + int threads_n = 0; + td::ConcurrentScheduler sched(threads_n, 0); - ConcurrentScheduler sched; + sched.create_actor_unsafe<DoAfterStop>(0, "DoAfterStop").release(); + sched.start(); + while (sched.run_main(10)) { + // empty + } + sched.finish(); +} + +class XContext final : public td::ActorContext { + public: + td::int32 get_id() const final { + return 123456789; + } + + void validate() { + CHECK(x == 1234); + } + ~XContext() final { + x = 0; + } + int x = 1234; +}; + +class WithXContext final : public td::Actor { + public: + void start_up() final { + auto old_context = set_context(std::make_shared<XContext>()); + } + void f(td::unique_ptr<td::Guard> guard) { + } + void close() { + stop(); + } +}; + +static void check_context() { + auto ptr = static_cast<XContext *>(td::Scheduler::context()); + CHECK(ptr != nullptr); + ptr->validate(); +} + +TEST(Actors, context_during_destruction) { int threads_n = 0; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); - sched.create_actor_unsafe<DoAfterStop>(0, "manager").release(); + { + auto guard = sched.get_main_guard(); + auto with_context = td::create_actor<WithXContext>("WithXContext").release(); + send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); })); + send_closure_later(with_context, &WithXContext::close); + send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); })); + send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { td::Scheduler::instance()->finish(); })); + } sched.start(); while (sched.run_main(10)) { // empty diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp index c0a6c32b61..78d32d5437 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp @@ -1,57 +1,66 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" #include "td/actor/MultiPromise.h" #include "td/actor/PromiseFuture.h" #include "td/actor/SleepActor.h" -#include "td/actor/Timeout.h" +#include "td/utils/common.h" #include "td/utils/logging.h" +#include "td/utils/MpscPollableQueue.h" #include "td/utils/Observer.h" #include "td/utils/port/FileFd.h" +#include "td/utils/port/thread.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" #include "td/utils/StringBuilder.h" +#include "td/utils/tests.h" +#include "td/utils/Time.h" +#include <memory> #include <tuple> -REGISTER_TESTS(actors_simple) - -namespace { -using namespace td; - static const size_t BUF_SIZE = 1024 * 1024; static char buf[BUF_SIZE]; static char buf2[BUF_SIZE]; -static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1)); -static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1)); +static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); +static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1)); + +static td::vector<std::shared_ptr<td::MpscPollableQueue<td::EventFull>>> create_queues() { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + return {}; +#else + auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>(); + res->init(); + return {res}; +#endif +} TEST(Actors, SendLater) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); sb.clear(); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); auto guard = scheduler.get_guard(); - class Worker : public Actor { + class Worker final : public td::Actor { public: void f() { sb << "A"; } }; - auto id = create_actor<Worker>("Worker"); - scheduler.run_no_guard(0); - send_closure(id, &Worker::f); - send_closure_later(id, &Worker::f); - send_closure(id, &Worker::f); + auto id = td::create_actor<Worker>("Worker"); + scheduler.run_no_guard(td::Timestamp::in(1)); + td::send_closure(id, &Worker::f); + td::send_closure_later(id, &Worker::f); + td::send_closure(id, &Worker::f); ASSERT_STREQ("A", sb.as_cslice().c_str()); - scheduler.run_no_guard(0); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("AAA", sb.as_cslice().c_str()); } @@ -63,21 +72,21 @@ class X { X(const X &) { sb << "[cnstr_copy]"; } - X(X &&) { + X(X &&) noexcept { sb << "[cnstr_move]"; } X &operator=(const X &) { sb << "[set_copy]"; return *this; } - X &operator=(X &&) { + X &operator=(X &&) noexcept { sb << "[set_move]"; return *this; } ~X() = default; }; -class XReceiver final : public Actor { +class XReceiver final : public td::Actor { public: void by_const_ref(const X &) { sb << "[by_const_ref]"; @@ -91,13 +100,12 @@ class XReceiver final : public Actor { }; TEST(Actors, simple_pass_event_arguments) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); auto guard = scheduler.get_guard(); - auto id = create_actor<XReceiver>("XR").release(); - scheduler.run_no_guard(0); + auto id = td::create_actor<XReceiver>("XR").release(); + scheduler.run_no_guard(td::Timestamp::in(1)); X x; @@ -112,47 +120,47 @@ TEST(Actors, simple_pass_event_arguments) { // Tmp-->ConstRef sb.clear(); - send_closure(id, &XReceiver::by_const_ref, X()); + td::send_closure(id, &XReceiver::by_const_ref, X()); ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str()); // Tmp-->ConstRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_const_ref, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_const_ref, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); // LOG(ERROR) << sb.as_cslice(); ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str()); // Tmp-->LvalueRef sb.clear(); - send_closure(id, &XReceiver::by_lvalue_ref, X()); + td::send_closure(id, &XReceiver::by_lvalue_ref, X()); ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str()); // Tmp-->LvalueRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_lvalue_ref, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_lvalue_ref, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str()); // Tmp-->Value sb.clear(); - send_closure(id, &XReceiver::by_value, X()); + td::send_closure(id, &XReceiver::by_value, X()); ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str()); // Tmp-->Value (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_value, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_value, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str()); // Var-->ConstRef sb.clear(); - send_closure(id, &XReceiver::by_const_ref, x); + td::send_closure(id, &XReceiver::by_const_ref, x); ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str()); // Var-->ConstRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_const_ref, x); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_const_ref, x); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str()); // Var-->LvalueRef @@ -161,24 +169,24 @@ TEST(Actors, simple_pass_event_arguments) { // Var-->Value sb.clear(); - send_closure(id, &XReceiver::by_value, x); + td::send_closure(id, &XReceiver::by_value, x); ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str()); // Var-->Value (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_value, x); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_value, x); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str()); } -class PrintChar final : public Actor { +class PrintChar final : public td::Actor { public: PrintChar(char c, int cnt) : char_(c), cnt_(cnt) { } - void start_up() override { + void start_up() final { yield(); } - void wakeup() override { + void wakeup() final { if (cnt_ == 0) { stop(); } else { @@ -192,25 +200,23 @@ class PrintChar final : public Actor { char char_; int cnt_; }; -} // namespace // // Yield must add actor to the end of queue // TEST(Actors, simple_hand_yield) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); sb.clear(); int cnt = 1000; { auto guard = scheduler.get_guard(); - create_actor<PrintChar>("PrintA", 'A', cnt).release(); - create_actor<PrintChar>("PrintB", 'B', cnt).release(); - create_actor<PrintChar>("PrintC", 'C', cnt).release(); + td::create_actor<PrintChar>("PrintA", 'A', cnt).release(); + td::create_actor<PrintChar>("PrintB", 'B', cnt).release(); + td::create_actor<PrintChar>("PrintC", 'C', cnt).release(); } - scheduler.run(0); - std::string expected; + scheduler.run(td::Timestamp::in(1)); + td::string expected; for (int i = 0; i < cnt; i++) { expected += "ABC"; } @@ -219,7 +225,7 @@ TEST(Actors, simple_hand_yield) { class Ball { public: - friend void start_migrate(Ball &ball, int32 sched_id) { + friend void start_migrate(Ball &ball, td::int32 sched_id) { sb << "start"; } friend void finish_migrate(Ball &ball) { @@ -227,31 +233,30 @@ class Ball { } }; -class Pong final : public Actor { +class Pong final : public td::Actor { public: void pong(Ball ball) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; -class Ping final : public Actor { +class Ping final : public td::Actor { public: - explicit Ping(ActorId<Pong> pong) : pong_(pong) { + explicit Ping(td::ActorId<Pong> pong) : pong_(pong) { } - void start_up() override { - send_closure(pong_, &Pong::pong, Ball()); + void start_up() final { + td::send_closure(pong_, &Pong::pong, Ball()); } private: - ActorId<Pong> pong_; + td::ActorId<Pong> pong_; }; TEST(Actors, simple_migrate) { sb.clear(); sb2.clear(); - ConcurrentScheduler scheduler; - scheduler.init(2); + td::ConcurrentScheduler scheduler(2, 0); auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release(); scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release(); scheduler.start(); @@ -267,26 +272,25 @@ TEST(Actors, simple_migrate) { #endif } -class OpenClose final : public Actor { +class OpenClose final : public td::Actor { public: explicit OpenClose(int cnt) : cnt_(cnt) { } - void start_up() override { + void start_up() final { yield(); } - void wakeup() override { - ObserverBase *observer = reinterpret_cast<ObserverBase *>(123); + void wakeup() final { + auto observer = reinterpret_cast<td::ObserverBase *>(123); if (cnt_ > 0) { - auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create); - CHECK(r_file_fd.is_ok()) << r_file_fd.error(); + auto r_file_fd = td::FileFd::open("server", td::FileFd::Read | td::FileFd::Create); + LOG_CHECK(r_file_fd.is_ok()) << r_file_fd.error(); auto file_fd = r_file_fd.move_as_ok(); - // LOG(ERROR) << file_fd.get_native_fd(); - file_fd.get_fd().set_observer(observer); + { auto pollable_fd = file_fd.get_poll_info().extract_pollable_fd(observer); } file_fd.close(); cnt_--; yield(); } else { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } } @@ -295,14 +299,8 @@ class OpenClose final : public Actor { }; TEST(Actors, open_close) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(2); - int cnt = 1000000; -#if TD_WINDOWS || TD_ANDROID - // TODO(perf) optimize - cnt = 100; -#endif + td::ConcurrentScheduler scheduler(2, 0); + int cnt = 10000; // TODO(perf) optimize scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release(); scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release(); scheduler.start(); @@ -311,62 +309,59 @@ TEST(Actors, open_close) { scheduler.finish(); } -namespace { -class MsgActor : public Actor { +class MsgActor : public td::Actor { public: virtual void msg() = 0; }; -class Slave : public Actor { +class Slave final : public td::Actor { public: - ActorId<MsgActor> msg; - explicit Slave(ActorId<MsgActor> msg) : msg(msg) { + td::ActorId<MsgActor> msg; + explicit Slave(td::ActorId<MsgActor> msg) : msg(msg) { } - void hangup() override { - send_closure(msg, &MsgActor::msg); + void hangup() final { + td::send_closure(msg, &MsgActor::msg); } }; -class MasterActor : public MsgActor { +class MasterActor final : public MsgActor { public: - void loop() override { + void loop() final { alive_ = true; - slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this))); + slave = td::create_actor<Slave>("Slave", static_cast<td::ActorId<MsgActor>>(actor_id(this))); stop(); } - ActorOwn<Slave> slave; + td::ActorOwn<Slave> slave; MasterActor() = default; MasterActor(const MasterActor &) = delete; MasterActor &operator=(const MasterActor &) = delete; MasterActor(MasterActor &&) = delete; MasterActor &operator=(MasterActor &&) = delete; - ~MasterActor() override { + ~MasterActor() final { alive_ = 987654321; } - void msg() override { + void msg() final { CHECK(alive_ == 123456789); } - uint64 alive_ = 123456789; + td::uint64 alive_ = 123456789; }; -} // namespace TEST(Actors, call_after_destruct) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); { auto guard = scheduler.get_guard(); - create_actor<MasterActor>("Master").release(); + td::create_actor<MasterActor>("Master").release(); } - scheduler.run(0); + scheduler.run(td::Timestamp::in(1)); } -class LinkTokenSlave : public Actor { +class LinkTokenSlave final : public td::Actor { public: - explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) { + explicit LinkTokenSlave(td::ActorShared<> parent) : parent_(std::move(parent)) { } - void add(uint64 link_token) { + void add(td::uint64 link_token) { CHECK(link_token == get_link_token()); } void close() { @@ -374,62 +369,61 @@ class LinkTokenSlave : public Actor { } private: - ActorShared<> parent_; + td::ActorShared<> parent_; }; -class LinkTokenMasterActor : public Actor { +class LinkTokenMasterActor final : public td::Actor { public: explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) { } - void start_up() override { - child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release(); + void start_up() final { + child_ = td::create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release(); yield(); } - void loop() override { + void loop() final { for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) { + auto token = static_cast<td::uint64>(cnt_) + 1; switch (i % 4) { case 0: { - send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + td::send_closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token); break; } case 1: { - send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + td::send_closure_later(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token); break; } case 2: { - EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token) .try_emit(); break; } case 3: { - EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token) .try_emit_later(); break; } } } if (cnt_ == 0) { - send_closure(child_, &LinkTokenSlave::close); + td::send_closure(child_, &LinkTokenSlave::close); } else { yield(); } } - void hangup_shared() override { + void hangup_shared() final { CHECK(get_link_token() == 123); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } private: int cnt_; - ActorId<LinkTokenSlave> child_; + td::ActorId<LinkTokenSlave> child_; }; TEST(Actors, link_token) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); auto cnt = 100000; scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release(); scheduler.start(); @@ -440,25 +434,25 @@ TEST(Actors, link_token) { TEST(Actors, promise) { int value = -1; - Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; }); - p1.set_error(Status::Error("Test error")); + td::Promise<int> p1 = td::PromiseCreator::lambda([&](int x) { value = x; }); + p1.set_error(td::Status::Error("Test error")); ASSERT_EQ(0, value); - Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; }); - p2.set_error(Status::Error("Test error")); + td::Promise<td::int32> p2 = td::PromiseCreator::lambda([&](td::Result<td::int32> x) { value = 1; }); + p2.set_error(td::Status::Error("Test error")); ASSERT_EQ(1, value); } -class LaterSlave : public Actor { +class LaterSlave final : public td::Actor { public: - explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) { + explicit LaterSlave(td::ActorShared<> parent) : parent_(std::move(parent)) { } private: - ActorShared<> parent_; + td::ActorShared<> parent_; - void hangup() override { + void hangup() final { sb << "A"; - send_closure(actor_id(this), &LaterSlave::finish); + td::send_closure(actor_id(this), &LaterSlave::finish); } void finish() { sb << "B"; @@ -466,31 +460,29 @@ class LaterSlave : public Actor { } }; -class LaterMasterActor : public Actor { +class LaterMasterActor final : public td::Actor { int cnt_ = 3; - std::vector<ActorOwn<LaterSlave>> children_; - void start_up() override { + td::vector<td::ActorOwn<LaterSlave>> children_; + void start_up() final { for (int i = 0; i < cnt_; i++) { - children_.push_back(create_actor<LaterSlave>("B", actor_shared())); + children_.push_back(td::create_actor<LaterSlave>("B", actor_shared(this))); } yield(); } - void loop() override { + void loop() final { children_.clear(); } - void hangup_shared() override { + void hangup_shared() final { if (!--cnt_) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } } }; TEST(Actors, later) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -499,39 +491,36 @@ TEST(Actors, later) { ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB"); } -class MultiPromise2 : public Actor { +class MultiPromise2 final : public td::Actor { public: - void start_up() override { - auto promise = PromiseCreator::lambda([](Result<Unit> result) { + void start_up() final { + auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) { result.ensure(); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); }); - MultiPromiseActorSafe multi_promise; + td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor2"}; multi_promise.add_promise(std::move(promise)); for (int i = 0; i < 10; i++) { - create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release(); + td::create_actor<td::SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release(); } } }; -class MultiPromise1 : public Actor { +class MultiPromise1 final : public td::Actor { public: - void start_up() override { - auto promise = PromiseCreator::lambda([](Result<Unit> result) { + void start_up() final { + auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) { CHECK(result.is_error()); - create_actor<MultiPromise2>("B").release(); + td::create_actor<MultiPromise2>("B").release(); }); - MultiPromiseActorSafe multi_promise; + td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor1"}; multi_promise.add_promise(std::move(promise)); } }; TEST(Actors, MultiPromise) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -539,23 +528,20 @@ TEST(Actors, MultiPromise) { scheduler.finish(); } -class FastPromise : public Actor { +class FastPromise final : public td::Actor { public: - void start_up() override { - PromiseFuture<int> pf; + void start_up() final { + td::PromiseFuture<int> pf; auto promise = pf.move_promise(); auto future = pf.move_future(); promise.set_value(123); CHECK(future.move_as_ok() == 123); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; TEST(Actors, FastPromise) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<FastPromise>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -563,21 +549,18 @@ TEST(Actors, FastPromise) { scheduler.finish(); } -class StopInTeardown : public Actor { - void loop() override { +class StopInTeardown final : public td::Actor { + void loop() final { stop(); } - void tear_down() override { + void tear_down() final { stop(); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; TEST(Actors, stop_in_teardown) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -585,38 +568,113 @@ TEST(Actors, stop_in_teardown) { scheduler.finish(); } -class AlwaysWaitForMailbox : public Actor { +class AlwaysWaitForMailbox final : public td::Actor { public: - void start_up() override { + void start_up() final { always_wait_for_mailbox(); - create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) { - send_closure(actor_id, &AlwaysWaitForMailbox::g); - send_closure(actor_id, &AlwaysWaitForMailbox::g); - CHECK(!ptr->was_f_); - })) + td::create_actor<td::SleepActor>("Sleep", 0.1, + td::PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](td::Unit) { + td::send_closure(actor_id, &AlwaysWaitForMailbox::g); + td::send_closure(actor_id, &AlwaysWaitForMailbox::g); + CHECK(!ptr->was_f_); + })) .release(); } void f() { was_f_ = true; - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } void g() { - send_closure(actor_id(this), &AlwaysWaitForMailbox::f); + td::send_closure(actor_id(this), &AlwaysWaitForMailbox::f); } private: - Timeout timeout_; bool was_f_{false}; }; TEST(Actors, always_wait_for_mailbox) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { } scheduler.finish(); } + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED +TEST(Actors, send_from_other_threads) { + td::ConcurrentScheduler scheduler(1, 0); + int thread_n = 10; + class Listener final : public td::Actor { + public: + explicit Listener(int cnt) : cnt_(cnt) { + } + void dec() { + if (--cnt_ == 0) { + td::Scheduler::instance()->finish(); + } + } + + private: + int cnt_; + }; + + auto A = scheduler.create_actor_unsafe<Listener>(1, "A", thread_n).release(); + scheduler.start(); + td::vector<td::thread> threads(thread_n); + for (auto &thread : threads) { + thread = td::thread([&A, &scheduler] { + auto guard = scheduler.get_send_guard(); + td::send_closure(A, &Listener::dec); + }); + } + while (scheduler.run_main(10)) { + } + for (auto &thread : threads) { + thread.join(); + } + scheduler.finish(); +} +#endif + +class DelayedCall final : public td::Actor { + public: + void on_called(int *step) { + CHECK(*step == 0); + *step = 1; + } +}; + +class MultiPromiseSendClosureLaterTest final : public td::Actor { + public: + void start_up() final { + delayed_call_ = td::create_actor<DelayedCall>("DelayedCall").release(); + mpa_.add_promise(td::PromiseCreator::lambda([this](td::Unit) { + CHECK(step_ == 1); + step_++; + td::Scheduler::instance()->finish(); + })); + auto lock = mpa_.get_promise(); + td::send_closure_later(delayed_call_, &DelayedCall::on_called, &step_); + lock.set_value(td::Unit()); + } + + void tear_down() final { + CHECK(step_ == 2); + } + + private: + int step_ = 0; + td::MultiPromiseActor mpa_{"MultiPromiseActor"}; + td::ActorId<DelayedCall> delayed_call_; +}; + +TEST(Actors, MultiPromiseSendClosureLater) { + td::ConcurrentScheduler scheduler(0, 0); + scheduler.create_actor_unsafe<MultiPromiseSendClosureLaterTest>(0, "MultiPromiseSendClosureLaterTest").release(); + scheduler.start(); + while (scheduler.run_main(1)) { + } + scheduler.finish(); +} diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp index b97a258a44..bac42e3fd5 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp @@ -1,22 +1,17 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" -#include "td/utils/logging.h" - -REGISTER_TESTS(actors_workers); - -namespace { - -using namespace td; +#include "td/utils/common.h" +#include "td/utils/SliceBuilder.h" +#include "td/utils/tests.h" -class PowerWorker final : public Actor { +class PowerWorker final : public td::Actor { public: class Callback { public: @@ -29,12 +24,12 @@ class PowerWorker final : public Actor { virtual void on_ready(int query, int res) = 0; virtual void on_closed() = 0; }; - void set_callback(unique_ptr<Callback> callback) { + void set_callback(td::unique_ptr<Callback> callback) { callback_ = std::move(callback); } - void task(uint32 x, uint32 p) { - uint32 res = 1; - for (uint32 i = 0; i < p; i++) { + void task(td::uint32 x, td::uint32 p) { + td::uint32 res = 1; + for (td::uint32 i = 0; i < p; i++) { res *= x; } callback_->on_ready(x, res); @@ -45,39 +40,41 @@ class PowerWorker final : public Actor { } private: - std::unique_ptr<Callback> callback_; + td::unique_ptr<Callback> callback_; }; -class Manager final : public Actor { +class Manager final : public td::Actor { public: - Manager(int queries_n, int query_size, std::vector<ActorId<PowerWorker>> workers) - : workers_(std::move(workers)), left_query_(queries_n), query_size_(query_size) { + Manager(int queries_n, int query_size, td::vector<td::ActorId<PowerWorker>> workers) + : workers_(std::move(workers)) + , ref_cnt_(static_cast<int>(workers_.size())) + , left_query_(queries_n) + , query_size_(query_size) { } - class Callback : public PowerWorker::Callback { + class Callback final : public PowerWorker::Callback { public: - Callback(ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) { + Callback(td::ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) { } - void on_ready(int query, int result) override { - send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result); + void on_ready(int query, int result) final { + td::send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result); } - void on_closed() override { - send_closure_later(actor_id_, &Manager::on_closed, worker_id_); + void on_closed() final { + td::send_closure_later(actor_id_, &Manager::on_closed, worker_id_); } private: - ActorId<Manager> actor_id_; + td::ActorId<Manager> actor_id_; int worker_id_; }; - void start_up() override { - ref_cnt_ = static_cast<int>(workers_.size()); + void start_up() final { int i = 0; for (auto &worker : workers_) { ref_cnt_++; - send_closure_later(worker, &PowerWorker::set_callback, make_unique<Callback>(actor_id(this), i)); + td::send_closure_later(worker, &PowerWorker::set_callback, td::make_unique<Callback>(actor_id(this), i)); i++; - send_closure_later(worker, &PowerWorker::task, 3, query_size_); + td::send_closure_later(worker, &PowerWorker::task, 3, query_size_); left_query_--; } } @@ -85,10 +82,10 @@ class Manager final : public Actor { void on_ready(int worker_id, int query, int res) { ref_cnt_--; if (left_query_ == 0) { - send_closure(workers_[worker_id], &PowerWorker::close); + td::send_closure(workers_[worker_id], &PowerWorker::close); } else { ref_cnt_++; - send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_); + td::send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_); left_query_--; } } @@ -96,30 +93,27 @@ class Manager final : public Actor { void on_closed(int worker_id) { ref_cnt_--; if (ref_cnt_ == 0) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } } private: - std::vector<ActorId<PowerWorker>> workers_; - int left_query_; + td::vector<td::ActorId<PowerWorker>> workers_; int ref_cnt_; + int left_query_; int query_size_; }; -void test_workers(int threads_n, int workers_n, int queries_n, int query_size) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - - ConcurrentScheduler sched; - sched.init(threads_n); +static void test_workers(int threads_n, int workers_n, int queries_n, int query_size) { + td::ConcurrentScheduler sched(threads_n, 0); - std::vector<ActorId<PowerWorker>> workers; + td::vector<td::ActorId<PowerWorker>> workers; for (int i = 0; i < workers_n; i++) { int thread_id = threads_n ? i % (threads_n - 1) + 2 : 0; - workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, "worker" + to_string(i)).release()); + workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, PSLICE() << "worker" << i).release()); } - sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "manager", queries_n, query_size, std::move(workers)).release(); + sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "Manager", queries_n, query_size, std::move(workers)).release(); sched.start(); while (sched.run_main(10)) { @@ -129,7 +123,6 @@ void test_workers(int threads_n, int workers_n, int queries_n, int query_size) { // sched.test_one_thread_run(); } -} // namespace TEST(Actors, workers_big_query_one_thread) { test_workers(0, 10, 1000, 300000); @@ -144,13 +137,13 @@ TEST(Actors, workers_big_query_nine_threads) { } TEST(Actors, workers_small_query_one_thread) { - test_workers(0, 10, 1000000, 1); + test_workers(0, 10, 100000, 1); } TEST(Actors, workers_small_query_two_threads) { - test_workers(2, 10, 1000000, 1); + test_workers(2, 10, 100000, 1); } TEST(Actors, workers_small_query_nine_threads) { - test_workers(9, 10, 1000000, 1); + test_workers(9, 10, 10000, 1); } |