summaryrefslogtreecommitdiff
path: root/protocols/Telegram/tdlib/td/tdactor/test
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor/test')
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp93
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp535
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp322
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp428
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp87
5 files changed, 545 insertions, 920 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp
index f4267f2818..0720f0ed6f 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp
@@ -1,38 +1,39 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
-#include "td/actor/Timeout.h"
+#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
+#include "td/actor/MultiTimeout.h"
-using namespace td;
+#include "td/utils/common.h"
+#include "td/utils/logging.h"
+#include "td/utils/Random.h"
+#include "td/utils/tests.h"
TEST(MultiTimeout, bug) {
- ConcurrentScheduler sched;
- int threads_n = 0;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(0, 0);
sched.start();
- std::unique_ptr<MultiTimeout> multi_timeout;
+ td::unique_ptr<td::MultiTimeout> multi_timeout;
struct Data {
- MultiTimeout *multi_timeout;
+ td::MultiTimeout *multi_timeout;
};
Data data;
{
- auto guard = sched.get_current_guard();
- multi_timeout = std::make_unique<MultiTimeout>();
+ auto guard = sched.get_main_guard();
+ multi_timeout = td::make_unique<td::MultiTimeout>("MultiTimeout");
data.multi_timeout = multi_timeout.get();
- multi_timeout->set_callback([](void *void_data, int64 key) {
+ multi_timeout->set_callback([](void *void_data, td::int64 key) {
auto &data = *static_cast<Data *>(void_data);
if (key == 1) {
data.multi_timeout->cancel_timeout(key + 1);
data.multi_timeout->set_timeout_in(key + 2, 1);
} else {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
});
multi_timeout->set_callback_data(&data);
@@ -45,3 +46,67 @@ TEST(MultiTimeout, bug) {
}
sched.finish();
}
+
+class TimeoutManager final : public td::Actor {
+ static td::int32 count;
+
+ public:
+ TimeoutManager() {
+ count++;
+
+ test_timeout_.set_callback(on_test_timeout_callback);
+ test_timeout_.set_callback_data(static_cast<void *>(this));
+ }
+ TimeoutManager(const TimeoutManager &) = delete;
+ TimeoutManager &operator=(const TimeoutManager &) = delete;
+ TimeoutManager(TimeoutManager &&) = delete;
+ TimeoutManager &operator=(TimeoutManager &&) = delete;
+ ~TimeoutManager() final {
+ count--;
+ LOG(INFO) << "Destroy TimeoutManager";
+ }
+
+ static void on_test_timeout_callback(void *timeout_manager_ptr, td::int64 id) {
+ CHECK(count >= 0);
+ if (count == 0) {
+ LOG(ERROR) << "Receive timeout after manager was closed";
+ return;
+ }
+
+ auto manager = static_cast<TimeoutManager *>(timeout_manager_ptr);
+ send_closure_later(manager->actor_id(manager), &TimeoutManager::test_timeout);
+ }
+
+ void test_timeout() {
+ CHECK(count > 0);
+ // we must yield scheduler, so run_main breaks immediately, if timeouts are handled immediately
+ td::Scheduler::instance()->yield();
+ }
+
+ td::MultiTimeout test_timeout_{"TestTimeout"};
+};
+
+td::int32 TimeoutManager::count;
+
+TEST(MultiTimeout, Destroy) {
+ td::ConcurrentScheduler sched(0, 0);
+
+ auto timeout_manager = sched.create_actor_unsafe<TimeoutManager>(0, "TimeoutManager");
+ TimeoutManager *manager = timeout_manager.get().get_actor_unsafe();
+ sched.start();
+ int cnt = 100;
+ while (sched.run_main(cnt == 100 || cnt <= 0 ? 0.001 : 10)) {
+ auto guard = sched.get_main_guard();
+ cnt--;
+ if (cnt > 0) {
+ for (int i = 0; i < 2; i++) {
+ manager->test_timeout_.set_timeout_in(td::Random::fast(0, 1000000000), td::Random::fast(2, 5) / 1000.0);
+ }
+ } else if (cnt == 0) {
+ timeout_manager.reset();
+ } else if (cnt == -10) {
+ td::Scheduler::instance()->finish();
+ }
+ }
+ sched.finish();
+}
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp
deleted file mode 100644
index 9185fe8858..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp
+++ /dev/null
@@ -1,535 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#include "td/actor/impl2/ActorLocker.h"
-#include "td/actor/impl2/Scheduler.h"
-
-#include "td/utils/format.h"
-#include "td/utils/logging.h"
-#include "td/utils/port/thread.h"
-#include "td/utils/Slice.h"
-#include "td/utils/StringBuilder.h"
-#include "td/utils/tests.h"
-#include "td/utils/Time.h"
-
-#include <array>
-#include <atomic>
-#include <deque>
-#include <memory>
-
-using td::actor2::ActorLocker;
-using td::actor2::ActorSignals;
-using td::actor2::ActorState;
-using td::actor2::SchedulerId;
-
-TEST(Actor2, signals) {
- ActorSignals signals;
- signals.add_signal(ActorSignals::Wakeup);
- signals.add_signal(ActorSignals::Cpu);
- signals.add_signal(ActorSignals::Kill);
- signals.clear_signal(ActorSignals::Cpu);
-
- bool was_kill = false;
- bool was_wakeup = false;
- while (!signals.empty()) {
- auto s = signals.first_signal();
- if (s == ActorSignals::Kill) {
- was_kill = true;
- } else if (s == ActorSignals::Wakeup) {
- was_wakeup = true;
- } else {
- UNREACHABLE();
- }
- signals.clear_signal(s);
- }
- CHECK(was_kill && was_wakeup);
-}
-
-TEST(Actors2, flags) {
- ActorState::Flags flags;
- CHECK(!flags.is_locked());
- flags.set_locked(true);
- CHECK(flags.is_locked());
- flags.set_locked(false);
- CHECK(!flags.is_locked());
- flags.set_pause(true);
-
- flags.set_scheduler_id(SchedulerId{123});
-
- auto signals = flags.get_signals();
- CHECK(signals.empty());
- signals.add_signal(ActorSignals::Cpu);
- signals.add_signal(ActorSignals::Kill);
- CHECK(signals.has_signal(ActorSignals::Cpu));
- CHECK(signals.has_signal(ActorSignals::Kill));
- flags.set_signals(signals);
- CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw();
-
- auto wakeup = ActorSignals{};
- wakeup.add_signal(ActorSignals::Wakeup);
-
- flags.add_signals(wakeup);
- signals.add_signal(ActorSignals::Wakeup);
- CHECK(flags.get_signals().raw() == signals.raw());
-
- flags.clear_signals();
- CHECK(flags.get_signals().empty());
-
- CHECK(flags.get_scheduler_id().value() == 123);
- CHECK(flags.is_pause());
-}
-
-TEST(Actor2, locker) {
- ActorState state;
-
- ActorSignals kill_signal;
- kill_signal.add_signal(ActorSignals::Kill);
-
- ActorSignals wakeup_signal;
- kill_signal.add_signal(ActorSignals::Wakeup);
-
- ActorSignals cpu_signal;
- kill_signal.add_signal(ActorSignals::Cpu);
-
- {
- ActorLocker lockerA(&state);
- ActorLocker lockerB(&state);
- ActorLocker lockerC(&state);
-
- CHECK(lockerA.try_lock());
- CHECK(lockerA.own_lock());
- auto flagsA = lockerA.flags();
- CHECK(lockerA.try_unlock(flagsA));
- CHECK(!lockerA.own_lock());
-
- CHECK(lockerA.try_lock());
- CHECK(!lockerB.try_lock());
- CHECK(!lockerC.try_lock());
-
- CHECK(lockerB.try_add_signals(kill_signal));
- CHECK(!lockerC.try_add_signals(wakeup_signal));
- CHECK(lockerC.try_add_signals(wakeup_signal));
- CHECK(!lockerC.add_signals(cpu_signal));
- CHECK(!lockerA.flags().has_signals());
- CHECK(!lockerA.try_unlock(lockerA.flags()));
- {
- auto flags = lockerA.flags();
- auto signals = flags.get_signals();
- bool was_kill = false;
- bool was_wakeup = false;
- bool was_cpu = false;
- while (!signals.empty()) {
- auto s = signals.first_signal();
- if (s == ActorSignals::Kill) {
- was_kill = true;
- } else if (s == ActorSignals::Wakeup) {
- was_wakeup = true;
- } else if (s == ActorSignals::Cpu) {
- was_cpu = true;
- } else {
- UNREACHABLE();
- }
- signals.clear_signal(s);
- }
- CHECK(was_kill && was_wakeup && was_cpu);
- flags.clear_signals();
- CHECK(lockerA.try_unlock(flags));
- }
- }
-
- {
- ActorLocker lockerB(&state);
- CHECK(lockerB.try_lock());
- CHECK(lockerB.try_unlock(lockerB.flags()));
- CHECK(lockerB.add_signals(kill_signal));
- CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
- auto flags = lockerB.flags();
- flags.clear_signals();
- ActorLocker lockerA(&state);
- CHECK(!lockerA.add_signals(kill_signal));
- CHECK(!lockerB.try_unlock(flags));
- CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal!
- CHECK(!lockerB.try_unlock(flags));
- CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
- CHECK(lockerB.try_unlock(flags));
- }
-
- {
- ActorLocker lockerA(&state);
- CHECK(lockerA.try_lock());
- auto flags = lockerA.flags();
- flags.set_pause(true);
- CHECK(lockerA.try_unlock(flags));
- //We have to lock, though we can't execute.
- CHECK(lockerA.add_signals(wakeup_signal));
- }
-}
-
-#if !TD_THREAD_UNSUPPORTED
-TEST(Actor2, locker_stress) {
- ActorState state;
-
- constexpr size_t threads_n = 5;
- auto stage = [&](std::atomic<int> &value, int need) {
- value.fetch_add(1, std::memory_order_release);
- while (value.load(std::memory_order_acquire) < need) {
- td::this_thread::yield();
- }
- };
-
- struct Node {
- std::atomic<td::uint32> request{0};
- td::uint32 response = 0;
- char pad[64];
- };
- std::array<Node, threads_n> nodes;
- auto do_work = [&]() {
- for (auto &node : nodes) {
- auto query = node.request.load(std::memory_order_acquire);
- if (query) {
- node.response = query * query;
- node.request.store(0, std::memory_order_relaxed);
- }
- }
- };
-
- std::atomic<int> begin{0};
- std::atomic<int> ready{0};
- std::atomic<int> check{0};
- std::atomic<int> finish{0};
- std::vector<td::thread> threads;
- for (size_t i = 0; i < threads_n; i++) {
- threads.push_back(td::thread([&, id = i] {
- for (size_t i = 1; i < 1000000; i++) {
- ActorLocker locker(&state);
- auto need = static_cast<int>(threads_n * i);
- auto query = static_cast<td::uint32>(id + need);
- stage(begin, need);
- nodes[id].request = 0;
- nodes[id].response = 0;
- stage(ready, need);
- if (locker.try_lock()) {
- nodes[id].response = query * query;
- } else {
- auto cpu = ActorSignals::one(ActorSignals::Cpu);
- nodes[id].request.store(query, std::memory_order_release);
- locker.add_signals(cpu);
- }
- while (locker.own_lock()) {
- auto flags = locker.flags();
- auto signals = flags.get_signals();
- if (!signals.empty()) {
- do_work();
- }
- flags.clear_signals();
- locker.try_unlock(flags);
- }
-
- stage(check, need);
- if (id == 0) {
- CHECK(locker.add_signals(ActorSignals{}));
- CHECK(!locker.flags().has_signals());
- CHECK(locker.try_unlock(locker.flags()));
- for (size_t thread_id = 0; thread_id < threads_n; thread_id++) {
- CHECK(nodes[thread_id].response ==
- static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need))
- << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " "
- << nodes[thread_id].request.load();
- }
- }
- }
- }));
- }
- for (auto &thread : threads) {
- thread.join();
- }
-}
-
-namespace {
-const size_t BUF_SIZE = 1024 * 1024;
-char buf[BUF_SIZE];
-td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
-} // namespace
-
-TEST(Actor2, executor_simple) {
- using namespace td;
- using namespace td::actor2;
- struct Dispatcher : public SchedulerDispatcher {
- void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override {
- queue.push_back(std::move(ptr));
- }
- void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override {
- UNREACHABLE();
- }
- SchedulerId get_scheduler_id() const override {
- return SchedulerId{0};
- }
- std::deque<ActorInfoPtr> queue;
- };
- Dispatcher dispatcher;
-
- class TestActor : public Actor {
- public:
- void close() {
- stop();
- }
-
- private:
- void start_up() override {
- sb << "StartUp";
- }
- void tear_down() override {
- sb << "TearDown";
- }
- };
- ActorInfoCreator actor_info_creator;
- auto actor = actor_info_creator.create(
- std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
- dispatcher.add_to_queue(actor, SchedulerId{0}, false);
-
- {
- ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
- CHECK(executor.can_send());
- CHECK(executor.can_send_immediate());
- CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
- sb.clear();
- executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
- CHECK(sb.as_cslice() == "A") << sb.as_cslice();
- sb.clear();
- auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; });
- big_message.set_big();
- executor.send(std::move(big_message));
- CHECK(sb.as_cslice() == "") << sb.as_cslice();
- executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
- CHECK(sb.as_cslice() == "") << sb.as_cslice();
- }
- CHECK(dispatcher.queue.size() == 1);
- { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); }
- CHECK(dispatcher.queue.size() == 1);
- dispatcher.queue.clear();
- CHECK(sb.as_cslice() == "bigA") << sb.as_cslice();
- sb.clear();
- {
- ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
- executor.send(
- ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
- }
- CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
- sb.clear();
- CHECK(!actor->has_actor());
- {
- ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
- executor.send(
- ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
- }
- CHECK(dispatcher.queue.empty());
- CHECK(sb.as_cslice() == "");
-}
-
-using namespace td::actor2;
-using td::uint32;
-static std::atomic<int> cnt;
-class Worker : public Actor {
- public:
- void query(uint32 x, ActorInfoPtr master);
- void close() {
- stop();
- }
-};
-class Master : public Actor {
- public:
- void on_result(uint32 x, uint32 y) {
- loop();
- }
-
- private:
- uint32 l = 0;
- uint32 r = 100000;
- ActorInfoPtr worker;
- void start_up() override {
- worker = detail::create_actor<Worker>(ActorOptions().with_name("Master"));
- loop();
- }
- void loop() override {
- l++;
- if (l == r) {
- if (!--cnt) {
- SchedulerContext::get()->stop();
- }
- detail::send_closure(*worker, &Worker::close);
- stop();
- return;
- }
- detail::send_lambda(*worker,
- [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); });
- }
-};
-
-void Worker::query(uint32 x, ActorInfoPtr master) {
- auto y = x;
- for (int i = 0; i < 100; i++) {
- y = y * y;
- }
- detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); });
-}
-
-TEST(Actor2, scheduler_simple) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 2};
- scheduler.start();
- scheduler.run_in_context([] {
- cnt = 10;
- for (int i = 0; i < 10; i++) {
- detail::create_actor<Master>(ActorOptions().with_name("Master"));
- }
- });
- while (scheduler.run(1000)) {
- }
- Scheduler::close_scheduler_group(*group_info);
-}
-
-TEST(Actor2, actor_id_simple) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 2};
- sb.clear();
- scheduler.start();
-
- scheduler.run_in_context([] {
- class A : public Actor {
- public:
- A(int value) : value_(value) {
- sb << "A" << value_;
- }
- void hello() {
- sb << "hello";
- }
- ~A() {
- sb << "~A";
- if (--cnt <= 0) {
- SchedulerContext::get()->stop();
- }
- }
-
- private:
- int value_;
- };
- cnt = 1;
- auto id = create_actor<A>("A", 123);
- CHECK(sb.as_cslice() == "A123");
- sb.clear();
- send_closure(id, &A::hello);
- });
- while (scheduler.run(1000)) {
- }
- CHECK(sb.as_cslice() == "hello~A");
- Scheduler::close_scheduler_group(*group_info);
- sb.clear();
-}
-
-TEST(Actor2, actor_creation) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 1};
- scheduler.start();
-
- scheduler.run_in_context([]() mutable {
- class B;
- class A : public Actor {
- public:
- void f() {
- check();
- stop();
- }
-
- private:
- void start_up() override {
- check();
- create_actor<B>("Simple", actor_id(this)).release();
- }
-
- void check() {
- auto &context = *SchedulerContext::get();
- CHECK(context.has_poll());
- context.get_poll();
- }
-
- void tear_down() override {
- if (--cnt <= 0) {
- SchedulerContext::get()->stop();
- }
- }
- };
-
- class B : public Actor {
- public:
- B(ActorId<A> a) : a_(a) {
- }
-
- private:
- void start_up() override {
- auto &context = *SchedulerContext::get();
- CHECK(!context.has_poll());
- send_closure(a_, &A::f);
- stop();
- }
- void tear_down() override {
- if (--cnt <= 0) {
- SchedulerContext::get()->stop();
- }
- }
- ActorId<A> a_;
- };
- cnt = 2;
- create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release();
- });
- while (scheduler.run(1000)) {
- }
- scheduler.stop();
- Scheduler::close_scheduler_group(*group_info);
-}
-
-TEST(Actor2, actor_timeout_simple) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 2};
- sb.clear();
- scheduler.start();
-
- scheduler.run_in_context([] {
- class A : public Actor {
- public:
- void start_up() override {
- set_timeout();
- }
- void alarm() override {
- double diff = td::Time::now() - expected_timeout_;
- CHECK(-0.001 < diff && diff < 0.1) << diff;
- if (cnt_-- > 0) {
- set_timeout();
- } else {
- stop();
- }
- }
-
- void tear_down() override {
- SchedulerContext::get()->stop();
- }
-
- private:
- double expected_timeout_;
- int cnt_ = 5;
- void set_timeout() {
- auto wakeup_timestamp = td::Timestamp::in(0.1);
- expected_timeout_ = wakeup_timestamp.at();
- alarm_timestamp() = wakeup_timestamp;
- }
- };
- create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release();
- });
- while (scheduler.run(1000)) {
- }
- Scheduler::close_scheduler_group(*group_info);
- sb.clear();
-}
-#endif //!TD_THREAD_UNSUPPORTED
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
index ffceacc595..628b74a94c 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
@@ -1,35 +1,32 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
+#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/Random.h"
+#include "td/utils/ScopeGuard.h"
+#include "td/utils/tests.h"
#include <limits>
#include <map>
+#include <memory>
#include <utility>
-using namespace td;
-
-REGISTER_TESTS(actors_main);
-
-namespace {
-
template <class ContainerT>
static typename ContainerT::value_type &rand_elem(ContainerT &cont) {
CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max()));
- return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)];
+ return cont[td::Random::fast(0, static_cast<int>(cont.size()) - 1)];
}
-static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
- uint32 res = 1;
+static td::uint32 fast_pow_mod_uint32(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
while (p) {
if (p & 1) {
res *= x;
@@ -40,25 +37,25 @@ static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
return res;
}
-static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) {
- uint32 res = 1;
- for (uint32 i = 0; i < p; i++) {
+static td::uint32 slow_pow_mod_uint32(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
+ for (td::uint32 i = 0; i < p; i++) {
res *= x;
}
return res;
}
-struct Query {
- uint32 query_id;
- uint32 result;
- std::vector<int> todo;
- Query() = default;
- Query(const Query &) = delete;
- Query &operator=(const Query &) = delete;
- Query(Query &&) = default;
- Query &operator=(Query &&) = default;
- ~Query() {
- CHECK(todo.empty()) << "Query lost";
+struct ActorQuery {
+ td::uint32 query_id{};
+ td::uint32 result{};
+ td::vector<int> todo;
+ ActorQuery() = default;
+ ActorQuery(const ActorQuery &) = delete;
+ ActorQuery &operator=(const ActorQuery &) = delete;
+ ActorQuery(ActorQuery &&) = default;
+ ActorQuery &operator=(ActorQuery &&) = default;
+ ~ActorQuery() {
+ LOG_CHECK(todo.empty()) << "ActorQuery lost";
}
int next_pow() {
CHECK(!todo.empty());
@@ -71,25 +68,25 @@ struct Query {
}
};
-static uint32 fast_calc(Query &q) {
- uint32 result = q.result;
+static td::uint32 fast_calc(ActorQuery &q) {
+ td::uint32 result = q.result;
for (auto x : q.todo) {
result = fast_pow_mod_uint32(result, x);
}
return result;
}
-class Worker final : public Actor {
+class Worker final : public td::Actor {
public:
explicit Worker(int threads_n) : threads_n_(threads_n) {
}
- void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) {
- uint32 result = slow_pow_mod_uint32(x, p);
+ void query(td::PromiseActor<td::uint32> &&promise, td::uint32 x, td::uint32 p) {
+ td::uint32 result = slow_pow_mod_uint32(x, p);
promise.set_value(std::move(result));
(void)threads_n_;
- // if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
- // migrate(Random::fast(2, threads_n));
+ // if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
+ // migrate(td::Random::fast(2, threads_n));
//}
}
@@ -97,7 +94,7 @@ class Worker final : public Actor {
int threads_n_;
};
-class QueryActor final : public Actor {
+class QueryActor final : public td::Actor {
public:
class Callback {
public:
@@ -107,44 +104,46 @@ class QueryActor final : public Actor {
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
virtual ~Callback() = default;
- virtual void on_result(Query &&query) = 0;
+ virtual void on_result(ActorQuery &&query) = 0;
virtual void on_closed() = 0;
};
explicit QueryActor(int threads_n) : threads_n_(threads_n) {
}
- void set_callback(std::unique_ptr<Callback> callback) {
+ void set_callback(td::unique_ptr<Callback> callback) {
callback_ = std::move(callback);
}
- void set_workers(std::vector<ActorId<Worker>> workers) {
+ void set_workers(td::vector<td::ActorId<Worker>> workers) {
workers_ = std::move(workers);
}
- void query(Query &&query) {
- uint32 x = query.result;
- uint32 p = query.next_pow();
- if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
+ void query(ActorQuery &&query) {
+ td::uint32 x = query.result;
+ td::uint32 p = query.next_pow();
+ if (td::Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
query.result = slow_pow_mod_uint32(x, p);
callback_->on_result(std::move(query));
} else {
- auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p);
+ auto future = td::Random::fast(0, 3) == 0
+ ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
+ : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
if (future.is_ready()) {
query.result = future.move_as_ok();
callback_->on_result(std::move(query));
} else {
- future.set_event(EventCreator::raw(actor_id(), query.query_id));
+ future.set_event(td::EventCreator::raw(actor_id(), query.query_id));
auto query_id = query.query_id;
- pending_.insert(std::make_pair(query_id, std::make_pair(std::move(future), std::move(query))));
+ pending_.emplace(query_id, std::make_pair(std::move(future), std::move(query)));
}
}
- if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
- migrate(Random::fast(2, threads_n_));
+ if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
+ migrate(td::Random::fast(2, threads_n_));
}
}
- void raw_event(const Event::Raw &event) override {
- uint32 id = event.u32;
+ void raw_event(const td::Event::Raw &event) final {
+ td::uint32 id = event.u32;
auto it = pending_.find(id);
auto future = std::move(it->second.first);
auto query = std::move(it->second.second);
@@ -159,44 +158,44 @@ class QueryActor final : public Actor {
stop();
}
- void on_start_migrate(int32 sched_id) override {
+ void on_start_migrate(td::int32 sched_id) final {
for (auto &it : pending_) {
start_migrate(it.second.first, sched_id);
}
}
- void on_finish_migrate() override {
+ void on_finish_migrate() final {
for (auto &it : pending_) {
finish_migrate(it.second.first);
}
}
private:
- unique_ptr<Callback> callback_;
- std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_;
- std::vector<ActorId<Worker>> workers_;
+ td::unique_ptr<Callback> callback_;
+ std::map<td::uint32, std::pair<td::FutureActor<td::uint32>, ActorQuery>> pending_;
+ td::vector<td::ActorId<Worker>> workers_;
int threads_n_;
};
-class MainQueryActor final : public Actor {
- class QueryActorCallback : public QueryActor::Callback {
+class MainQueryActor final : public td::Actor {
+ class QueryActorCallback final : public QueryActor::Callback {
public:
- void on_result(Query &&query) override {
+ void on_result(ActorQuery &&query) final {
if (query.ready()) {
send_closure(parent_id_, &MainQueryActor::on_result, std::move(query));
} else {
send_closure(next_solver_, &QueryActor::query, std::move(query));
}
}
- void on_closed() override {
+ void on_closed() final {
send_closure(parent_id_, &MainQueryActor::on_closed);
}
- QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver)
+ QueryActorCallback(td::ActorId<MainQueryActor> parent_id, td::ActorId<QueryActor> next_solver)
: parent_id_(parent_id), next_solver_(next_solver) {
}
private:
- ActorId<MainQueryActor> parent_id_;
- ActorId<QueryActor> next_solver_;
+ td::ActorId<MainQueryActor> parent_id_;
+ td::ActorId<QueryActor> next_solver_;
};
const int ACTORS_CNT = 10;
@@ -206,39 +205,39 @@ class MainQueryActor final : public Actor {
explicit MainQueryActor(int threads_n) : threads_n_(threads_n) {
}
- void start_up() override {
+ void start_up() final {
actors_.resize(ACTORS_CNT);
for (auto &actor : actors_) {
- auto actor_ptr = make_unique<QueryActor>(threads_n_);
- actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0)
+ auto actor_ptr = td::make_unique<QueryActor>(threads_n_);
+ actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
.release();
}
workers_.resize(WORKERS_CNT);
for (auto &worker : workers_) {
- auto actor_ptr = make_unique<Worker>(threads_n_);
- worker =
- register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ auto actor_ptr = td::make_unique<Worker>(threads_n_);
+ worker = register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
+ .release();
}
for (int i = 0; i < ACTORS_CNT; i++) {
ref_cnt_++;
send_closure(actors_[i], &QueryActor::set_callback,
- make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
+ td::make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
send_closure(actors_[i], &QueryActor::set_workers, workers_);
}
yield();
}
- void on_result(Query &&query) {
+ void on_result(ActorQuery &&query) {
CHECK(query.ready());
CHECK(query.result == expected_[query.query_id]);
in_cnt_++;
wakeup();
}
- Query create_query() {
- Query q;
+ ActorQuery create_query() {
+ ActorQuery q;
q.query_id = (query_id_ += 2);
q.result = q.query_id;
q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000};
@@ -249,14 +248,14 @@ class MainQueryActor final : public Actor {
void on_closed() {
ref_cnt_--;
if (ref_cnt_ == 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
}
- void wakeup() override {
- int cnt = 100000;
+ void wakeup() final {
+ int cnt = 10000;
while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) {
- if (Random::fast(0, 1)) {
+ if (td::Random::fast_bool()) {
send_closure(rand_elem(actors_), &QueryActor::query, create_query());
} else {
send_closure_later(rand_elem(actors_), &QueryActor::query, create_query());
@@ -273,9 +272,9 @@ class MainQueryActor final : public Actor {
}
private:
- std::map<uint32, uint32> expected_;
- std::vector<ActorId<QueryActor>> actors_;
- std::vector<ActorId<Worker>> workers_;
+ std::map<td::uint32, td::uint32> expected_;
+ td::vector<td::ActorId<QueryActor>> actors_;
+ td::vector<td::ActorId<Worker>> workers_;
int out_cnt_ = 0;
int in_cnt_ = 0;
int query_id_ = 1;
@@ -283,101 +282,104 @@ class MainQueryActor final : public Actor {
int threads_n_;
};
-class SimpleActor final : public Actor {
+class SimpleActor final : public td::Actor {
public:
- explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) {
+ explicit SimpleActor(td::int32 threads_n) : threads_n_(threads_n) {
}
- void start_up() override {
- auto actor_ptr = make_unique<Worker>(threads_n_);
+ void start_up() final {
+ auto actor_ptr = td::make_unique<Worker>(threads_n_);
worker_ =
- register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0).release();
yield();
}
- void wakeup() override {
- if (q_ == 100000) {
- Scheduler::instance()->finish();
+ void wakeup() final {
+ if (q_ == 10000) {
+ td::Scheduler::instance()->finish();
stop();
return;
}
q_++;
- p_ = Random::fast(0, 1) ? 1 : 10000;
- auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_);
+ p_ = td::Random::fast_bool() ? 1 : 10000;
+ auto future = td::Random::fast(0, 3) == 0
+ ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
+ : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_);
if (future.is_ready()) {
auto result = future.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));
yield();
} else {
- future.set_event(EventCreator::raw(actor_id(), nullptr));
+ future.set_event(td::EventCreator::raw(actor_id(), nullptr));
future_ = std::move(future);
}
- // if (threads_n_ > 1 && Random::fast(0, 2) == 0) {
- // migrate(Random::fast(1, threads_n));
+ // if (threads_n_ > 1 && td::Random::fast(0, 2) == 0) {
+ // migrate(td::Random::fast(1, threads_n));
//}
}
- void raw_event(const Event::Raw &event) override {
+ void raw_event(const td::Event::Raw &event) final {
auto result = future_.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));
yield();
}
- void on_start_migrate(int32 sched_id) override {
+ void on_start_migrate(td::int32 sched_id) final {
start_migrate(future_, sched_id);
}
- void on_finish_migrate() override {
+ void on_finish_migrate() final {
finish_migrate(future_);
}
private:
- int32 threads_n_;
- ActorId<Worker> worker_;
- FutureActor<uint32> future_;
- uint32 q_ = 1;
- uint32 p_;
+ td::int32 threads_n_;
+ td::ActorId<Worker> worker_;
+ td::FutureActor<td::uint32> future_;
+ td::uint32 q_ = 1;
+ td::uint32 p_ = 0;
};
-} // namespace
-class SendToDead : public Actor {
+class SendToDead final : public td::Actor {
public:
- class Parent : public Actor {
+ class Parent final : public td::Actor {
public:
- explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
+ explicit Parent(td::ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
}
- void start_up() override {
- set_timeout_in(Random::fast_uint32() % 3 * 0.001);
+ void start_up() final {
+ set_timeout_in(td::Random::fast_uint32() % 3 * 0.001);
if (ttl_ != 0) {
- child_ = create_actor_on_scheduler<Parent>(
- "Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(), ttl_ - 1);
+ child_ = td::create_actor_on_scheduler<Parent>(
+ "Child", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), actor_shared(this),
+ ttl_ - 1);
}
}
- void timeout_expired() override {
+ void timeout_expired() final {
stop();
}
private:
- ActorOwn<Parent> child_;
- ActorShared<> parent_;
+ td::ActorOwn<Parent> child_;
+ td::ActorShared<> parent_;
int ttl_;
};
- void start_up() override {
+ void start_up() final {
for (int i = 0; i < 2000; i++) {
- create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(),
- create_reference(), 4)
+ td::create_actor_on_scheduler<Parent>(
+ "Parent", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), create_reference(), 4)
.release();
}
}
- ActorShared<> create_reference() {
+ td::ActorShared<> create_reference() {
ref_cnt_++;
- return actor_shared();
+ return actor_shared(this);
}
- void hangup_shared() override {
+
+ void hangup_shared() final {
ref_cnt_--;
if (ref_cnt_ == 0) {
ttl_--;
if (ttl_ <= 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
} else {
start_up();
@@ -385,18 +387,17 @@ class SendToDead : public Actor {
}
}
- uint32 ttl_{50};
- uint32 ref_cnt_{0};
+ td::uint32 ttl_{50};
+ td::uint32 ref_cnt_{0};
};
TEST(Actors, send_to_dead) {
//TODO: fix CHECK(storage_count_.load() == 0)
return;
- ConcurrentScheduler sched;
int threads_n = 5;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<SendToDead>(0, "manager").release();
+ sched.create_actor_unsafe<SendToDead>(0, "SendToDead").release();
sched.start();
while (sched.run_main(10)) {
// empty
@@ -405,11 +406,8 @@ TEST(Actors, send_to_dead) {
}
TEST(Actors, main_simple) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
int threads_n = 3;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
sched.create_actor_unsafe<SimpleActor>(threads_n > 1 ? 1 : 0, "simple", threads_n).release();
sched.start();
@@ -420,13 +418,10 @@ TEST(Actors, main_simple) {
}
TEST(Actors, main) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
int threads_n = 9;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "manager", threads_n).release();
+ sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "MainQuery", threads_n).release();
sched.start();
while (sched.run_main(10)) {
// empty
@@ -434,27 +429,76 @@ TEST(Actors, main) {
sched.finish();
}
-class DoAfterStop : public Actor {
+class DoAfterStop final : public td::Actor {
public:
- void loop() override {
- ptr = std::make_unique<int>(10);
+ void loop() final {
+ ptr = td::make_unique<int>(10);
stop();
CHECK(*ptr == 10);
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
private:
- std::unique_ptr<int> ptr;
+ td::unique_ptr<int> ptr;
};
TEST(Actors, do_after_stop) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+ int threads_n = 0;
+ td::ConcurrentScheduler sched(threads_n, 0);
- ConcurrentScheduler sched;
+ sched.create_actor_unsafe<DoAfterStop>(0, "DoAfterStop").release();
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
+
+class XContext final : public td::ActorContext {
+ public:
+ td::int32 get_id() const final {
+ return 123456789;
+ }
+
+ void validate() {
+ CHECK(x == 1234);
+ }
+ ~XContext() final {
+ x = 0;
+ }
+ int x = 1234;
+};
+
+class WithXContext final : public td::Actor {
+ public:
+ void start_up() final {
+ auto old_context = set_context(std::make_shared<XContext>());
+ }
+ void f(td::unique_ptr<td::Guard> guard) {
+ }
+ void close() {
+ stop();
+ }
+};
+
+static void check_context() {
+ auto ptr = static_cast<XContext *>(td::Scheduler::context());
+ CHECK(ptr != nullptr);
+ ptr->validate();
+}
+
+TEST(Actors, context_during_destruction) {
int threads_n = 0;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<DoAfterStop>(0, "manager").release();
+ {
+ auto guard = sched.get_main_guard();
+ auto with_context = td::create_actor<WithXContext>("WithXContext").release();
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
+ send_closure_later(with_context, &WithXContext::close);
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { td::Scheduler::instance()->finish(); }));
+ }
sched.start();
while (sched.run_main(10)) {
// empty
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
index c0a6c32b61..78d32d5437 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
@@ -1,57 +1,66 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/MultiPromise.h"
#include "td/actor/PromiseFuture.h"
#include "td/actor/SleepActor.h"
-#include "td/actor/Timeout.h"
+#include "td/utils/common.h"
#include "td/utils/logging.h"
+#include "td/utils/MpscPollableQueue.h"
#include "td/utils/Observer.h"
#include "td/utils/port/FileFd.h"
+#include "td/utils/port/thread.h"
+#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
+#include "td/utils/tests.h"
+#include "td/utils/Time.h"
+#include <memory>
#include <tuple>
-REGISTER_TESTS(actors_simple)
-
-namespace {
-using namespace td;
-
static const size_t BUF_SIZE = 1024 * 1024;
static char buf[BUF_SIZE];
static char buf2[BUF_SIZE];
-static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1));
-static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1));
+static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
+static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1));
+
+static td::vector<std::shared_ptr<td::MpscPollableQueue<td::EventFull>>> create_queues() {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ return {};
+#else
+ auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>();
+ res->init();
+ return {res};
+#endif
+}
TEST(Actors, SendLater) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
sb.clear();
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
auto guard = scheduler.get_guard();
- class Worker : public Actor {
+ class Worker final : public td::Actor {
public:
void f() {
sb << "A";
}
};
- auto id = create_actor<Worker>("Worker");
- scheduler.run_no_guard(0);
- send_closure(id, &Worker::f);
- send_closure_later(id, &Worker::f);
- send_closure(id, &Worker::f);
+ auto id = td::create_actor<Worker>("Worker");
+ scheduler.run_no_guard(td::Timestamp::in(1));
+ td::send_closure(id, &Worker::f);
+ td::send_closure_later(id, &Worker::f);
+ td::send_closure(id, &Worker::f);
ASSERT_STREQ("A", sb.as_cslice().c_str());
- scheduler.run_no_guard(0);
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("AAA", sb.as_cslice().c_str());
}
@@ -63,21 +72,21 @@ class X {
X(const X &) {
sb << "[cnstr_copy]";
}
- X(X &&) {
+ X(X &&) noexcept {
sb << "[cnstr_move]";
}
X &operator=(const X &) {
sb << "[set_copy]";
return *this;
}
- X &operator=(X &&) {
+ X &operator=(X &&) noexcept {
sb << "[set_move]";
return *this;
}
~X() = default;
};
-class XReceiver final : public Actor {
+class XReceiver final : public td::Actor {
public:
void by_const_ref(const X &) {
sb << "[by_const_ref]";
@@ -91,13 +100,12 @@ class XReceiver final : public Actor {
};
TEST(Actors, simple_pass_event_arguments) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
auto guard = scheduler.get_guard();
- auto id = create_actor<XReceiver>("XR").release();
- scheduler.run_no_guard(0);
+ auto id = td::create_actor<XReceiver>("XR").release();
+ scheduler.run_no_guard(td::Timestamp::in(1));
X x;
@@ -112,47 +120,47 @@ TEST(Actors, simple_pass_event_arguments) {
// Tmp-->ConstRef
sb.clear();
- send_closure(id, &XReceiver::by_const_ref, X());
+ td::send_closure(id, &XReceiver::by_const_ref, X());
ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str());
// Tmp-->ConstRef (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_const_ref, X());
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_const_ref, X());
+ scheduler.run_no_guard(td::Timestamp::in(1));
// LOG(ERROR) << sb.as_cslice();
ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str());
// Tmp-->LvalueRef
sb.clear();
- send_closure(id, &XReceiver::by_lvalue_ref, X());
+ td::send_closure(id, &XReceiver::by_lvalue_ref, X());
ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str());
// Tmp-->LvalueRef (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_lvalue_ref, X());
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_lvalue_ref, X());
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str());
// Tmp-->Value
sb.clear();
- send_closure(id, &XReceiver::by_value, X());
+ td::send_closure(id, &XReceiver::by_value, X());
ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str());
// Tmp-->Value (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_value, X());
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_value, X());
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str());
// Var-->ConstRef
sb.clear();
- send_closure(id, &XReceiver::by_const_ref, x);
+ td::send_closure(id, &XReceiver::by_const_ref, x);
ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str());
// Var-->ConstRef (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_const_ref, x);
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_const_ref, x);
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str());
// Var-->LvalueRef
@@ -161,24 +169,24 @@ TEST(Actors, simple_pass_event_arguments) {
// Var-->Value
sb.clear();
- send_closure(id, &XReceiver::by_value, x);
+ td::send_closure(id, &XReceiver::by_value, x);
ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str());
// Var-->Value (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_value, x);
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_value, x);
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str());
}
-class PrintChar final : public Actor {
+class PrintChar final : public td::Actor {
public:
PrintChar(char c, int cnt) : char_(c), cnt_(cnt) {
}
- void start_up() override {
+ void start_up() final {
yield();
}
- void wakeup() override {
+ void wakeup() final {
if (cnt_ == 0) {
stop();
} else {
@@ -192,25 +200,23 @@ class PrintChar final : public Actor {
char char_;
int cnt_;
};
-} // namespace
//
// Yield must add actor to the end of queue
//
TEST(Actors, simple_hand_yield) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
sb.clear();
int cnt = 1000;
{
auto guard = scheduler.get_guard();
- create_actor<PrintChar>("PrintA", 'A', cnt).release();
- create_actor<PrintChar>("PrintB", 'B', cnt).release();
- create_actor<PrintChar>("PrintC", 'C', cnt).release();
+ td::create_actor<PrintChar>("PrintA", 'A', cnt).release();
+ td::create_actor<PrintChar>("PrintB", 'B', cnt).release();
+ td::create_actor<PrintChar>("PrintC", 'C', cnt).release();
}
- scheduler.run(0);
- std::string expected;
+ scheduler.run(td::Timestamp::in(1));
+ td::string expected;
for (int i = 0; i < cnt; i++) {
expected += "ABC";
}
@@ -219,7 +225,7 @@ TEST(Actors, simple_hand_yield) {
class Ball {
public:
- friend void start_migrate(Ball &ball, int32 sched_id) {
+ friend void start_migrate(Ball &ball, td::int32 sched_id) {
sb << "start";
}
friend void finish_migrate(Ball &ball) {
@@ -227,31 +233,30 @@ class Ball {
}
};
-class Pong final : public Actor {
+class Pong final : public td::Actor {
public:
void pong(Ball ball) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
};
-class Ping final : public Actor {
+class Ping final : public td::Actor {
public:
- explicit Ping(ActorId<Pong> pong) : pong_(pong) {
+ explicit Ping(td::ActorId<Pong> pong) : pong_(pong) {
}
- void start_up() override {
- send_closure(pong_, &Pong::pong, Ball());
+ void start_up() final {
+ td::send_closure(pong_, &Pong::pong, Ball());
}
private:
- ActorId<Pong> pong_;
+ td::ActorId<Pong> pong_;
};
TEST(Actors, simple_migrate) {
sb.clear();
sb2.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(2);
+ td::ConcurrentScheduler scheduler(2, 0);
auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release();
scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release();
scheduler.start();
@@ -267,26 +272,25 @@ TEST(Actors, simple_migrate) {
#endif
}
-class OpenClose final : public Actor {
+class OpenClose final : public td::Actor {
public:
explicit OpenClose(int cnt) : cnt_(cnt) {
}
- void start_up() override {
+ void start_up() final {
yield();
}
- void wakeup() override {
- ObserverBase *observer = reinterpret_cast<ObserverBase *>(123);
+ void wakeup() final {
+ auto observer = reinterpret_cast<td::ObserverBase *>(123);
if (cnt_ > 0) {
- auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create);
- CHECK(r_file_fd.is_ok()) << r_file_fd.error();
+ auto r_file_fd = td::FileFd::open("server", td::FileFd::Read | td::FileFd::Create);
+ LOG_CHECK(r_file_fd.is_ok()) << r_file_fd.error();
auto file_fd = r_file_fd.move_as_ok();
- // LOG(ERROR) << file_fd.get_native_fd();
- file_fd.get_fd().set_observer(observer);
+ { auto pollable_fd = file_fd.get_poll_info().extract_pollable_fd(observer); }
file_fd.close();
cnt_--;
yield();
} else {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
}
@@ -295,14 +299,8 @@ class OpenClose final : public Actor {
};
TEST(Actors, open_close) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- ConcurrentScheduler scheduler;
- scheduler.init(2);
- int cnt = 1000000;
-#if TD_WINDOWS || TD_ANDROID
- // TODO(perf) optimize
- cnt = 100;
-#endif
+ td::ConcurrentScheduler scheduler(2, 0);
+ int cnt = 10000; // TODO(perf) optimize
scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release();
scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release();
scheduler.start();
@@ -311,62 +309,59 @@ TEST(Actors, open_close) {
scheduler.finish();
}
-namespace {
-class MsgActor : public Actor {
+class MsgActor : public td::Actor {
public:
virtual void msg() = 0;
};
-class Slave : public Actor {
+class Slave final : public td::Actor {
public:
- ActorId<MsgActor> msg;
- explicit Slave(ActorId<MsgActor> msg) : msg(msg) {
+ td::ActorId<MsgActor> msg;
+ explicit Slave(td::ActorId<MsgActor> msg) : msg(msg) {
}
- void hangup() override {
- send_closure(msg, &MsgActor::msg);
+ void hangup() final {
+ td::send_closure(msg, &MsgActor::msg);
}
};
-class MasterActor : public MsgActor {
+class MasterActor final : public MsgActor {
public:
- void loop() override {
+ void loop() final {
alive_ = true;
- slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this)));
+ slave = td::create_actor<Slave>("Slave", static_cast<td::ActorId<MsgActor>>(actor_id(this)));
stop();
}
- ActorOwn<Slave> slave;
+ td::ActorOwn<Slave> slave;
MasterActor() = default;
MasterActor(const MasterActor &) = delete;
MasterActor &operator=(const MasterActor &) = delete;
MasterActor(MasterActor &&) = delete;
MasterActor &operator=(MasterActor &&) = delete;
- ~MasterActor() override {
+ ~MasterActor() final {
alive_ = 987654321;
}
- void msg() override {
+ void msg() final {
CHECK(alive_ == 123456789);
}
- uint64 alive_ = 123456789;
+ td::uint64 alive_ = 123456789;
};
-} // namespace
TEST(Actors, call_after_destruct) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
{
auto guard = scheduler.get_guard();
- create_actor<MasterActor>("Master").release();
+ td::create_actor<MasterActor>("Master").release();
}
- scheduler.run(0);
+ scheduler.run(td::Timestamp::in(1));
}
-class LinkTokenSlave : public Actor {
+class LinkTokenSlave final : public td::Actor {
public:
- explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) {
+ explicit LinkTokenSlave(td::ActorShared<> parent) : parent_(std::move(parent)) {
}
- void add(uint64 link_token) {
+ void add(td::uint64 link_token) {
CHECK(link_token == get_link_token());
}
void close() {
@@ -374,62 +369,61 @@ class LinkTokenSlave : public Actor {
}
private:
- ActorShared<> parent_;
+ td::ActorShared<> parent_;
};
-class LinkTokenMasterActor : public Actor {
+class LinkTokenMasterActor final : public td::Actor {
public:
explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) {
}
- void start_up() override {
- child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
+ void start_up() final {
+ child_ = td::create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
yield();
}
- void loop() override {
+ void loop() final {
for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) {
+ auto token = static_cast<td::uint64>(cnt_) + 1;
switch (i % 4) {
case 0: {
- send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
+ td::send_closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
break;
}
case 1: {
- send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
+ td::send_closure_later(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
break;
}
case 2: {
- EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
+ td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token)
.try_emit();
break;
}
case 3: {
- EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
+ td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token)
.try_emit_later();
break;
}
}
}
if (cnt_ == 0) {
- send_closure(child_, &LinkTokenSlave::close);
+ td::send_closure(child_, &LinkTokenSlave::close);
} else {
yield();
}
}
- void hangup_shared() override {
+ void hangup_shared() final {
CHECK(get_link_token() == 123);
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
}
private:
int cnt_;
- ActorId<LinkTokenSlave> child_;
+ td::ActorId<LinkTokenSlave> child_;
};
TEST(Actors, link_token) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
auto cnt = 100000;
scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release();
scheduler.start();
@@ -440,25 +434,25 @@ TEST(Actors, link_token) {
TEST(Actors, promise) {
int value = -1;
- Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; });
- p1.set_error(Status::Error("Test error"));
+ td::Promise<int> p1 = td::PromiseCreator::lambda([&](int x) { value = x; });
+ p1.set_error(td::Status::Error("Test error"));
ASSERT_EQ(0, value);
- Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; });
- p2.set_error(Status::Error("Test error"));
+ td::Promise<td::int32> p2 = td::PromiseCreator::lambda([&](td::Result<td::int32> x) { value = 1; });
+ p2.set_error(td::Status::Error("Test error"));
ASSERT_EQ(1, value);
}
-class LaterSlave : public Actor {
+class LaterSlave final : public td::Actor {
public:
- explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) {
+ explicit LaterSlave(td::ActorShared<> parent) : parent_(std::move(parent)) {
}
private:
- ActorShared<> parent_;
+ td::ActorShared<> parent_;
- void hangup() override {
+ void hangup() final {
sb << "A";
- send_closure(actor_id(this), &LaterSlave::finish);
+ td::send_closure(actor_id(this), &LaterSlave::finish);
}
void finish() {
sb << "B";
@@ -466,31 +460,29 @@ class LaterSlave : public Actor {
}
};
-class LaterMasterActor : public Actor {
+class LaterMasterActor final : public td::Actor {
int cnt_ = 3;
- std::vector<ActorOwn<LaterSlave>> children_;
- void start_up() override {
+ td::vector<td::ActorOwn<LaterSlave>> children_;
+ void start_up() final {
for (int i = 0; i < cnt_; i++) {
- children_.push_back(create_actor<LaterSlave>("B", actor_shared()));
+ children_.push_back(td::create_actor<LaterSlave>("B", actor_shared(this)));
}
yield();
}
- void loop() override {
+ void loop() final {
children_.clear();
}
- void hangup_shared() override {
+ void hangup_shared() final {
if (!--cnt_) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
}
}
};
TEST(Actors, later) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -499,39 +491,36 @@ TEST(Actors, later) {
ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB");
}
-class MultiPromise2 : public Actor {
+class MultiPromise2 final : public td::Actor {
public:
- void start_up() override {
- auto promise = PromiseCreator::lambda([](Result<Unit> result) {
+ void start_up() final {
+ auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) {
result.ensure();
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
});
- MultiPromiseActorSafe multi_promise;
+ td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor2"};
multi_promise.add_promise(std::move(promise));
for (int i = 0; i < 10; i++) {
- create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
+ td::create_actor<td::SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
}
}
};
-class MultiPromise1 : public Actor {
+class MultiPromise1 final : public td::Actor {
public:
- void start_up() override {
- auto promise = PromiseCreator::lambda([](Result<Unit> result) {
+ void start_up() final {
+ auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) {
CHECK(result.is_error());
- create_actor<MultiPromise2>("B").release();
+ td::create_actor<MultiPromise2>("B").release();
});
- MultiPromiseActorSafe multi_promise;
+ td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor1"};
multi_promise.add_promise(std::move(promise));
}
};
TEST(Actors, MultiPromise) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -539,23 +528,20 @@ TEST(Actors, MultiPromise) {
scheduler.finish();
}
-class FastPromise : public Actor {
+class FastPromise final : public td::Actor {
public:
- void start_up() override {
- PromiseFuture<int> pf;
+ void start_up() final {
+ td::PromiseFuture<int> pf;
auto promise = pf.move_promise();
auto future = pf.move_future();
promise.set_value(123);
CHECK(future.move_as_ok() == 123);
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
};
TEST(Actors, FastPromise) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<FastPromise>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -563,21 +549,18 @@ TEST(Actors, FastPromise) {
scheduler.finish();
}
-class StopInTeardown : public Actor {
- void loop() override {
+class StopInTeardown final : public td::Actor {
+ void loop() final {
stop();
}
- void tear_down() override {
+ void tear_down() final {
stop();
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
};
TEST(Actors, stop_in_teardown) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -585,38 +568,113 @@ TEST(Actors, stop_in_teardown) {
scheduler.finish();
}
-class AlwaysWaitForMailbox : public Actor {
+class AlwaysWaitForMailbox final : public td::Actor {
public:
- void start_up() override {
+ void start_up() final {
always_wait_for_mailbox();
- create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) {
- send_closure(actor_id, &AlwaysWaitForMailbox::g);
- send_closure(actor_id, &AlwaysWaitForMailbox::g);
- CHECK(!ptr->was_f_);
- }))
+ td::create_actor<td::SleepActor>("Sleep", 0.1,
+ td::PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](td::Unit) {
+ td::send_closure(actor_id, &AlwaysWaitForMailbox::g);
+ td::send_closure(actor_id, &AlwaysWaitForMailbox::g);
+ CHECK(!ptr->was_f_);
+ }))
.release();
}
void f() {
was_f_ = true;
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
void g() {
- send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
+ td::send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
}
private:
- Timeout timeout_;
bool was_f_{false};
};
TEST(Actors, always_wait_for_mailbox) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
}
scheduler.finish();
}
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+TEST(Actors, send_from_other_threads) {
+ td::ConcurrentScheduler scheduler(1, 0);
+ int thread_n = 10;
+ class Listener final : public td::Actor {
+ public:
+ explicit Listener(int cnt) : cnt_(cnt) {
+ }
+ void dec() {
+ if (--cnt_ == 0) {
+ td::Scheduler::instance()->finish();
+ }
+ }
+
+ private:
+ int cnt_;
+ };
+
+ auto A = scheduler.create_actor_unsafe<Listener>(1, "A", thread_n).release();
+ scheduler.start();
+ td::vector<td::thread> threads(thread_n);
+ for (auto &thread : threads) {
+ thread = td::thread([&A, &scheduler] {
+ auto guard = scheduler.get_send_guard();
+ td::send_closure(A, &Listener::dec);
+ });
+ }
+ while (scheduler.run_main(10)) {
+ }
+ for (auto &thread : threads) {
+ thread.join();
+ }
+ scheduler.finish();
+}
+#endif
+
+class DelayedCall final : public td::Actor {
+ public:
+ void on_called(int *step) {
+ CHECK(*step == 0);
+ *step = 1;
+ }
+};
+
+class MultiPromiseSendClosureLaterTest final : public td::Actor {
+ public:
+ void start_up() final {
+ delayed_call_ = td::create_actor<DelayedCall>("DelayedCall").release();
+ mpa_.add_promise(td::PromiseCreator::lambda([this](td::Unit) {
+ CHECK(step_ == 1);
+ step_++;
+ td::Scheduler::instance()->finish();
+ }));
+ auto lock = mpa_.get_promise();
+ td::send_closure_later(delayed_call_, &DelayedCall::on_called, &step_);
+ lock.set_value(td::Unit());
+ }
+
+ void tear_down() final {
+ CHECK(step_ == 2);
+ }
+
+ private:
+ int step_ = 0;
+ td::MultiPromiseActor mpa_{"MultiPromiseActor"};
+ td::ActorId<DelayedCall> delayed_call_;
+};
+
+TEST(Actors, MultiPromiseSendClosureLater) {
+ td::ConcurrentScheduler scheduler(0, 0);
+ scheduler.create_actor_unsafe<MultiPromiseSendClosureLaterTest>(0, "MultiPromiseSendClosureLaterTest").release();
+ scheduler.start();
+ while (scheduler.run_main(1)) {
+ }
+ scheduler.finish();
+}
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp
index b97a258a44..bac42e3fd5 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp
@@ -1,22 +1,17 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
-#include "td/utils/logging.h"
-
-REGISTER_TESTS(actors_workers);
-
-namespace {
-
-using namespace td;
+#include "td/utils/common.h"
+#include "td/utils/SliceBuilder.h"
+#include "td/utils/tests.h"
-class PowerWorker final : public Actor {
+class PowerWorker final : public td::Actor {
public:
class Callback {
public:
@@ -29,12 +24,12 @@ class PowerWorker final : public Actor {
virtual void on_ready(int query, int res) = 0;
virtual void on_closed() = 0;
};
- void set_callback(unique_ptr<Callback> callback) {
+ void set_callback(td::unique_ptr<Callback> callback) {
callback_ = std::move(callback);
}
- void task(uint32 x, uint32 p) {
- uint32 res = 1;
- for (uint32 i = 0; i < p; i++) {
+ void task(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
+ for (td::uint32 i = 0; i < p; i++) {
res *= x;
}
callback_->on_ready(x, res);
@@ -45,39 +40,41 @@ class PowerWorker final : public Actor {
}
private:
- std::unique_ptr<Callback> callback_;
+ td::unique_ptr<Callback> callback_;
};
-class Manager final : public Actor {
+class Manager final : public td::Actor {
public:
- Manager(int queries_n, int query_size, std::vector<ActorId<PowerWorker>> workers)
- : workers_(std::move(workers)), left_query_(queries_n), query_size_(query_size) {
+ Manager(int queries_n, int query_size, td::vector<td::ActorId<PowerWorker>> workers)
+ : workers_(std::move(workers))
+ , ref_cnt_(static_cast<int>(workers_.size()))
+ , left_query_(queries_n)
+ , query_size_(query_size) {
}
- class Callback : public PowerWorker::Callback {
+ class Callback final : public PowerWorker::Callback {
public:
- Callback(ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) {
+ Callback(td::ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) {
}
- void on_ready(int query, int result) override {
- send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result);
+ void on_ready(int query, int result) final {
+ td::send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result);
}
- void on_closed() override {
- send_closure_later(actor_id_, &Manager::on_closed, worker_id_);
+ void on_closed() final {
+ td::send_closure_later(actor_id_, &Manager::on_closed, worker_id_);
}
private:
- ActorId<Manager> actor_id_;
+ td::ActorId<Manager> actor_id_;
int worker_id_;
};
- void start_up() override {
- ref_cnt_ = static_cast<int>(workers_.size());
+ void start_up() final {
int i = 0;
for (auto &worker : workers_) {
ref_cnt_++;
- send_closure_later(worker, &PowerWorker::set_callback, make_unique<Callback>(actor_id(this), i));
+ td::send_closure_later(worker, &PowerWorker::set_callback, td::make_unique<Callback>(actor_id(this), i));
i++;
- send_closure_later(worker, &PowerWorker::task, 3, query_size_);
+ td::send_closure_later(worker, &PowerWorker::task, 3, query_size_);
left_query_--;
}
}
@@ -85,10 +82,10 @@ class Manager final : public Actor {
void on_ready(int worker_id, int query, int res) {
ref_cnt_--;
if (left_query_ == 0) {
- send_closure(workers_[worker_id], &PowerWorker::close);
+ td::send_closure(workers_[worker_id], &PowerWorker::close);
} else {
ref_cnt_++;
- send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_);
+ td::send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_);
left_query_--;
}
}
@@ -96,30 +93,27 @@ class Manager final : public Actor {
void on_closed(int worker_id) {
ref_cnt_--;
if (ref_cnt_ == 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
}
}
private:
- std::vector<ActorId<PowerWorker>> workers_;
- int left_query_;
+ td::vector<td::ActorId<PowerWorker>> workers_;
int ref_cnt_;
+ int left_query_;
int query_size_;
};
-void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
- sched.init(threads_n);
+static void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
+ td::ConcurrentScheduler sched(threads_n, 0);
- std::vector<ActorId<PowerWorker>> workers;
+ td::vector<td::ActorId<PowerWorker>> workers;
for (int i = 0; i < workers_n; i++) {
int thread_id = threads_n ? i % (threads_n - 1) + 2 : 0;
- workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, "worker" + to_string(i)).release());
+ workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, PSLICE() << "worker" << i).release());
}
- sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "manager", queries_n, query_size, std::move(workers)).release();
+ sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "Manager", queries_n, query_size, std::move(workers)).release();
sched.start();
while (sched.run_main(10)) {
@@ -129,7 +123,6 @@ void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
// sched.test_one_thread_run();
}
-} // namespace
TEST(Actors, workers_big_query_one_thread) {
test_workers(0, 10, 1000, 300000);
@@ -144,13 +137,13 @@ TEST(Actors, workers_big_query_nine_threads) {
}
TEST(Actors, workers_small_query_one_thread) {
- test_workers(0, 10, 1000000, 1);
+ test_workers(0, 10, 100000, 1);
}
TEST(Actors, workers_small_query_two_threads) {
- test_workers(2, 10, 1000000, 1);
+ test_workers(2, 10, 100000, 1);
}
TEST(Actors, workers_small_query_nine_threads) {
- test_workers(9, 10, 1000000, 1);
+ test_workers(9, 10, 10000, 1);
}