diff options
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp')
-rw-r--r-- | protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp | 428 |
1 files changed, 243 insertions, 185 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp index c0a6c32b61..78d32d5437 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp @@ -1,57 +1,66 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" #include "td/actor/MultiPromise.h" #include "td/actor/PromiseFuture.h" #include "td/actor/SleepActor.h" -#include "td/actor/Timeout.h" +#include "td/utils/common.h" #include "td/utils/logging.h" +#include "td/utils/MpscPollableQueue.h" #include "td/utils/Observer.h" #include "td/utils/port/FileFd.h" +#include "td/utils/port/thread.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" #include "td/utils/StringBuilder.h" +#include "td/utils/tests.h" +#include "td/utils/Time.h" +#include <memory> #include <tuple> -REGISTER_TESTS(actors_simple) - -namespace { -using namespace td; - static const size_t BUF_SIZE = 1024 * 1024; static char buf[BUF_SIZE]; static char buf2[BUF_SIZE]; -static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1)); -static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1)); +static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); +static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1)); + +static td::vector<std::shared_ptr<td::MpscPollableQueue<td::EventFull>>> create_queues() { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + return {}; +#else + auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>(); + res->init(); + return {res}; +#endif +} TEST(Actors, SendLater) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); sb.clear(); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); auto guard = scheduler.get_guard(); - class Worker : public Actor { + class Worker final : public td::Actor { public: void f() { sb << "A"; } }; - auto id = create_actor<Worker>("Worker"); - scheduler.run_no_guard(0); - send_closure(id, &Worker::f); - send_closure_later(id, &Worker::f); - send_closure(id, &Worker::f); + auto id = td::create_actor<Worker>("Worker"); + scheduler.run_no_guard(td::Timestamp::in(1)); + td::send_closure(id, &Worker::f); + td::send_closure_later(id, &Worker::f); + td::send_closure(id, &Worker::f); ASSERT_STREQ("A", sb.as_cslice().c_str()); - scheduler.run_no_guard(0); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("AAA", sb.as_cslice().c_str()); } @@ -63,21 +72,21 @@ class X { X(const X &) { sb << "[cnstr_copy]"; } - X(X &&) { + X(X &&) noexcept { sb << "[cnstr_move]"; } X &operator=(const X &) { sb << "[set_copy]"; return *this; } - X &operator=(X &&) { + X &operator=(X &&) noexcept { sb << "[set_move]"; return *this; } ~X() = default; }; -class XReceiver final : public Actor { +class XReceiver final : public td::Actor { public: void by_const_ref(const X &) { sb << "[by_const_ref]"; @@ -91,13 +100,12 @@ class XReceiver final : public Actor { }; TEST(Actors, simple_pass_event_arguments) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); auto guard = scheduler.get_guard(); - auto id = create_actor<XReceiver>("XR").release(); - scheduler.run_no_guard(0); + auto id = td::create_actor<XReceiver>("XR").release(); + scheduler.run_no_guard(td::Timestamp::in(1)); X x; @@ -112,47 +120,47 @@ TEST(Actors, simple_pass_event_arguments) { // Tmp-->ConstRef sb.clear(); - send_closure(id, &XReceiver::by_const_ref, X()); + td::send_closure(id, &XReceiver::by_const_ref, X()); ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str()); // Tmp-->ConstRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_const_ref, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_const_ref, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); // LOG(ERROR) << sb.as_cslice(); ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str()); // Tmp-->LvalueRef sb.clear(); - send_closure(id, &XReceiver::by_lvalue_ref, X()); + td::send_closure(id, &XReceiver::by_lvalue_ref, X()); ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str()); // Tmp-->LvalueRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_lvalue_ref, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_lvalue_ref, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str()); // Tmp-->Value sb.clear(); - send_closure(id, &XReceiver::by_value, X()); + td::send_closure(id, &XReceiver::by_value, X()); ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str()); // Tmp-->Value (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_value, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_value, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str()); // Var-->ConstRef sb.clear(); - send_closure(id, &XReceiver::by_const_ref, x); + td::send_closure(id, &XReceiver::by_const_ref, x); ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str()); // Var-->ConstRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_const_ref, x); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_const_ref, x); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str()); // Var-->LvalueRef @@ -161,24 +169,24 @@ TEST(Actors, simple_pass_event_arguments) { // Var-->Value sb.clear(); - send_closure(id, &XReceiver::by_value, x); + td::send_closure(id, &XReceiver::by_value, x); ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str()); // Var-->Value (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_value, x); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_value, x); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str()); } -class PrintChar final : public Actor { +class PrintChar final : public td::Actor { public: PrintChar(char c, int cnt) : char_(c), cnt_(cnt) { } - void start_up() override { + void start_up() final { yield(); } - void wakeup() override { + void wakeup() final { if (cnt_ == 0) { stop(); } else { @@ -192,25 +200,23 @@ class PrintChar final : public Actor { char char_; int cnt_; }; -} // namespace // // Yield must add actor to the end of queue // TEST(Actors, simple_hand_yield) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); sb.clear(); int cnt = 1000; { auto guard = scheduler.get_guard(); - create_actor<PrintChar>("PrintA", 'A', cnt).release(); - create_actor<PrintChar>("PrintB", 'B', cnt).release(); - create_actor<PrintChar>("PrintC", 'C', cnt).release(); + td::create_actor<PrintChar>("PrintA", 'A', cnt).release(); + td::create_actor<PrintChar>("PrintB", 'B', cnt).release(); + td::create_actor<PrintChar>("PrintC", 'C', cnt).release(); } - scheduler.run(0); - std::string expected; + scheduler.run(td::Timestamp::in(1)); + td::string expected; for (int i = 0; i < cnt; i++) { expected += "ABC"; } @@ -219,7 +225,7 @@ TEST(Actors, simple_hand_yield) { class Ball { public: - friend void start_migrate(Ball &ball, int32 sched_id) { + friend void start_migrate(Ball &ball, td::int32 sched_id) { sb << "start"; } friend void finish_migrate(Ball &ball) { @@ -227,31 +233,30 @@ class Ball { } }; -class Pong final : public Actor { +class Pong final : public td::Actor { public: void pong(Ball ball) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; -class Ping final : public Actor { +class Ping final : public td::Actor { public: - explicit Ping(ActorId<Pong> pong) : pong_(pong) { + explicit Ping(td::ActorId<Pong> pong) : pong_(pong) { } - void start_up() override { - send_closure(pong_, &Pong::pong, Ball()); + void start_up() final { + td::send_closure(pong_, &Pong::pong, Ball()); } private: - ActorId<Pong> pong_; + td::ActorId<Pong> pong_; }; TEST(Actors, simple_migrate) { sb.clear(); sb2.clear(); - ConcurrentScheduler scheduler; - scheduler.init(2); + td::ConcurrentScheduler scheduler(2, 0); auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release(); scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release(); scheduler.start(); @@ -267,26 +272,25 @@ TEST(Actors, simple_migrate) { #endif } -class OpenClose final : public Actor { +class OpenClose final : public td::Actor { public: explicit OpenClose(int cnt) : cnt_(cnt) { } - void start_up() override { + void start_up() final { yield(); } - void wakeup() override { - ObserverBase *observer = reinterpret_cast<ObserverBase *>(123); + void wakeup() final { + auto observer = reinterpret_cast<td::ObserverBase *>(123); if (cnt_ > 0) { - auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create); - CHECK(r_file_fd.is_ok()) << r_file_fd.error(); + auto r_file_fd = td::FileFd::open("server", td::FileFd::Read | td::FileFd::Create); + LOG_CHECK(r_file_fd.is_ok()) << r_file_fd.error(); auto file_fd = r_file_fd.move_as_ok(); - // LOG(ERROR) << file_fd.get_native_fd(); - file_fd.get_fd().set_observer(observer); + { auto pollable_fd = file_fd.get_poll_info().extract_pollable_fd(observer); } file_fd.close(); cnt_--; yield(); } else { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } } @@ -295,14 +299,8 @@ class OpenClose final : public Actor { }; TEST(Actors, open_close) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(2); - int cnt = 1000000; -#if TD_WINDOWS || TD_ANDROID - // TODO(perf) optimize - cnt = 100; -#endif + td::ConcurrentScheduler scheduler(2, 0); + int cnt = 10000; // TODO(perf) optimize scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release(); scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release(); scheduler.start(); @@ -311,62 +309,59 @@ TEST(Actors, open_close) { scheduler.finish(); } -namespace { -class MsgActor : public Actor { +class MsgActor : public td::Actor { public: virtual void msg() = 0; }; -class Slave : public Actor { +class Slave final : public td::Actor { public: - ActorId<MsgActor> msg; - explicit Slave(ActorId<MsgActor> msg) : msg(msg) { + td::ActorId<MsgActor> msg; + explicit Slave(td::ActorId<MsgActor> msg) : msg(msg) { } - void hangup() override { - send_closure(msg, &MsgActor::msg); + void hangup() final { + td::send_closure(msg, &MsgActor::msg); } }; -class MasterActor : public MsgActor { +class MasterActor final : public MsgActor { public: - void loop() override { + void loop() final { alive_ = true; - slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this))); + slave = td::create_actor<Slave>("Slave", static_cast<td::ActorId<MsgActor>>(actor_id(this))); stop(); } - ActorOwn<Slave> slave; + td::ActorOwn<Slave> slave; MasterActor() = default; MasterActor(const MasterActor &) = delete; MasterActor &operator=(const MasterActor &) = delete; MasterActor(MasterActor &&) = delete; MasterActor &operator=(MasterActor &&) = delete; - ~MasterActor() override { + ~MasterActor() final { alive_ = 987654321; } - void msg() override { + void msg() final { CHECK(alive_ == 123456789); } - uint64 alive_ = 123456789; + td::uint64 alive_ = 123456789; }; -} // namespace TEST(Actors, call_after_destruct) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); { auto guard = scheduler.get_guard(); - create_actor<MasterActor>("Master").release(); + td::create_actor<MasterActor>("Master").release(); } - scheduler.run(0); + scheduler.run(td::Timestamp::in(1)); } -class LinkTokenSlave : public Actor { +class LinkTokenSlave final : public td::Actor { public: - explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) { + explicit LinkTokenSlave(td::ActorShared<> parent) : parent_(std::move(parent)) { } - void add(uint64 link_token) { + void add(td::uint64 link_token) { CHECK(link_token == get_link_token()); } void close() { @@ -374,62 +369,61 @@ class LinkTokenSlave : public Actor { } private: - ActorShared<> parent_; + td::ActorShared<> parent_; }; -class LinkTokenMasterActor : public Actor { +class LinkTokenMasterActor final : public td::Actor { public: explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) { } - void start_up() override { - child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release(); + void start_up() final { + child_ = td::create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release(); yield(); } - void loop() override { + void loop() final { for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) { + auto token = static_cast<td::uint64>(cnt_) + 1; switch (i % 4) { case 0: { - send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + td::send_closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token); break; } case 1: { - send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + td::send_closure_later(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token); break; } case 2: { - EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token) .try_emit(); break; } case 3: { - EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token) .try_emit_later(); break; } } } if (cnt_ == 0) { - send_closure(child_, &LinkTokenSlave::close); + td::send_closure(child_, &LinkTokenSlave::close); } else { yield(); } } - void hangup_shared() override { + void hangup_shared() final { CHECK(get_link_token() == 123); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } private: int cnt_; - ActorId<LinkTokenSlave> child_; + td::ActorId<LinkTokenSlave> child_; }; TEST(Actors, link_token) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); auto cnt = 100000; scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release(); scheduler.start(); @@ -440,25 +434,25 @@ TEST(Actors, link_token) { TEST(Actors, promise) { int value = -1; - Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; }); - p1.set_error(Status::Error("Test error")); + td::Promise<int> p1 = td::PromiseCreator::lambda([&](int x) { value = x; }); + p1.set_error(td::Status::Error("Test error")); ASSERT_EQ(0, value); - Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; }); - p2.set_error(Status::Error("Test error")); + td::Promise<td::int32> p2 = td::PromiseCreator::lambda([&](td::Result<td::int32> x) { value = 1; }); + p2.set_error(td::Status::Error("Test error")); ASSERT_EQ(1, value); } -class LaterSlave : public Actor { +class LaterSlave final : public td::Actor { public: - explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) { + explicit LaterSlave(td::ActorShared<> parent) : parent_(std::move(parent)) { } private: - ActorShared<> parent_; + td::ActorShared<> parent_; - void hangup() override { + void hangup() final { sb << "A"; - send_closure(actor_id(this), &LaterSlave::finish); + td::send_closure(actor_id(this), &LaterSlave::finish); } void finish() { sb << "B"; @@ -466,31 +460,29 @@ class LaterSlave : public Actor { } }; -class LaterMasterActor : public Actor { +class LaterMasterActor final : public td::Actor { int cnt_ = 3; - std::vector<ActorOwn<LaterSlave>> children_; - void start_up() override { + td::vector<td::ActorOwn<LaterSlave>> children_; + void start_up() final { for (int i = 0; i < cnt_; i++) { - children_.push_back(create_actor<LaterSlave>("B", actor_shared())); + children_.push_back(td::create_actor<LaterSlave>("B", actor_shared(this))); } yield(); } - void loop() override { + void loop() final { children_.clear(); } - void hangup_shared() override { + void hangup_shared() final { if (!--cnt_) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } } }; TEST(Actors, later) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -499,39 +491,36 @@ TEST(Actors, later) { ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB"); } -class MultiPromise2 : public Actor { +class MultiPromise2 final : public td::Actor { public: - void start_up() override { - auto promise = PromiseCreator::lambda([](Result<Unit> result) { + void start_up() final { + auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) { result.ensure(); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); }); - MultiPromiseActorSafe multi_promise; + td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor2"}; multi_promise.add_promise(std::move(promise)); for (int i = 0; i < 10; i++) { - create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release(); + td::create_actor<td::SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release(); } } }; -class MultiPromise1 : public Actor { +class MultiPromise1 final : public td::Actor { public: - void start_up() override { - auto promise = PromiseCreator::lambda([](Result<Unit> result) { + void start_up() final { + auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) { CHECK(result.is_error()); - create_actor<MultiPromise2>("B").release(); + td::create_actor<MultiPromise2>("B").release(); }); - MultiPromiseActorSafe multi_promise; + td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor1"}; multi_promise.add_promise(std::move(promise)); } }; TEST(Actors, MultiPromise) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -539,23 +528,20 @@ TEST(Actors, MultiPromise) { scheduler.finish(); } -class FastPromise : public Actor { +class FastPromise final : public td::Actor { public: - void start_up() override { - PromiseFuture<int> pf; + void start_up() final { + td::PromiseFuture<int> pf; auto promise = pf.move_promise(); auto future = pf.move_future(); promise.set_value(123); CHECK(future.move_as_ok() == 123); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; TEST(Actors, FastPromise) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<FastPromise>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -563,21 +549,18 @@ TEST(Actors, FastPromise) { scheduler.finish(); } -class StopInTeardown : public Actor { - void loop() override { +class StopInTeardown final : public td::Actor { + void loop() final { stop(); } - void tear_down() override { + void tear_down() final { stop(); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; TEST(Actors, stop_in_teardown) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -585,38 +568,113 @@ TEST(Actors, stop_in_teardown) { scheduler.finish(); } -class AlwaysWaitForMailbox : public Actor { +class AlwaysWaitForMailbox final : public td::Actor { public: - void start_up() override { + void start_up() final { always_wait_for_mailbox(); - create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) { - send_closure(actor_id, &AlwaysWaitForMailbox::g); - send_closure(actor_id, &AlwaysWaitForMailbox::g); - CHECK(!ptr->was_f_); - })) + td::create_actor<td::SleepActor>("Sleep", 0.1, + td::PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](td::Unit) { + td::send_closure(actor_id, &AlwaysWaitForMailbox::g); + td::send_closure(actor_id, &AlwaysWaitForMailbox::g); + CHECK(!ptr->was_f_); + })) .release(); } void f() { was_f_ = true; - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } void g() { - send_closure(actor_id(this), &AlwaysWaitForMailbox::f); + td::send_closure(actor_id(this), &AlwaysWaitForMailbox::f); } private: - Timeout timeout_; bool was_f_{false}; }; TEST(Actors, always_wait_for_mailbox) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { } scheduler.finish(); } + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED +TEST(Actors, send_from_other_threads) { + td::ConcurrentScheduler scheduler(1, 0); + int thread_n = 10; + class Listener final : public td::Actor { + public: + explicit Listener(int cnt) : cnt_(cnt) { + } + void dec() { + if (--cnt_ == 0) { + td::Scheduler::instance()->finish(); + } + } + + private: + int cnt_; + }; + + auto A = scheduler.create_actor_unsafe<Listener>(1, "A", thread_n).release(); + scheduler.start(); + td::vector<td::thread> threads(thread_n); + for (auto &thread : threads) { + thread = td::thread([&A, &scheduler] { + auto guard = scheduler.get_send_guard(); + td::send_closure(A, &Listener::dec); + }); + } + while (scheduler.run_main(10)) { + } + for (auto &thread : threads) { + thread.join(); + } + scheduler.finish(); +} +#endif + +class DelayedCall final : public td::Actor { + public: + void on_called(int *step) { + CHECK(*step == 0); + *step = 1; + } +}; + +class MultiPromiseSendClosureLaterTest final : public td::Actor { + public: + void start_up() final { + delayed_call_ = td::create_actor<DelayedCall>("DelayedCall").release(); + mpa_.add_promise(td::PromiseCreator::lambda([this](td::Unit) { + CHECK(step_ == 1); + step_++; + td::Scheduler::instance()->finish(); + })); + auto lock = mpa_.get_promise(); + td::send_closure_later(delayed_call_, &DelayedCall::on_called, &step_); + lock.set_value(td::Unit()); + } + + void tear_down() final { + CHECK(step_ == 2); + } + + private: + int step_ = 0; + td::MultiPromiseActor mpa_{"MultiPromiseActor"}; + td::ActorId<DelayedCall> delayed_call_; +}; + +TEST(Actors, MultiPromiseSendClosureLater) { + td::ConcurrentScheduler scheduler(0, 0); + scheduler.create_actor_unsafe<MultiPromiseSendClosureLaterTest>(0, "MultiPromiseSendClosureLaterTest").release(); + scheduler.start(); + while (scheduler.run_main(1)) { + } + scheduler.finish(); +} |