diff options
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor')
39 files changed, 1651 insertions, 3940 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt index c0c83025e5..a156384ce9 100644 --- a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt +++ b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt @@ -1,14 +1,20 @@ -cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR) +if ((CMAKE_MAJOR_VERSION LESS 3) OR (CMAKE_VERSION VERSION_LESS "3.0.2")) + message(FATAL_ERROR "CMake >= 3.0.2 is required") +endif() + +if (NOT DEFINED CMAKE_INSTALL_LIBDIR) + set(CMAKE_INSTALL_LIBDIR "lib") +endif() #SOURCE SETS set(TDACTOR_SOURCE - td/actor/impl/ConcurrentScheduler.cpp + td/actor/ConcurrentScheduler.cpp td/actor/impl/Scheduler.cpp td/actor/MultiPromise.cpp - td/actor/Timeout.cpp - - td/actor/impl2/Scheduler.cpp + td/actor/MultiTimeout.cpp + td/actor/actor.h + td/actor/ConcurrentScheduler.h td/actor/impl/Actor-decl.h td/actor/impl/Actor.h td/actor/impl/ActorId-decl.h @@ -17,28 +23,19 @@ set(TDACTOR_SOURCE td/actor/impl/ActorInfo.h td/actor/impl/EventFull-decl.h td/actor/impl/EventFull.h - td/actor/impl/ConcurrentScheduler.h td/actor/impl/Event.h td/actor/impl/Scheduler-decl.h td/actor/impl/Scheduler.h - td/actor/Condition.h td/actor/MultiPromise.h + td/actor/MultiTimeout.h td/actor/PromiseFuture.h td/actor/SchedulerLocalStorage.h td/actor/SignalSlot.h td/actor/SleepActor.h td/actor/Timeout.h - td/actor/actor.h - - td/actor/impl2/ActorLocker.h - td/actor/impl2/ActorSignals.h - td/actor/impl2/ActorState.h - td/actor/impl2/Scheduler.h - td/actor/impl2/SchedulerId.h ) set(TDACTOR_TEST_SOURCE - ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_impl2.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_main.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_simple.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_workers.cpp @@ -54,12 +51,12 @@ add_library(tdactor STATIC ${TDACTOR_SOURCE}) target_include_directories(tdactor PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>) target_link_libraries(tdactor PUBLIC tdutils) -add_executable(example example/example.cpp) -target_link_libraries(example PRIVATE tdactor) +if (NOT CMAKE_CROSSCOMPILING) + add_executable(example example/example.cpp) + target_link_libraries(example PRIVATE tdactor) +endif() install(TARGETS tdactor EXPORT TdTargets - LIBRARY DESTINATION lib - ARCHIVE DESTINATION lib - RUNTIME DESTINATION bin - INCLUDES DESTINATION include + LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" + ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" ) diff --git a/protocols/Telegram/tdlib/td/tdactor/example/example.cpp b/protocols/Telegram/tdlib/td/tdactor/example/example.cpp index 4c2415c5e2..8f182d8350 100644 --- a/protocols/Telegram/tdlib/td/tdactor/example/example.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/example/example.cpp @@ -1,31 +1,33 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" #include "td/utils/logging.h" +#include "td/utils/Time.h" -class Worker : public td::Actor { +class Worker final : public td::Actor { public: void ping(int x) { - LOG(ERROR) << "got ping " << x; + LOG(ERROR) << "Got ping " << x; } }; -class MainActor : public td::Actor { +class MainActor final : public td::Actor { public: - void start_up() override { - LOG(ERROR) << "start up"; + void start_up() final { + LOG(ERROR) << "Start up"; set_timeout_in(10); worker_ = td::create_actor_on_scheduler<Worker>("Worker", 1); send_closure(worker_, &Worker::ping, 123); } - void timeout_expired() override { - LOG(ERROR) << "timeout expired"; + void timeout_expired() final { + LOG(ERROR) << "Timeout expired"; td::Scheduler::instance()->finish(); } @@ -33,17 +35,15 @@ class MainActor : public td::Actor { td::ActorOwn<Worker> worker_; }; -int main(void) { - td::ConcurrentScheduler scheduler; - scheduler.init(4 /*threads_count*/); +int main() { + td::ConcurrentScheduler scheduler(4 /*thread_count*/, 0); scheduler.start(); { - auto guard = scheduler.get_current_guard(); + auto guard = scheduler.get_main_guard(); td::create_actor_on_scheduler<MainActor>("Main actor", 0).release(); } while (!scheduler.is_finished()) { - scheduler.run_main(10); + scheduler.run_main(td::Timestamp::in(10)); } scheduler.finish(); - return 0; } diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.cpp new file mode 100644 index 0000000000..9a3cf21338 --- /dev/null +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.cpp @@ -0,0 +1,205 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/ConcurrentScheduler.h" + +#include "td/utils/ExitGuard.h" +#include "td/utils/MpscPollableQueue.h" +#include "td/utils/port/thread_local.h" +#include "td/utils/ScopeGuard.h" + +#include <memory> + +namespace td { + +ConcurrentScheduler::ConcurrentScheduler(int32 additional_thread_count, uint64 thread_affinity_mask) { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + additional_thread_count = 0; +#endif + additional_thread_count++; + std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(additional_thread_count); +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + for (int32 i = 0; i < additional_thread_count; i++) { + auto queue = std::make_shared<MpscPollableQueue<EventFull>>(); + queue->init(); + outbound[i] = queue; + } + thread_affinity_mask_ = thread_affinity_mask; +#endif + + // +1 for extra scheduler for IOCP and send_closure from unrelated threads + // It will know about other schedulers + // Other schedulers will have no idea about its existence + extra_scheduler_ = 1; +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + extra_scheduler_ = 0; +#endif + + schedulers_.resize(additional_thread_count + extra_scheduler_); + for (int32 i = 0; i < additional_thread_count + extra_scheduler_; i++) { + auto &sched = schedulers_[i]; + sched = make_unique<Scheduler>(); + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + if (i >= additional_thread_count) { + auto queue = std::make_shared<MpscPollableQueue<EventFull>>(); + queue->init(); + outbound.push_back(std::move(queue)); + } +#endif + + sched->init(i, outbound, static_cast<Scheduler::Callback *>(this)); + } + +#if TD_PORT_WINDOWS + iocp_ = make_unique<detail::Iocp>(); + iocp_->init(); +#endif + + state_ = State::Start; +} + +void ConcurrentScheduler::test_one_thread_run() { + do { + for (auto &sched : schedulers_) { + sched->run(Timestamp::now_cached()); + } + } while (!is_finished_.load(std::memory_order_relaxed)); +} + +#if !TD_THREAD_UNSUPPORTED +thread::id ConcurrentScheduler::get_scheduler_thread_id(int32 sched_id) { + auto thread_pos = static_cast<size_t>(sched_id - 1); + CHECK(thread_pos < threads_.size()); + return threads_[thread_pos].get_id(); +} +#endif + +void ConcurrentScheduler::start() { + CHECK(state_ == State::Start); + is_finished_.store(false, std::memory_order_relaxed); +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + for (size_t i = 1; i + extra_scheduler_ < schedulers_.size(); i++) { + auto &sched = schedulers_[i]; + threads_.push_back(td::thread([&, thread_affinity_mask = thread_affinity_mask_] { +#if TD_PORT_WINDOWS + detail::Iocp::Guard iocp_guard(iocp_.get()); +#endif +#if TD_HAVE_THREAD_AFFINITY + if (thread_affinity_mask != 0) { + thread::set_affinity_mask(this_thread::get_id(), thread_affinity_mask).ignore(); + } +#else + (void)thread_affinity_mask; +#endif + while (!is_finished()) { + sched->run(Timestamp::in(10)); + } + })); + } +#if TD_PORT_WINDOWS + iocp_thread_ = td::thread([this] { + auto guard = this->get_send_guard(); + this->iocp_->loop(); + }); +#endif +#endif + + state_ = State::Run; +} + +static TD_THREAD_LOCAL double emscripten_timeout; + +bool ConcurrentScheduler::run_main(Timestamp timeout) { + CHECK(state_ == State::Run); + // run main scheduler in same thread + auto &main_sched = schedulers_[0]; + if (!is_finished()) { +#if TD_PORT_WINDOWS + detail::Iocp::Guard iocp_guard(iocp_.get()); +#endif + main_sched->run(timeout); + } + + // hack for emscripten + emscripten_timeout = get_main_timeout().at(); + + return !is_finished(); +} + +Timestamp ConcurrentScheduler::get_main_timeout() { + CHECK(state_ == State::Run); + return schedulers_[0]->get_timeout(); +} + +double ConcurrentScheduler::emscripten_get_main_timeout() { + return Timestamp::at(emscripten_timeout).in(); +} +void ConcurrentScheduler::emscripten_clear_main_timeout() { + emscripten_timeout = 0; +} + +void ConcurrentScheduler::finish() { + CHECK(state_ == State::Run); + if (!is_finished()) { + on_finish(); + } +#if TD_PORT_WINDOWS + SCOPE_EXIT { + iocp_->clear(); + }; + detail::Iocp::Guard iocp_guard(iocp_.get()); +#endif + + if (ExitGuard::is_exited()) { +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + // prevent closing of schedulers from already killed by OS threads + for (auto &thread : threads_) { + thread.detach(); + } +#endif + +#if TD_PORT_WINDOWS + iocp_->interrupt_loop(); + iocp_thread_.detach(); +#endif + return; + } + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + for (auto &thread : threads_) { + thread.join(); + } + threads_.clear(); +#endif + +#if TD_PORT_WINDOWS + iocp_->interrupt_loop(); + iocp_thread_.join(); +#endif + + schedulers_.clear(); + for (auto &f : at_finish_) { + f(); + } + at_finish_.clear(); + + state_ = State::Start; +} + +void ConcurrentScheduler::on_finish() { + is_finished_.store(true, std::memory_order_relaxed); + for (auto &it : schedulers_) { + it->wakeup(); + } +} + +void ConcurrentScheduler::register_at_finish(std::function<void()> f) { + std::lock_guard<std::mutex> lock(at_finish_mutex_); + at_finish_.push_back(std::move(f)); +} + +} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.h index 1e9793eab4..3f574de5ef 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.h @@ -1,17 +1,21 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #pragma once -#include "td/actor/impl/Scheduler-decl.h" +#include "td/actor/actor.h" #include "td/utils/common.h" -#include "td/utils/logging.h" #include "td/utils/port/thread.h" #include "td/utils/Slice.h" +#include "td/utils/Time.h" + +#if TD_PORT_WINDOWS +#include "td/utils/port/detail/Iocp.h" +#endif #include <atomic> #include <functional> @@ -20,34 +24,55 @@ namespace td { -class ConcurrentScheduler : private Scheduler::Callback { +class ConcurrentScheduler final : private Scheduler::Callback { public: - void init(int32 threads_n); + explicit ConcurrentScheduler(int32 additional_thread_count, uint64 thread_affinity_mask = 0); void finish_async() { schedulers_[0]->finish(); } + void wakeup() { schedulers_[0]->wakeup(); } - SchedulerGuard get_current_guard() { + + SchedulerGuard get_main_guard() { return schedulers_[0]->get_guard(); } + SchedulerGuard get_send_guard() { + return schedulers_.back()->get_const_guard(); + } + void test_one_thread_run(); - bool is_finished() { + bool is_finished() const { return is_finished_.load(std::memory_order_relaxed); } +#if TD_THREAD_UNSUPPORTED + int get_scheduler_thread_id(int32 sched_id) { + return 1; + } +#else + thread::id get_scheduler_thread_id(int32 sched_id); +#endif + void start(); - bool run_main(double timeout); + bool run_main(double timeout) { + return run_main(Timestamp::in(timeout)); + } + bool run_main(Timestamp timeout); + + Timestamp get_main_timeout(); + static double emscripten_get_main_timeout(); + static void emscripten_clear_main_timeout(); void finish(); template <class ActorT, class... Args> - ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&... args) { + ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&...args) { #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED sched_id = 0; #endif @@ -68,26 +93,24 @@ class ConcurrentScheduler : private Scheduler::Callback { private: enum class State { Start, Run }; - State state_; - std::vector<unique_ptr<Scheduler>> schedulers_; - std::atomic<bool> is_finished_; + State state_ = State::Start; std::mutex at_finish_mutex_; - std::vector<std::function<void()>> at_finish_; + vector<std::function<void()>> at_finish_; // can be used during destruction by Scheduler destructors + vector<unique_ptr<Scheduler>> schedulers_; + std::atomic<bool> is_finished_{false}; #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - std::vector<thread> threads_; + vector<td::thread> threads_; + uint64 thread_affinity_mask_ = 0; #endif +#if TD_PORT_WINDOWS + unique_ptr<detail::Iocp> iocp_; + td::thread iocp_thread_; +#endif + int32 extra_scheduler_ = 0; - void on_finish() override { - is_finished_.store(true, std::memory_order_relaxed); - for (auto &it : schedulers_) { - it->wakeup(); - } - } + void on_finish() final; - void register_at_finish(std::function<void()> f) override { - std::lock_guard<std::mutex> lock(at_finish_mutex_); - at_finish_.push_back(std::move(f)); - } + void register_at_finish(std::function<void()> f) final; }; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/Condition.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/Condition.h deleted file mode 100644 index c3799df487..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/Condition.h +++ /dev/null @@ -1,47 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/actor/actor.h" - -#include "td/utils/logging.h" - -namespace td { -class Condition { - class Helper : public Actor { - public: - void wait(Promise<> promise) { - pending_promises_.push_back(std::move(promise)); - } - - private: - std::vector<Promise<>> pending_promises_; - void tear_down() override { - for (auto &promise : pending_promises_) { - promise.set_value(Unit()); - } - } - }; - - public: - Condition() { - own_actor_ = create_actor<Helper>("helper"); - actor_ = own_actor_.get(); - } - void wait(Promise<> promise) { - send_closure(actor_, &Helper::wait, std::move(promise)); - } - void set_true() { - CHECK(!own_actor_.empty()); - own_actor_.reset(); - } - - private: - ActorId<Helper> actor_; - ActorOwn<Helper> own_actor_; -}; -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp index 0d98f5cfb4..f78e0f0161 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp @@ -1,19 +1,23 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/actor/MultiPromise.h" +#include "td/utils/logging.h" + namespace td { + void MultiPromiseActor::add_promise(Promise<Unit> &&promise) { promises_.emplace_back(std::move(promise)); + LOG(DEBUG) << "Add promise #" << promises_.size() << " to " << name_; } Promise<Unit> MultiPromiseActor::get_promise() { if (empty()) { - register_actor("MultiPromise", this).release(); + register_actor(name_, this).release(); } CHECK(!promises_.empty()); @@ -23,11 +27,13 @@ Promise<Unit> MultiPromiseActor::get_promise() { future.set_event(EventCreator::raw(actor_id(), nullptr)); futures_.emplace_back(std::move(future)); - return PromiseCreator::from_promise_actor(std::move(promise)); + LOG(DEBUG) << "Get promise #" << futures_.size() << " for " << name_; + return create_promise_from_promise_actor(std::move(promise)); } void MultiPromiseActor::raw_event(const Event::Raw &event) { received_results_++; + LOG(DEBUG) << "Receive result #" << received_results_ << " out of " << futures_.size() << " for " << name_; if (received_results_ == futures_.size()) { if (!ignore_errors_) { for (auto &future : futures_) { @@ -46,13 +52,21 @@ void MultiPromiseActor::set_ignore_errors(bool ignore_errors) { } void MultiPromiseActor::set_result(Result<Unit> &&result) { - // MultiPromiseActor should be cleared before he begins to send out result + result_ = std::move(result); + stop(); +} + +void MultiPromiseActor::tear_down() { + LOG(DEBUG) << "Set result for " << promises_.size() << " promises in " << name_; + + // MultiPromiseActor should be cleared before it begins to send out result auto promises_copy = std::move(promises_); promises_.clear(); auto futures_copy = std::move(futures_); futures_.clear(); received_results_ = 0; - stop(); + auto result = std::move(result_); + result_ = Unit(); if (!promises_copy.empty()) { for (size_t i = 0; i + 1 < promises_copy.size(); i++) { @@ -87,4 +101,5 @@ MultiPromiseActorSafe::~MultiPromiseActorSafe() { register_existing_actor(std::move(multi_promise_)).release(); } } + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h index aa28947464..73b24d5d1c 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,7 +10,7 @@ #include "td/actor/PromiseFuture.h" #include "td/utils/common.h" -#include "td/utils/logging.h" +#include "td/utils/Promise.h" #include "td/utils/Status.h" namespace td { @@ -20,7 +20,6 @@ class MultiPromiseInterface { virtual void add_promise(Promise<> &&promise) = 0; virtual Promise<> get_promise() = 0; - // deprecated? virtual size_t promise_count() const = 0; virtual void set_ignore_errors(bool ignore_errors) = 0; @@ -32,85 +31,90 @@ class MultiPromiseInterface { virtual ~MultiPromiseInterface() = default; }; -class MultiPromise : public MultiPromiseInterface { +class MultiPromise final : public MultiPromiseInterface { public: - void add_promise(Promise<> &&promise) override { + void add_promise(Promise<> &&promise) final { impl_->add_promise(std::move(promise)); } - Promise<> get_promise() override { + Promise<> get_promise() final { return impl_->get_promise(); } - // deprecated? - size_t promise_count() const override { + size_t promise_count() const final { return impl_->promise_count(); } - void set_ignore_errors(bool ignore_errors) override { + void set_ignore_errors(bool ignore_errors) final { impl_->set_ignore_errors(ignore_errors); } MultiPromise() = default; - explicit MultiPromise(std::unique_ptr<MultiPromiseInterface> impl) : impl_(std::move(impl)) { + explicit MultiPromise(unique_ptr<MultiPromiseInterface> impl) : impl_(std::move(impl)) { } private: - std::unique_ptr<MultiPromiseInterface> impl_; + unique_ptr<MultiPromiseInterface> impl_; }; class MultiPromiseActor final : public Actor , public MultiPromiseInterface { public: - MultiPromiseActor() = default; + explicit MultiPromiseActor(string name) : name_(std::move(name)) { + } - void add_promise(Promise<Unit> &&promise) override; + void add_promise(Promise<Unit> &&promise) final; - Promise<Unit> get_promise() override; + Promise<Unit> get_promise() final; - void set_ignore_errors(bool ignore_errors) override; + void set_ignore_errors(bool ignore_errors) final; - size_t promise_count() const override; + size_t promise_count() const final; private: void set_result(Result<Unit> &&result); + string name_; vector<Promise<Unit>> promises_; // promises waiting for result vector<FutureActor<Unit>> futures_; // futures waiting for result of the queries size_t received_results_ = 0; bool ignore_errors_ = false; + Result<Unit> result_; + + void raw_event(const Event::Raw &event) final; - void raw_event(const Event::Raw &event) override; + void tear_down() final; - void on_start_migrate(int32) override { + void on_start_migrate(int32) final { UNREACHABLE(); } - void on_finish_migrate() override { + void on_finish_migrate() final { UNREACHABLE(); } }; -class MultiPromiseActorSafe : public MultiPromiseInterface { +template <> +class ActorTraits<MultiPromiseActor> { + public: + static constexpr bool need_context = false; + static constexpr bool need_start_up = true; +}; + +class MultiPromiseActorSafe final : public MultiPromiseInterface { public: - void add_promise(Promise<Unit> &&promise) override; - Promise<Unit> get_promise() override; - void set_ignore_errors(bool ignore_errors) override; - size_t promise_count() const override; - MultiPromiseActorSafe() = default; + void add_promise(Promise<Unit> &&promise) final; + Promise<Unit> get_promise() final; + void set_ignore_errors(bool ignore_errors) final; + size_t promise_count() const final; + explicit MultiPromiseActorSafe(string name) : multi_promise_(td::make_unique<MultiPromiseActor>(std::move(name))) { + } MultiPromiseActorSafe(const MultiPromiseActorSafe &other) = delete; MultiPromiseActorSafe &operator=(const MultiPromiseActorSafe &other) = delete; MultiPromiseActorSafe(MultiPromiseActorSafe &&other) = delete; MultiPromiseActorSafe &operator=(MultiPromiseActorSafe &&other) = delete; - ~MultiPromiseActorSafe() override; + ~MultiPromiseActorSafe() final; private: - std::unique_ptr<MultiPromiseActor> multi_promise_ = std::make_unique<MultiPromiseActor>(); -}; - -class MultiPromiseCreator { - public: - static MultiPromise create() { - return MultiPromise(std::make_unique<MultiPromiseActor>()); - } + unique_ptr<MultiPromiseActor> multi_promise_; }; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.cpp index fa2e5ffff3..8c3dcb942f 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.cpp @@ -1,21 +1,21 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/actor/Timeout.h" +#include "td/actor/MultiTimeout.h" -#include "td/utils/Time.h" +#include "td/utils/logging.h" namespace td { bool MultiTimeout::has_timeout(int64 key) const { - return items_.find(Item(key)) != items_.end(); + return items_.count(Item(key)) > 0; } void MultiTimeout::set_timeout_at(int64 key, double timeout) { - LOG(DEBUG) << "Set timeout for " << key << " in " << timeout - Time::now(); + LOG(DEBUG) << "Set " << get_name() << " for " << key << " in " << timeout - Time::now(); auto item = items_.emplace(key); auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first)); if (heap_node->in_heap()) { @@ -35,7 +35,7 @@ void MultiTimeout::set_timeout_at(int64 key, double timeout) { } void MultiTimeout::add_timeout_at(int64 key, double timeout) { - LOG(DEBUG) << "Add timeout for " << key << " in " << timeout - Time::now(); + LOG(DEBUG) << "Add " << get_name() << " for " << key << " in " << timeout - Time::now(); auto item = items_.emplace(key); auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first)); if (heap_node->in_heap()) { @@ -50,7 +50,7 @@ void MultiTimeout::add_timeout_at(int64 key, double timeout) { } void MultiTimeout::cancel_timeout(int64 key) { - LOG(DEBUG) << "Cancel timeout for " << key; + LOG(DEBUG) << "Cancel " << get_name() << " for " << key; auto item = items_.find(Item(key)); if (item != items_.end()) { auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item)); @@ -67,30 +67,44 @@ void MultiTimeout::cancel_timeout(int64 key) { void MultiTimeout::update_timeout() { if (items_.empty()) { - LOG(DEBUG) << "Cancel timeout"; + LOG(DEBUG) << "Cancel timeout of " << get_name(); CHECK(timeout_queue_.empty()); CHECK(Actor::has_timeout()); Actor::cancel_timeout(); } else { - LOG(DEBUG) << "Set timeout in " << timeout_queue_.top_key() - Time::now_cached(); + LOG(DEBUG) << "Set timeout of " << get_name() << " in " << timeout_queue_.top_key() - Time::now_cached(); Actor::set_timeout_at(timeout_queue_.top_key()); } } -void MultiTimeout::timeout_expired() { - double now = Time::now_cached(); +vector<int64> MultiTimeout::get_expired_keys(double now) { + vector<int64> expired_keys; while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { int64 key = static_cast<Item *>(timeout_queue_.pop())->key; items_.erase(Item(key)); - expired_.push_back(key); + expired_keys.push_back(key); } + return expired_keys; +} + +void MultiTimeout::timeout_expired() { + vector<int64> expired_keys = get_expired_keys(Time::now_cached()); if (!items_.empty()) { update_timeout(); } - for (auto key : expired_) { + for (auto key : expired_keys) { + callback_(data_, key); + } +} + +void MultiTimeout::run_all() { + vector<int64> expired_keys = get_expired_keys(Time::now_cached() + 1e10); + if (!expired_keys.empty()) { + update_timeout(); + } + for (auto key : expired_keys) { callback_(data_, key); } - expired_.clear(); } } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.h new file mode 100644 index 0000000000..64803d346d --- /dev/null +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.h @@ -0,0 +1,81 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/utils/common.h" +#include "td/utils/Heap.h" +#include "td/utils/Slice.h" +#include "td/utils/Time.h" + +#include <set> + +namespace td { + +// TODO optimize +class MultiTimeout final : public Actor { + struct Item final : public HeapNode { + int64 key; + + explicit Item(int64 key) : key(key) { + } + + bool operator<(const Item &other) const { + return key < other.key; + } + }; + + public: + using Data = void *; + using Callback = void (*)(Data, int64); + explicit MultiTimeout(Slice name) { + register_actor(name, this).release(); + } + + void set_callback(Callback callback) { + callback_ = callback; + } + void set_callback_data(Data data) { + data_ = data; + } + + bool has_timeout(int64 key) const; + + void set_timeout_in(int64 key, double timeout) { + set_timeout_at(key, Time::now() + timeout); + } + + void add_timeout_in(int64 key, double timeout) { + add_timeout_at(key, Time::now() + timeout); + } + + void set_timeout_at(int64 key, double timeout); + + void add_timeout_at(int64 key, double timeout); // memcache semantics, doesn't replace old timeout + + void cancel_timeout(int64 key); + + void run_all(); + + private: + friend class Scheduler; + + Callback callback_; + Data data_; + + KHeap<double> timeout_queue_; + std::set<Item> items_; + + void update_timeout(); + + void timeout_expired() final; + + vector<int64> get_expired_keys(double now); +}; + +} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h index 63156c3838..5ddc6e9848 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,174 +10,23 @@ #include "td/utils/Closure.h" #include "td/utils/common.h" -#include "td/utils/invoke.h" // for tuple_for_each -#include "td/utils/logging.h" +#include "td/utils/invoke.h" +#include "td/utils/Promise.h" #include "td/utils/ScopeGuard.h" #include "td/utils/Status.h" #include <tuple> -#include <type_traits> #include <utility> namespace td { -template <class T = Unit> -class PromiseInterface { - public: - PromiseInterface() = default; - PromiseInterface(const PromiseInterface &) = delete; - PromiseInterface &operator=(const PromiseInterface &) = delete; - PromiseInterface(PromiseInterface &&) = default; - PromiseInterface &operator=(PromiseInterface &&) = default; - virtual ~PromiseInterface() = default; - virtual void set_value(T &&value) { - set_result(std::move(value)); - } - virtual void set_error(Status &&error) { - set_result(std::move(error)); - } - virtual void set_result(Result<T> &&result) { - if (result.is_ok()) { - set_value(result.move_as_ok()); - } else { - set_error(result.move_as_error()); - } - } - virtual void start_migrate(int32 sched_id) { - } - virtual void finish_migrate() { - } -}; - -template <class T = Unit> -class FutureInterface { - public: - FutureInterface() = default; - FutureInterface(const FutureInterface &) = delete; - FutureInterface &operator=(const FutureInterface &) = delete; - FutureInterface(FutureInterface &&) = default; - FutureInterface &operator=(FutureInterface &&) = default; - virtual ~FutureInterface() = default; - virtual bool is_ready() = 0; - virtual bool is_ok() = 0; - virtual bool is_error() = 0; - virtual const T &ok() = 0; - virtual T move_as_ok() = 0; - virtual const Status &error() = 0; - virtual Status move_as_error() TD_WARN_UNUSED_RESULT = 0; - virtual const Result<T> &result() = 0; - virtual Result<T> move_as_result() TD_WARN_UNUSED_RESULT = 0; -}; - -template <class T> -class SafePromise; - -template <class T = Unit> -class Promise { - public: - void set_value(T &&value) { - if (!promise_) { - return; - } - promise_->set_value(std::move(value)); - promise_.reset(); - } - void set_error(Status &&error) { - if (!promise_) { - return; - } - promise_->set_error(std::move(error)); - promise_.reset(); - } - void set_result(Result<T> &&result) { - if (!promise_) { - return; - } - promise_->set_result(std::move(result)); - promise_.reset(); - } - void reset() { - promise_.reset(); - } - void start_migrate(int32 sched_id) { - if (!promise_) { - return; - } - promise_->start_migrate(sched_id); - } - void finish_migrate() { - if (!promise_) { - return; - } - promise_->finish_migrate(); - } - std::unique_ptr<PromiseInterface<T>> release() { - return std::move(promise_); - } - - Promise() = default; - explicit Promise(std::unique_ptr<PromiseInterface<T>> promise) : promise_(std::move(promise)) { - } - Promise(SafePromise<T> &&other); - Promise &operator=(SafePromise<T> &&other); - - explicit operator bool() { - return static_cast<bool>(promise_); - } - - private: - std::unique_ptr<PromiseInterface<T>> promise_; -}; - -template <class T> -void start_migrate(Promise<T> &promise, int32 sched_id) { - // promise.start_migrate(sched_id); -} -template <class T> -void finish_migrate(Promise<T> &promise) { - // promise.finish_migrate(); -} - -template <class T = Unit> -class SafePromise { - public: - SafePromise(Promise<T> promise, Result<T> result) : promise_(std::move(promise)), result_(std::move(result)) { - } - SafePromise(const SafePromise &other) = delete; - SafePromise &operator=(const SafePromise &other) = delete; - SafePromise(SafePromise &&other) = default; - SafePromise &operator=(SafePromise &&other) = default; - ~SafePromise() { - if (promise_) { - promise_.set_result(std::move(result_)); - } - } - Promise<T> release() { - return std::move(promise_); - } - - private: - Promise<T> promise_; - Result<T> result_; -}; - -template <class T> -Promise<T>::Promise(SafePromise<T> &&other) : Promise(other.release()) { -} -template <class T> -Promise<T> &Promise<T>::operator=(SafePromise<T> &&other) { - *this = other.release(); - return *this; -} - namespace detail { - -class EventPromise : public PromiseInterface<Unit> { +class EventPromise final : public PromiseInterface<Unit> { public: - void set_value(Unit &&) override { + void set_value(Unit &&) final { ok_.try_emit(); fail_.clear(); } - void set_error(Status &&) override { + void set_error(Status &&) final { do_set_error(); } @@ -185,7 +34,7 @@ class EventPromise : public PromiseInterface<Unit> { EventPromise &operator=(const EventPromise &other) = delete; EventPromise(EventPromise &&other) = delete; EventPromise &operator=(EventPromise &&other) = delete; - ~EventPromise() override { + ~EventPromise() final { do_set_error(); } @@ -209,109 +58,23 @@ class EventPromise : public PromiseInterface<Unit> { } }; -template <typename T> -struct GetArg : public GetArg<decltype(&T::operator())> {}; - -template <class C, class R, class Arg> -class GetArg<R (C::*)(Arg)> { - public: - using type = Arg; -}; -template <class C, class R, class Arg> -class GetArg<R (C::*)(Arg) const> { +class SendClosure { public: - using type = Arg; -}; - -template <class T> -using get_arg_t = std::decay_t<typename GetArg<T>::type>; - -template <class T> -struct DropResult { - using type = T; -}; - -template <class T> -struct DropResult<Result<T>> { - using type = T; -}; - -template <class T> -using drop_result_t = typename DropResult<T>::type; - -template <class ValueT, class FunctionOkT, class FunctionFailT> -class LambdaPromise : public PromiseInterface<ValueT> { - enum OnFail { None, Ok, Fail }; - - public: - void set_value(ValueT &&value) override { - ok_(std::move(value)); - on_fail_ = None; - } - void set_error(Status &&error) override { - do_error(std::move(error)); - } - LambdaPromise(const LambdaPromise &other) = delete; - LambdaPromise &operator=(const LambdaPromise &other) = delete; - LambdaPromise(LambdaPromise &&other) = delete; - LambdaPromise &operator=(LambdaPromise &&other) = delete; - ~LambdaPromise() override { - do_error(Status::Error("Lost promise")); - } - - template <class FromOkT, class FromFailT> - LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail) - : ok_(std::forward<FromOkT>(ok)), fail_(std::forward<FromFailT>(fail)), on_fail_(use_ok_as_fail ? Ok : Fail) { - } - - private: - FunctionOkT ok_; - FunctionFailT fail_; - OnFail on_fail_ = None; - - template <class FuncT, class ArgT = detail::get_arg_t<FuncT>> - std::enable_if_t<std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) { - func(std::move(status)); - } - - template <class FuncT, class ArgT = detail::get_arg_t<FuncT>> - std::enable_if_t<!std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) { - func(Auto()); - } - - void do_error(Status &&error) { - switch (on_fail_) { - case None: - break; - case Ok: - do_error_impl(ok_, std::move(error)); - break; - case Fail: - fail_(std::move(error)); - break; - } - on_fail_ = None; + template <class... ArgsT> + void operator()(ArgsT &&...args) const { + send_closure(std::forward<ArgsT>(args)...); } }; +} // namespace detail -template <class... ArgsT> -class JoinPromise : public PromiseInterface<Unit> { - public: - explicit JoinPromise(ArgsT &&... arg) : promises_(std::forward<ArgsT>(arg)...) { - } - void set_value(Unit &&) override { - tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); }); - } - void set_error(Status &&error) override { - tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); }); - } +inline Promise<Unit> create_event_promise(EventFull &&ok) { + return Promise<Unit>(td::make_unique<detail::EventPromise>(std::move(ok))); +} - private: - std::tuple<std::decay_t<ArgsT>...> promises_; -}; -} // namespace detail +inline Promise<Unit> create_event_promise(EventFull ok, EventFull fail) { + return Promise<Unit>(td::make_unique<detail::EventPromise>(std::move(ok), std::move(fail))); +} -/*** FutureActor and PromiseActor ***/ template <class T> class FutureActor; @@ -321,7 +84,8 @@ class PromiseActor; template <class T> class ActorTraits<FutureActor<T>> { public: - static constexpr bool is_lite = true; + static constexpr bool need_context = false; + static constexpr bool need_start_up = false; }; template <class T> @@ -335,12 +99,12 @@ class PromiseActor final : public PromiseInterface<T> { PromiseActor &operator=(const PromiseActor &other) = delete; PromiseActor(PromiseActor &&) = default; PromiseActor &operator=(PromiseActor &&) = default; - ~PromiseActor() override { + ~PromiseActor() final { close(); } - void set_value(T &&value) override; - void set_error(Status &&error) override; + void set_value(T &&value) final; + void set_error(Status &&error) final; void close() { future_id_.reset(); @@ -373,7 +137,7 @@ class PromiseActor final : public PromiseInterface<T> { private: ActorOwn<FutureActor<T>> future_id_; EventFull event_; - State state_; + State state_ = State::Hangup; void init() { state_ = State::Waiting; @@ -384,9 +148,12 @@ class PromiseActor final : public PromiseInterface<T> { template <class T> class FutureActor final : public Actor { friend class PromiseActor<T>; - enum State { Waiting, Ready }; public: + enum State { Waiting, Ready }; + + static constexpr int HANGUP_ERROR_CODE = 426487; + FutureActor() = default; FutureActor(const FutureActor &other) = delete; @@ -395,7 +162,7 @@ class FutureActor final : public Actor { FutureActor(FutureActor &&other) = default; FutureActor &operator=(FutureActor &&other) = default; - ~FutureActor() override = default; + ~FutureActor() final = default; bool is_ok() const { return is_ready() && result_.is_ok(); @@ -435,13 +202,17 @@ class FutureActor final : public Actor { } } + State get_state() const { + return state_; + } + template <class S> friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future); private: EventFull event_; - Result<T> result_; - State state_; + Result<T> result_ = Status::Error(500, "Empty FutureActor"); + State state_ = State::Waiting; void set_value(T &&value) { set_result(std::move(value)); @@ -459,11 +230,11 @@ class FutureActor final : public Actor { event_.try_emit_later(); } - void hangup() override { - set_error(Status::Hangup()); + void hangup() final { + set_error(Status::Error<HANGUP_ERROR_CODE>()); } - void start_up() override { + void start_up() final { // empty } @@ -520,51 +291,26 @@ class PromiseFuture { FutureActor<T> future_; }; -template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT> -FutureActor<T> send_promise(ActorId<ActorAT> actor_id, Send::Flags flags, - ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&... args) { +template <ActorSendType send_type, class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, + class... ArgsT> +FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), + ArgsT &&...args) { PromiseFuture<T> pf; - ::td::Scheduler::instance()->send_closure( - std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...), flags); + Scheduler::instance()->send_closure<send_type>( + std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...)); return pf.move_future(); } -class PromiseCreator { - public: - struct Ignore { - void operator()(Status &&error) { - error.ignore(); - } +template <class... ArgsT> +auto promise_send_closure(ArgsT &&...args) { + return [t = std::make_tuple(std::forward<ArgsT>(args)...)](auto &&res) mutable { + call_tuple(detail::SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::forward<decltype(res)>(res)))); }; +} - template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>> - static Promise<ArgT> lambda(OkT &&ok) { - return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, Ignore>>(std::forward<OkT>(ok), - Ignore(), true)); - } - - template <class OkT, class FailT, class ArgT = detail::get_arg_t<OkT>> - static Promise<ArgT> lambda(OkT &&ok, FailT &&fail) { - return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, std::decay_t<FailT>>>( - std::forward<OkT>(ok), std::forward<FailT>(fail), false)); - } - - static Promise<> event(EventFull &&ok) { - return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok))); - } - - static Promise<> event(EventFull ok, EventFull fail) { - return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok), std::move(fail))); - } - - template <class... ArgsT> - static Promise<> join(ArgsT &&... args) { - return Promise<>(std::make_unique<detail::JoinPromise<ArgsT...>>(std::forward<ArgsT>(args)...)); - } +template <class T> +Promise<T> create_promise_from_promise_actor(PromiseActor<T> &&from) { + return Promise<T>(td::make_unique<PromiseActor<T>>(std::move(from))); +} - template <class T> - static Promise<T> from_promise_actor(PromiseActor<T> &&from) { - return Promise<T>(std::make_unique<PromiseActor<T>>(std::move(from))); - } -}; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h index f505836a16..b89a283f74 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,12 +8,13 @@ #include "td/actor/actor.h" -#include "td/utils/logging.h" +#include "td/utils/common.h" #include "td/utils/optional.h" #include <functional> namespace td { + template <class T> class SchedulerLocalStorage { public: @@ -45,6 +46,16 @@ class LazySchedulerLocalStorage { LazySchedulerLocalStorage() = default; explicit LazySchedulerLocalStorage(std::function<T()> create_func) : create_func_(std::move(create_func)) { } + void set_create_func(std::function<T()> create_func) { + CHECK(!create_func_); + create_func_ = create_func; + } + + void set(T &&t) { + auto &optional_value_ = sls_optional_value_.get(); + CHECK(!optional_value_); + optional_value_ = std::move(t); + } T &get() { auto &optional_value_ = sls_optional_value_.get(); diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h index 73b48f58ed..e1fd36323a 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -9,17 +9,20 @@ #include "td/actor/actor.h" namespace td { + class Slot; + class Signal { public: void emit(); - explicit Signal(ActorId<Slot> slot_id) : slot_id_(slot_id) { + explicit Signal(ActorId<Slot> slot_id) : slot_id_(std::move(slot_id)) { } private: ActorId<Slot> slot_id_; }; + class Slot final : public Actor { public: Slot() = default; @@ -27,7 +30,7 @@ class Slot final : public Actor { Slot &operator=(const Slot &other) = delete; Slot(Slot &&) = default; Slot &operator=(Slot &&) = default; - ~Slot() override { + ~Slot() final { close(); } void set_event(EventFull &&event) { @@ -69,18 +72,18 @@ class Slot final : public Actor { } ActorShared<> get_signal_new() { register_if_empty(); - return actor_shared(); + return actor_shared(this); } private: bool was_signal_ = false; EventFull event_; - void timeout_expired() override { + void timeout_expired() final { signal(); } - void start_up() override { + void start_up() final { empty(); } @@ -97,10 +100,11 @@ class Slot final : public Actor { event_.try_emit_later(); } } - void hangup_shared() override { + void hangup_shared() final { signal(); } }; + inline void Signal::emit() { send_closure(slot_id_, &Slot::signal); } diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h index 9b9981ec38..8682ab0df5 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,11 +8,12 @@ #include "td/actor/actor.h" -#include "td/actor/PromiseFuture.h" +#include "td/utils/common.h" +#include "td/utils/Promise.h" namespace td { -class SleepActor : public Actor { +class SleepActor final : public Actor { public: SleepActor(double timeout, Promise<> promise) : timeout_(timeout), promise_(std::move(promise)) { } @@ -21,13 +22,20 @@ class SleepActor : public Actor { double timeout_; Promise<> promise_; - void start_up() override { + void start_up() final { set_timeout_in(timeout_); } - void timeout_expired() override { + void timeout_expired() final { promise_.set_value(Unit()); stop(); } }; +template <> +class ActorTraits<SleepActor> { + public: + static constexpr bool need_context = false; + static constexpr bool need_start_up = true; +}; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h index a3a9ba1913..cde72657b8 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,13 +8,10 @@ #include "td/actor/actor.h" -#include "td/utils/Heap.h" -#include "td/utils/logging.h" -#include "td/utils/Time.h" - -#include <set> +#include "td/utils/common.h" namespace td { + class Timeout final : public Actor { public: using Data = void *; @@ -33,9 +30,15 @@ class Timeout final : public Actor { bool has_timeout() const { return Actor::has_timeout(); } + double get_timeout() const { + return Actor::get_timeout(); + } void set_timeout_in(double timeout) { Actor::set_timeout_in(timeout); } + void set_timeout_at(double timeout) { + Actor::set_timeout_at(timeout); + } void cancel_timeout() { if (has_timeout()) { Actor::cancel_timeout(); @@ -47,14 +50,10 @@ class Timeout final : public Actor { private: friend class Scheduler; - Callback callback_; - Data data_; + Callback callback_{}; + Data data_{}; - void set_timeout_at(double timeout) { - Actor::set_timeout_at(timeout); - } - - void timeout_expired() override { + void timeout_expired() final { CHECK(!has_timeout()); CHECK(callback_ != Callback()); Callback callback = callback_; @@ -66,62 +65,4 @@ class Timeout final : public Actor { } }; -// TODO optimize -class MultiTimeout final : public Actor { - struct Item : public HeapNode { - int64 key; - - explicit Item(int64 key) : key(key) { - } - - bool operator<(const Item &other) const { - return key < other.key; - } - }; - - public: - using Data = void *; - using Callback = void (*)(Data, int64); - MultiTimeout() { - register_actor("MultiTimeout", this).release(); - } - - void set_callback(Callback callback) { - callback_ = callback; - } - void set_callback_data(Data data) { - data_ = data; - } - - bool has_timeout(int64 key) const; - - void set_timeout_in(int64 key, double timeout) { - set_timeout_at(key, Time::now() + timeout); - } - - void add_timeout_in(int64 key, double timeout) { - add_timeout_at(key, Time::now() + timeout); - } - - void set_timeout_at(int64 key, double timeout); - - void add_timeout_at(int64 key, double timeout); // memcache semantics, doesn't replace old timeout - - void cancel_timeout(int64 key); - - private: - friend class Scheduler; - - Callback callback_; - Data data_; - - KHeap<double> timeout_queue_; - std::set<Item> items_; - std::vector<int64> expired_; - - void update_timeout(); - - void timeout_expired() override; -}; - } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h index dadfadc055..0aed51710c 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -9,6 +9,5 @@ #include "td/actor/impl/Actor.h" #include "td/actor/impl/ActorId.h" #include "td/actor/impl/ActorInfo.h" -#include "td/actor/impl/ConcurrentScheduler.h" #include "td/actor/impl/EventFull.h" #include "td/actor/impl/Scheduler.h" diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h index 4342214800..b0e75bd21c 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -24,8 +24,8 @@ class Actor : public ObserverBase { Actor() = default; Actor(const Actor &) = delete; Actor &operator=(const Actor &) = delete; - Actor(Actor &&other); - Actor &operator=(Actor &&other); + Actor(Actor &&other) noexcept; + Actor &operator=(Actor &&other) noexcept; ~Actor() override { if (!empty()) { do_stop(); @@ -67,6 +67,7 @@ class Actor : public ObserverBase { void stop(); void do_stop(); bool has_timeout() const; + double get_timeout() const; void set_timeout_in(double timeout_in); void set_timeout_at(double timeout_at); void cancel_timeout(); @@ -74,8 +75,9 @@ class Actor : public ObserverBase { void do_migrate(int32 sched_id); uint64 get_link_token(); - void set_context(std::shared_ptr<ActorContext> context); - void set_tag(CSlice tag); + std::weak_ptr<ActorContext> get_context_weak_ptr() const; + std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context); + string set_tag(string tag); void always_wait_for_mailbox(); @@ -88,10 +90,10 @@ class Actor : public ObserverBase { bool empty() const; template <class FuncT, class... ArgsT> - auto self_closure(FuncT &&func, ArgsT &&... args); + auto self_closure(FuncT &&func, ArgsT &&...args); template <class SelfT, class FuncT, class... ArgsT> - auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args); + auto self_closure(SelfT *self, FuncT &&func, ArgsT &&...args); template <class LambdaT> auto self_lambda(LambdaT &&lambda); @@ -101,7 +103,6 @@ class Actor : public ObserverBase { template <class SelfT> ActorId<SelfT> actor_id(SelfT *self); - ActorShared<> actor_shared(); template <class SelfT> ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1)); @@ -114,7 +115,8 @@ class Actor : public ObserverBase { template <class ActorT> class ActorTraits { public: - static constexpr bool is_lite = false; + static constexpr bool need_context = true; + static constexpr bool need_start_up = true; }; } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h index 3fe5e20abf..d190a2158e 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,7 +10,7 @@ #include "td/actor/impl/EventFull-decl.h" #include "td/actor/impl/Scheduler-decl.h" -#include "td/utils/logging.h" +#include "td/utils/common.h" #include "td/utils/ObjectPool.h" #include "td/utils/Slice.h" @@ -19,14 +19,15 @@ #include <utility> namespace td { -inline Actor::Actor(Actor &&other) { + +inline Actor::Actor(Actor &&other) noexcept { CHECK(info_.empty()); info_ = std::move(other.info_); if (!empty()) { info_->on_actor_moved(this); } } -inline Actor &Actor::operator=(Actor &&other) { +inline Actor &Actor::operator=(Actor &&other) noexcept { CHECK(info_.empty()); info_ = std::move(other.info_); if (!empty()) { @@ -51,7 +52,10 @@ inline void Actor::do_stop() { CHECK(empty()); } inline bool Actor::has_timeout() const { - return Scheduler::instance()->has_actor_timeout(this); + return get_info()->get_heap_node()->in_heap(); +} +inline double Actor::get_timeout() const { + return Scheduler::instance()->get_actor_timeout(this); } inline void Actor::set_timeout_in(double timeout_in) { Scheduler::instance()->set_actor_timeout_in(this, timeout_in); @@ -75,32 +79,48 @@ std::enable_if_t<std::is_base_of<Actor, ActorType>::value> start_migrate(ActorTy Scheduler::instance()->start_migrate_actor(&obj, sched_id); } } + template <class ActorType> std::enable_if_t<std::is_base_of<Actor, ActorType>::value> finish_migrate(ActorType &obj) { if (!obj.empty()) { Scheduler::instance()->finish_migrate_actor(&obj); } } + inline uint64 Actor::get_link_token() { return Scheduler::instance()->get_link_token(this); } -inline void Actor::set_context(std::shared_ptr<ActorContext> context) { - info_->set_context(std::move(context)); + +inline std::weak_ptr<ActorContext> Actor::get_context_weak_ptr() const { + return info_->get_context_weak_ptr(); +} + +inline std::shared_ptr<ActorContext> Actor::set_context(std::shared_ptr<ActorContext> context) { + return info_->set_context(std::move(context)); } -inline void Actor::set_tag(CSlice tag) { - info_->get_context()->tag_ = tag.c_str(); + +inline string Actor::set_tag(string tag) { + auto *ctx = info_->get_context(); + string old_tag; + if (ctx->tag_) { + old_tag = ctx->tag_; + } + ctx->set_tag(std::move(tag)); Scheduler::on_context_updated(); + return old_tag; } inline void Actor::init(ObjectPool<ActorInfo>::OwnerPtr &&info) { info_ = std::move(info); } + inline ActorInfo *Actor::get_info() { return &*info_; } inline const ActorInfo *Actor::get_info() const { return &*info_; } + inline ObjectPool<ActorInfo>::OwnerPtr Actor::clear() { return std::move(info_); } @@ -118,22 +138,20 @@ ActorId<SelfT> Actor::actor_id(SelfT *self) { return ActorId<SelfT>(info_.get_weak()); } -inline ActorShared<> Actor::actor_shared() { - return actor_shared(this); -} template <class SelfT> ActorShared<SelfT> Actor::actor_shared(SelfT *self, uint64 id) { CHECK(static_cast<Actor *>(self) == this); + CHECK(id != 0); return ActorShared<SelfT>(actor_id(self), id); } template <class FuncT, class... ArgsT> -auto Actor::self_closure(FuncT &&func, ArgsT &&... args) { +auto Actor::self_closure(FuncT &&func, ArgsT &&...args) { return self_closure(this, std::forward<FuncT>(func), std::forward<ArgsT>(args)...); } template <class SelfT, class FuncT, class... ArgsT> -auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&... args) { +auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&...args) { return EventCreator::closure(actor_id(self), std::forward<FuncT>(func), std::forward<ArgsT>(args)...); } diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h index 5e82ed6a05..2b60c72472 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -12,8 +12,10 @@ #include <type_traits> namespace td { -class ActorInfo; + class Actor; +class ActorInfo; + template <class ActorType = Actor> class ActorId { public: @@ -21,12 +23,12 @@ class ActorId { explicit ActorId(ObjectPool<ActorInfo>::WeakPtr ptr) : ptr_(ptr) { } ActorId() = default; - ActorId(const ActorId &) = default; - ActorId &operator=(const ActorId &) = default; - ActorId(ActorId &&other) : ptr_(other.ptr_) { - other.ptr_.clear(); + ActorId(const ActorId &other) = default; + ActorId &operator=(const ActorId &other) = default; + ActorId(ActorId &&other) noexcept : ptr_(other.ptr_) { + other.clear(); } - ActorId &operator=(ActorId &&other) { + ActorId &operator=(ActorId &&other) noexcept { if (&other == this) { return *this; } @@ -48,10 +50,8 @@ class ActorId { } ActorInfo *get_actor_info() const; - ActorType *get_actor_unsafe() const; - // returns pointer to actor if it is on current thread. nullptr otherwise - ActorType *try_get_actor() const; + ActorType *get_actor_unsafe() const; Slice get_name() const; @@ -60,33 +60,27 @@ class ActorId { return ActorId<ToActorType>(ptr_); } - template <class AsActorType> - ActorId<AsActorType> as() const { - return ActorId<AsActorType>(ptr_); - } - private: ObjectPool<ActorInfo>::WeakPtr ptr_; }; -// threat ActorId as pointer and ActorOwn as -// unique_ptr<ActorId> +// treat ActorId as pointer and ActorOwn as unique_ptr<ActorId> template <class ActorType = Actor> class ActorOwn { public: using ActorT = ActorType; ActorOwn() = default; - explicit ActorOwn(ActorId<ActorType>); + explicit ActorOwn(ActorId<ActorType> id); template <class OtherActorType> explicit ActorOwn(ActorId<OtherActorType> id); template <class OtherActorType> - explicit ActorOwn(ActorOwn<OtherActorType> &&); + explicit ActorOwn(ActorOwn<OtherActorType> &&other); template <class OtherActorType> - ActorOwn &operator=(ActorOwn<OtherActorType> &&); - ActorOwn(ActorOwn &&); - ActorOwn &operator=(ActorOwn &&); - ActorOwn(const ActorOwn &) = delete; - ActorOwn &operator=(const ActorOwn &) = delete; + ActorOwn &operator=(ActorOwn<OtherActorType> &&other); + ActorOwn(ActorOwn &&other) noexcept; + ActorOwn &operator=(ActorOwn &&other) noexcept; + ActorOwn(const ActorOwn &other) = delete; + ActorOwn &operator=(const ActorOwn &other) = delete; ~ActorOwn(); bool empty() const; @@ -96,11 +90,7 @@ class ActorOwn { ActorId<ActorType> get() const; ActorId<ActorType> release(); void reset(ActorId<ActorType> other = ActorId<ActorType>()); - void hangup() const; - const ActorId<ActorType> *operator->() const; - - using ActorIdConstRef = const ActorId<ActorType> &; - // operator ActorIdConstRef(); + ActorType *get_actor_unsafe() const; private: ActorId<ActorType> id_; @@ -112,17 +102,17 @@ class ActorShared { using ActorT = ActorType; ActorShared() = default; template <class OtherActorType> - ActorShared(ActorId<OtherActorType>, uint64 token); + ActorShared(ActorId<OtherActorType> id, uint64 token); template <class OtherActorType> - ActorShared(ActorShared<OtherActorType> &&); + ActorShared(ActorShared<OtherActorType> &&other); template <class OtherActorType> - ActorShared(ActorOwn<OtherActorType> &&); + ActorShared(ActorOwn<OtherActorType> &&other); template <class OtherActorType> - ActorShared &operator=(ActorShared<OtherActorType> &&); - ActorShared(ActorShared &&); - ActorShared &operator=(ActorShared &&); - ActorShared(const ActorShared &) = delete; - ActorShared &operator=(const ActorShared &) = delete; + ActorShared &operator=(ActorShared<OtherActorType> &&other); + ActorShared(ActorShared &&other) noexcept; + ActorShared &operator=(ActorShared &&other) noexcept; + ActorShared(const ActorShared &other) = delete; + ActorShared &operator=(const ActorShared &other) = delete; ~ActorShared(); uint64 token() const; @@ -133,13 +123,10 @@ class ActorShared { ActorId<ActorType> get() const; ActorId<ActorType> release(); void reset(ActorId<ActorType> other = ActorId<ActorType>()); - template <class OtherActorType> - void reset(ActorId<OtherActorType> other); - const ActorId<ActorType> *operator->() const; private: ActorId<ActorType> id_; - uint64 token_; + uint64 token_ = 0; }; class ActorRef { @@ -148,6 +135,8 @@ class ActorRef { template <class T> ActorRef(const ActorId<T> &actor_id); template <class T> + ActorRef(ActorId<T> &&actor_id); + template <class T> ActorRef(const ActorShared<T> &actor_id); template <class T> ActorRef(ActorShared<T> &&actor_id); @@ -166,4 +155,5 @@ class ActorRef { ActorId<> actor_id_; uint64 token_ = 0; }; + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h index 34d7970633..76ef6d9a8b 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -13,7 +13,6 @@ #include "td/utils/Slice.h" namespace td { -/*** ActorId ***/ // If actor is on our scheduler(thread) result will be valid // If actor is on another scheduler we will see it in migrate_dest_flags @@ -31,31 +30,24 @@ ActorType *ActorId<ActorType>::get_actor_unsafe() const { } template <class ActorType> -ActorType *ActorId<ActorType>::try_get_actor() const { - auto info = get_actor_info(); - if (info && !info->is_migrating() && Scheduler::instance()->sched_id() == info->migrate_dest()) { - return static_cast<ActorType *>(info->get_actor_unsafe()); - } - return nullptr; -} - -template <class ActorType> Slice ActorId<ActorType>::get_name() const { return ptr_->get_name(); } -// ActorOwn template <class ActorType> ActorOwn<ActorType>::ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) { } + template <class ActorType> template <class OtherActorType> ActorOwn<ActorType>::ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) { } + template <class ActorType> template <class OtherActorType> ActorOwn<ActorType>::ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) { } + template <class ActorType> template <class OtherActorType> ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn<OtherActorType> &&other) { @@ -64,10 +56,11 @@ ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn<OtherActorType> &&o } template <class ActorType> -ActorOwn<ActorType>::ActorOwn(ActorOwn &&other) : id_(other.release()) { +ActorOwn<ActorType>::ActorOwn(ActorOwn &&other) noexcept : id_(other.release()) { } + template <class ActorType> -ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn &&other) { +ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn &&other) noexcept { reset(other.release()); return *this; } @@ -81,6 +74,7 @@ template <class ActorType> bool ActorOwn<ActorType>::empty() const { return id_.empty(); } + template <class ActorType> ActorId<ActorType> ActorOwn<ActorType>::get() const { return id_; @@ -90,37 +84,36 @@ template <class ActorType> ActorId<ActorType> ActorOwn<ActorType>::release() { return std::move(id_); } + template <class ActorType> void ActorOwn<ActorType>::reset(ActorId<ActorType> other) { static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type"); - hangup(); - id_ = std::move(other); -} - -template <class ActorType> -void ActorOwn<ActorType>::hangup() const { if (!id_.empty()) { send_event(id_, Event::hangup()); } + id_ = std::move(other); } + template <class ActorType> -const ActorId<ActorType> *ActorOwn<ActorType>::operator->() const { - return &id_; +ActorType *ActorOwn<ActorType>::get_actor_unsafe() const { + return id_.get_actor_unsafe(); } -// ActorShared template <class ActorType> template <class OtherActorType> ActorShared<ActorType>::ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) { } + template <class ActorType> template <class OtherActorType> ActorShared<ActorType>::ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) { } + template <class ActorType> template <class OtherActorType> ActorShared<ActorType>::ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(0) { } + template <class ActorType> template <class OtherActorType> ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared<OtherActorType> &&other) { @@ -130,10 +123,11 @@ ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared<OtherActor } template <class ActorType> -ActorShared<ActorType>::ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token_) { +ActorShared<ActorType>::ActorShared(ActorShared &&other) noexcept : id_(other.release()), token_(other.token_) { } + template <class ActorType> -ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared &&other) { +ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared &&other) noexcept { reset(other.release()); token_ = other.token_; return *this; @@ -148,10 +142,12 @@ template <class ActorType> uint64 ActorShared<ActorType>::token() const { return token_; } + template <class ActorType> bool ActorShared<ActorType>::empty() const { return id_.empty(); } + template <class ActorType> ActorId<ActorType> ActorShared<ActorType>::get() const { return id_; @@ -161,38 +157,37 @@ template <class ActorType> ActorId<ActorType> ActorShared<ActorType>::release() { return std::move(id_); } -template <class ActorType> -void ActorShared<ActorType>::reset(ActorId<ActorType> other) { - reset<ActorType>(std::move(other)); -} template <class ActorType> -template <class OtherActorType> -void ActorShared<ActorType>::reset(ActorId<OtherActorType> other) { +void ActorShared<ActorType>::reset(ActorId<ActorType> other) { static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type"); if (!id_.empty()) { send_event(*this, Event::hangup()); } - id_ = static_cast<ActorId<ActorType>>(other); -} -template <class ActorType> -const ActorId<ActorType> *ActorShared<ActorType>::operator->() const { - return &id_; + id_ = std::move(other); } -/*** ActorRef ***/ template <class T> ActorRef::ActorRef(const ActorId<T> &actor_id) : actor_id_(actor_id) { } + +template <class T> +ActorRef::ActorRef(ActorId<T> &&actor_id) : actor_id_(actor_id) { + actor_id.clear(); +} + template <class T> ActorRef::ActorRef(const ActorShared<T> &actor_id) : actor_id_(actor_id.get()), token_(actor_id.token()) { } + template <class T> ActorRef::ActorRef(ActorShared<T> &&actor_id) : actor_id_(actor_id.release()), token_(actor_id.token()) { } + template <class T> ActorRef::ActorRef(const ActorOwn<T> &actor_id) : actor_id_(actor_id.get()) { } + template <class T> ActorRef::ActorRef(ActorOwn<T> &&actor_id) : actor_id_(actor_id.release()) { } diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h index de9fba794e..3b9d3c2f2a 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -32,13 +32,24 @@ class ActorContext { ActorContext(ActorContext &&) = delete; ActorContext &operator=(ActorContext &&) = delete; virtual ~ActorContext() = default; + + virtual int32 get_id() const { + return 0; + } + + void set_tag(string tag) { + tag_storage_ = std::move(tag); + tag_ = tag_storage_.c_str(); + } + const char *tag_ = nullptr; + string tag_storage_; // sometimes tag_ == tag_storage_.c_str() std::weak_ptr<ActorContext> this_ptr_; }; -class ActorInfo +class ActorInfo final : private ListNode - , HeapNode { + , private HeapNode { public: enum class Deleter : uint8 { Destroy, None }; @@ -52,11 +63,11 @@ class ActorInfo ActorInfo &operator=(const ActorInfo &) = delete; void init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr, Deleter deleter, - bool is_lite); + bool need_context, bool need_start_up); void on_actor_moved(Actor *actor_new_ptr); template <class ActorT> - ActorOwn<ActorT> transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor); + ActorOwn<ActorT> transfer_ownership_to_scheduler(unique_ptr<ActorT> actor); void clear(); void destroy_actor(); @@ -74,7 +85,8 @@ class ActorInfo Actor *get_actor_unsafe(); const Actor *get_actor_unsafe() const; - void set_context(std::shared_ptr<ActorContext> context); + std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context); + std::weak_ptr<ActorContext> get_context_weak_ptr() const; ActorContext *get_context(); const ActorContext *get_context() const; CSlice get_name() const; @@ -93,16 +105,18 @@ class ActorInfo vector<Event> mailbox_; - bool is_lite() const; + bool need_context() const; + bool need_start_up() const; void set_wait_generation(uint32 wait_generation); bool must_wait(uint32 wait_generation) const; void always_wait_for_mailbox(); private: - Deleter deleter_; - bool is_lite_; - bool is_running_; + Deleter deleter_ = Deleter::None; + bool need_context_ = true; + bool need_start_up_ = true; + bool is_running_ = false; bool always_wait_for_mailbox_{false}; uint32 wait_generation_{0}; @@ -116,4 +130,5 @@ class ActorInfo }; StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info); + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h index df0b0dfd81..35ec31b168 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -11,7 +11,6 @@ #include "td/actor/impl/Scheduler-decl.h" #include "td/utils/common.h" -#include "td/utils/format.h" #include "td/utils/Heap.h" #include "td/utils/List.h" #include "td/utils/logging.h" @@ -24,59 +23,69 @@ #include <utility> namespace td { -/*** ActorInfo ***/ + inline StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info) { sb << info.get_name() << ":" << const_cast<void *>(static_cast<const void *>(&info)) << ":" << const_cast<void *>(static_cast<const void *>(info.get_context())); return sb; } + inline void ActorInfo::init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr, - Deleter deleter, bool is_lite) { + Deleter deleter, bool need_context, bool need_start_up) { CHECK(!is_running()); CHECK(!is_migrating()); sched_id_.store(sched_id, std::memory_order_relaxed); actor_ = actor_ptr; - if (!is_lite) { + if (need_context) { context_ = Scheduler::context()->this_ptr_.lock(); + VLOG(actor) << "Set context " << context_.get() << " for " << name; + } #ifdef TD_DEBUG - name_ = name.str(); + name_.assign(name.data(), name.size()); #endif - } actor_->init(std::move(this_ptr)); deleter_ = deleter; - is_lite_ = is_lite; + need_context_ = need_context; + need_start_up_ = need_start_up; is_running_ = false; wait_generation_ = 0; } -inline bool ActorInfo::is_lite() const { - return is_lite_; + +inline bool ActorInfo::need_context() const { + return need_context_; +} + +inline bool ActorInfo::need_start_up() const { + return need_start_up_; } + inline void ActorInfo::set_wait_generation(uint32 wait_generation) { wait_generation_ = wait_generation; } + inline bool ActorInfo::must_wait(uint32 wait_generation) const { return wait_generation_ == wait_generation || (always_wait_for_mailbox_ && !mailbox_.empty()); } + inline void ActorInfo::always_wait_for_mailbox() { always_wait_for_mailbox_ = true; } + inline void ActorInfo::on_actor_moved(Actor *actor_new_ptr) { actor_ = actor_new_ptr; } inline void ActorInfo::clear() { - // LOG_IF(WARNING, !mailbox_.empty()) << "Destroy actor with non-empty mailbox: " << get_name() - // << format::as_array(mailbox_); - mailbox_.clear(); + CHECK(mailbox_.empty()); + CHECK(!actor_); CHECK(!is_running()); CHECK(!is_migrating()); // NB: must be in non migrating state // store invalid scheduler id. sched_id_.store((1 << 30) - 1, std::memory_order_relaxed); - destroy_actor(); - // Destroy context only after destructor. + VLOG(actor) << "Clear context " << context_.get() << " for " << get_name(); context_.reset(); } @@ -92,10 +101,11 @@ inline void ActorInfo::destroy_actor() { break; } actor_ = nullptr; + mailbox_.clear(); } template <class ActorT> -ActorOwn<ActorT> ActorInfo::transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor) { +ActorOwn<ActorT> ActorInfo::transfer_ownership_to_scheduler(unique_ptr<ActorT> actor) { CHECK(!empty()); CHECK(deleter_ == Deleter::None); ActorT *actor_ptr = actor.release(); @@ -142,14 +152,22 @@ inline const Actor *ActorInfo::get_actor_unsafe() const { return actor_; } -inline void ActorInfo::set_context(std::shared_ptr<ActorContext> context) { +inline std::shared_ptr<ActorContext> ActorInfo::set_context(std::shared_ptr<ActorContext> context) { CHECK(is_running()); context->this_ptr_ = context; - context->tag_ = Scheduler::context()->tag_; - context_ = std::move(context); + if (Scheduler::context()->tag_) { + context->set_tag(Scheduler::context()->tag_); + } + std::swap(context_, context); Scheduler::context() = context_.get(); Scheduler::on_context_updated(); + return context; } + +inline std::weak_ptr<ActorContext> ActorInfo::get_context_weak_ptr() const { + return context_; +} + inline const ActorContext *ActorInfo::get_context() const { return context_.get(); } @@ -167,13 +185,15 @@ inline CSlice ActorInfo::get_name() const { } inline void ActorInfo::start_run() { - VLOG(actor) << "start_run: " << *this; - CHECK(!is_running_) << "Recursive call of actor " << tag("name", get_name()); + VLOG(actor) << "Start run actor: " << *this; + LOG_CHECK(!is_running_) << "Recursive call of actor " << get_name(); is_running_ = true; } inline void ActorInfo::finish_run() { is_running_ = false; - VLOG(actor) << "stop_run: " << *this; + if (!empty()) { + VLOG(actor) << "Stop run actor: " << *this; + } } inline bool ActorInfo::is_running() const { @@ -198,4 +218,5 @@ inline const ListNode *ActorInfo::get_list_node() const { inline ActorInfo *ActorInfo::from_list_node(ListNode *node) { return static_cast<ActorInfo *>(node); } + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp deleted file mode 100644 index 47593db90b..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp +++ /dev/null @@ -1,102 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include "td/actor/impl/ConcurrentScheduler.h" - -#include "td/actor/impl/Actor.h" -#include "td/actor/impl/ActorId.h" -#include "td/actor/impl/ActorInfo.h" -#include "td/actor/impl/Scheduler.h" - -#include "td/utils/MpscPollableQueue.h" -#include "td/utils/port/thread_local.h" - -#include <memory> - -namespace td { - -void ConcurrentScheduler::init(int32 threads_n) { -#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED - threads_n = 0; -#endif - threads_n++; - std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(threads_n); - for (int32 i = 0; i < threads_n; i++) { -#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED -#else - auto queue = std::make_shared<MpscPollableQueue<EventFull>>(); - queue->init(); - outbound[i] = queue; -#endif - } - - schedulers_.resize(threads_n); - for (int32 i = 0; i < threads_n; i++) { - auto &sched = schedulers_[i]; - sched = make_unique<Scheduler>(); - sched->init(i, outbound, static_cast<Scheduler::Callback *>(this)); - } - - state_ = State::Start; -} - -void ConcurrentScheduler::test_one_thread_run() { - do { - for (auto &sched : schedulers_) { - sched->run(0); - } - } while (!is_finished_.load(std::memory_order_relaxed)); -} - -void ConcurrentScheduler::start() { - CHECK(state_ == State::Start); - is_finished_.store(false, std::memory_order_relaxed); - set_thread_id(0); -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - for (size_t i = 1; i < schedulers_.size(); i++) { - auto &sched = schedulers_[i]; - threads_.push_back(td::thread([&, tid = i]() { - set_thread_id(static_cast<int32>(tid)); - while (!is_finished()) { - sched->run(10); - } - })); - } -#endif - state_ = State::Run; -} - -bool ConcurrentScheduler::run_main(double timeout) { - CHECK(state_ == State::Run); - // run main scheduler in same thread - auto &main_sched = schedulers_[0]; - if (!is_finished()) { - main_sched->run(timeout); - } - return !is_finished(); -} - -void ConcurrentScheduler::finish() { - CHECK(state_ == State::Run); - if (!is_finished()) { - on_finish(); - } -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - for (auto &thread : threads_) { - thread.join(); - } - threads_.clear(); -#endif - schedulers_.clear(); - for (auto &f : at_finish_) { - f(); - } - at_finish_.clear(); - - state_ = State::Start; -} - -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h index fac66dd120..2796c701e5 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,8 +8,6 @@ #include "td/utils/Closure.h" #include "td/utils/common.h" -#include "td/utils/format.h" -#include "td/utils/logging.h" #include "td/utils/StringBuilder.h" #include <type_traits> @@ -50,7 +48,6 @@ class CustomEvent { virtual ~CustomEvent() = default; virtual void run(Actor *actor) = 0; - virtual CustomEvent *clone() const = 0; virtual void start_migrate(int32 sched_id) { } virtual void finish_migrate() { @@ -58,26 +55,23 @@ class CustomEvent { }; template <class ClosureT> -class ClosureEvent : public CustomEvent { +class ClosureEvent final : public CustomEvent { public: - void run(Actor *actor) override { + void run(Actor *actor) final { closure_.run(static_cast<typename ClosureT::ActorType *>(actor)); } - CustomEvent *clone() const override { - return new ClosureEvent<ClosureT>(closure_.clone()); - } template <class... ArgsT> - explicit ClosureEvent(ArgsT &&... args) : closure_(std::forward<ArgsT>(args)...) { + explicit ClosureEvent(ArgsT &&...args) : closure_(std::forward<ArgsT>(args)...) { } - void start_migrate(int32 sched_id) override { + void start_migrate(int32 sched_id) final { closure_.for_each([sched_id](auto &obj) { using ::td::start_migrate; start_migrate(obj, sched_id); }); } - void finish_migrate() override { + void finish_migrate() final { closure_.for_each([](auto &obj) { using ::td::finish_migrate; finish_migrate(obj); @@ -89,16 +83,12 @@ class ClosureEvent : public CustomEvent { }; template <class LambdaT> -class LambdaEvent : public CustomEvent { +class LambdaEvent final : public CustomEvent { public: - void run(Actor *actor) override { + void run(Actor *actor) final { f_(); } - CustomEvent *clone() const override { - LOG(FATAL) << "Not supported"; - return nullptr; - } - template <class FromLambdaT> + template <class FromLambdaT, std::enable_if_t<!std::is_same<std::decay_t<FromLambdaT>, LambdaEvent>::value, int> = 0> explicit LambdaEvent(FromLambdaT &&lambda) : f_(std::forward<FromLambdaT>(lambda)) { } @@ -153,7 +143,7 @@ class Event { new ClosureEvent<typename FromImmediateClosureT::Delayed>(std::forward<FromImmediateClosureT>(closure))); } template <class... ArgsT> - static Event delayed_closure(ArgsT &&... args) { + static Event delayed_closure(ArgsT &&...args) { using DelayedClosureT = decltype(create_delayed_closure(std::forward<ArgsT>(args)...)); return custom(new ClosureEvent<DelayedClosureT>(std::forward<ArgsT>(args)...)); } @@ -167,10 +157,10 @@ class Event { } Event(const Event &other) = delete; Event &operator=(const Event &) = delete; - Event(Event &&other) : type(other.type), link_token(other.link_token), data(other.data) { + Event(Event &&other) noexcept : type(other.type), link_token(other.link_token), data(other.data) { other.type = Type::NoType; } - Event &operator=(Event &&other) { + Event &operator=(Event &&other) noexcept { destroy(); type = other.type; link_token = other.link_token; @@ -182,17 +172,6 @@ class Event { destroy(); } - Event clone() const { - Event res; - res.type = type; - if (type == Type::Custom) { - res.data.custom_event = data.custom_event->clone(); - } else { - res.data = data; - } - return res; - } - bool empty() const { return type == Type::NoType; } @@ -241,7 +220,28 @@ class Event { } } }; -inline StringBuilder &operator<<(StringBuilder &sb, const Event &e) { - return sb << tag("Event", static_cast<int32>(e.type)); + +inline StringBuilder &operator<<(StringBuilder &string_builder, const Event &e) { + string_builder << "Event::"; + switch (e.type) { + case Event::Type::Start: + return string_builder << "Start"; + case Event::Type::Stop: + return string_builder << "Stop"; + case Event::Type::Yield: + return string_builder << "Yield"; + case Event::Type::Hangup: + return string_builder << "Hangup"; + case Event::Type::Timeout: + return string_builder << "Timeout"; + case Event::Type::Raw: + return string_builder << "Raw"; + case Event::Type::Custom: + return string_builder << "Custom"; + case Event::Type::NoType: + default: + return string_builder << "NoType"; + } } + } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h index ef2f1c2dcb..4137765111 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -45,7 +45,7 @@ class EventFull { data_.link_token = actor_ref.token(); } template <class T> - EventFull(ActorId<T> actor_id, Event &&data) : actor_id_(actor_id), data_(std::move(data)) { + EventFull(ActorId<T> actor_id, Event &&data) : actor_id_(std::move(actor_id)), data_(std::move(data)) { } ActorId<> actor_id_; @@ -56,7 +56,7 @@ class EventFull { class EventCreator { public: template <class ActorIdT, class FunctionT, class... ArgsT> - static EventFull closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { + static EventFull closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) { using ActorT = typename std::decay_t<ActorIdT>::ActorT; using FunctionClassT = member_function_class_t<FunctionT>; static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h index 1e997ee4b3..89eabef768 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -9,7 +9,7 @@ #include "td/actor/impl/EventFull-decl.h" #include "td/actor/impl/Scheduler-decl.h" -#include "td/utils/logging.h" +#include "td/utils/common.h" #include <utility> diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h index 4b51c102a5..8ed9feb10a 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -11,37 +11,40 @@ #include "td/actor/impl/EventFull-decl.h" #include "td/utils/Closure.h" +#include "td/utils/common.h" +#include "td/utils/FlatHashMap.h" #include "td/utils/Heap.h" #include "td/utils/List.h" +#include "td/utils/logging.h" #include "td/utils/MovableValue.h" #include "td/utils/MpscPollableQueue.h" #include "td/utils/ObjectPool.h" -#include "td/utils/port/EventFd.h" -#include "td/utils/port/Fd.h" +#include "td/utils/port/detail/PollableFd.h" #include "td/utils/port/Poll.h" +#include "td/utils/port/PollFlags.h" #include "td/utils/port/thread_local.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" +#include "td/utils/Time.h" #include "td/utils/type_traits.h" #include <functional> -#include <map> #include <memory> #include <type_traits> #include <utility> namespace td { + +extern int VERBOSITY_NAME(actor); + class ActorInfo; -struct Send { - using Flags = uint32; - static const Flags immediate = 0x001; - static const Flags later = 0x002; - static const Flags later_weak = 0x004; -}; + +enum class ActorSendType { Immediate, Later, LaterWeak }; class Scheduler; class SchedulerGuard { public: - explicit SchedulerGuard(Scheduler *scheduler); + explicit SchedulerGuard(Scheduler *scheduler, bool lock = true); ~SchedulerGuard(); SchedulerGuard(const SchedulerGuard &other) = delete; SchedulerGuard &operator=(const SchedulerGuard &other) = delete; @@ -50,6 +53,7 @@ class SchedulerGuard { private: MovableValue<bool> is_valid_ = true; + bool is_locked_; Scheduler *scheduler_; ActorContext *save_context_; Scheduler *save_scheduler_; @@ -82,9 +86,9 @@ class Scheduler { int32 sched_count() const; template <class ActorT, class... Args> - TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args); + TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&...args); template <class ActorT, class... Args> - TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args); + TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args); template <class ActorT> TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1); template <class ActorT> @@ -96,22 +100,28 @@ class Scheduler { void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event); void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event); - template <class EventT> - void send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags = 0); + void run_on_scheduler(int32 sched_id, Promise<Unit> action); // TODO Action + + template <class T> + void destroy_on_scheduler(int32 sched_id, T &value); + + template <class... ArgsT> + void destroy_on_scheduler(int32 sched_id, ArgsT &...values); + + template <ActorSendType send_type, class EventT> + void send_lambda(ActorRef actor_ref, EventT &&lambda); - template <class EventT> - void send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags = 0); + template <ActorSendType send_type, class EventT> + void send_closure(ActorRef actor_ref, EventT &&closure); - void send(ActorRef actor_ref, Event &&event, Send::Flags flags = 0); + template <ActorSendType send_type> + void send(ActorRef actor_ref, Event &&event); - void hack(const ActorId<> &actor_id, Event &&event) { - actor_id.get_actor_unsafe()->raw_event(event.data); - } void before_tail_send(const ActorId<> &actor_id); - void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read); - void unsubscribe(const Fd &fd); - void unsubscribe_before_close(const Fd &fd); + static void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite()); + static void unsubscribe(PollableFdRef fd); + static void unsubscribe_before_close(PollableFdRef fd); void yield_actor(Actor *actor); void stop_actor(Actor *actor); @@ -122,15 +132,15 @@ class Scheduler { void start_migrate_actor(Actor *actor, int32 dest_sched_id); void finish_migrate_actor(Actor *actor); - bool has_actor_timeout(const Actor *actor) const; + double get_actor_timeout(const Actor *actor) const; void set_actor_timeout_in(Actor *actor, double timeout); void set_actor_timeout_at(Actor *actor, double timeout_at); void cancel_actor_timeout(Actor *actor); void finish(); void yield(); - void run(double timeout); - void run_no_guard(double timeout); + void run(Timestamp timeout); + void run_no_guard(Timestamp timeout); void wakeup(); @@ -139,22 +149,29 @@ class Scheduler { static void on_context_updated(); SchedulerGuard get_guard(); + SchedulerGuard get_const_guard(); + + Timestamp get_timeout(); private: static void set_scheduler(Scheduler *scheduler); - /*** ServiceActor ***/ + + void destroy_on_scheduler_impl(int32 sched_id, Promise<Unit> action); + class ServiceActor final : public Actor { public: void set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues); - void start_up() override; private: std::shared_ptr<MpscPollableQueue<EventFull>> inbound_; - void loop() override; + bool subscribed_{false}; + + void start_up() final; + void loop() final; + void tear_down() final; }; friend class ServiceActor; - void do_custom_event(ActorInfo *actor, CustomEvent &event); void do_event(ActorInfo *actor, Event &&event); void enter_actor(ActorInfo *actor_info); @@ -168,7 +185,7 @@ class Scheduler { void do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id); void start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id); - bool has_actor_timeout(const ActorInfo *actor_info) const; + double get_actor_timeout(const ActorInfo *actor_info) const; void set_actor_timeout_in(ActorInfo *actor_info, double timeout); void set_actor_timeout_at(ActorInfo *actor_info, double timeout_at); void cancel_actor_timeout(ActorInfo *actor_info); @@ -180,15 +197,15 @@ class Scheduler { template <class RunFuncT, class EventFuncT> void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func); - template <class RunFuncT, class EventFuncT> - void send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, const EventFuncT &event_func); + template <ActorSendType send_type, class RunFuncT, class EventFuncT> + void send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func); void inc_wait_generation(); - double run_timeout(); + Timestamp run_timeout(); void run_mailbox(); - double run_events(); - void run_poll(double timeout); + Timestamp run_events(Timestamp timeout); + void run_poll(Timestamp timeout); template <class ActorT> ActorOwn<ActorT> register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id); @@ -198,42 +215,39 @@ class Scheduler { static TD_THREAD_LOCAL ActorContext *context_; Callback *callback_ = nullptr; - std::unique_ptr<ObjectPool<ActorInfo>> actor_info_pool_; + unique_ptr<ObjectPool<ActorInfo>> actor_info_pool_; - int32 actor_count_; + int32 actor_count_ = 0; ListNode pending_actors_list_; ListNode ready_actors_list_; KHeap<double> timeout_queue_; - std::map<ActorInfo *, std::vector<Event>> pending_events_; + FlatHashMap<ActorInfo *, std::vector<Event>> pending_events_; -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - EventFd event_fd_; -#endif ServiceActor service_actor_; Poll poll_; - bool yield_flag_; + bool yield_flag_ = false; bool has_guard_ = false; bool close_flag_ = false; - uint32 wait_generation_ = 0; - int32 sched_id_; - int32 sched_n_; + uint32 wait_generation_ = 1; + int32 sched_id_ = 0; + int32 sched_n_ = 0; std::shared_ptr<MpscPollableQueue<EventFull>> inbound_queue_; std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound_queues_; std::shared_ptr<ActorContext> save_context_; struct EventContext { - int32 dest_sched_id; + int32 dest_sched_id{0}; enum Flags { Stop = 1, Migrate = 2 }; int32 flags{0}; - uint64 link_token; + uint64 link_token{0}; - ActorInfo *actor_info; + ActorInfo *actor_info{nullptr}; }; - EventContext *event_context_ptr_; + EventContext *event_context_ptr_{nullptr}; friend class GlobalScheduler; friend class SchedulerGuard; @@ -241,14 +255,10 @@ class Scheduler { }; /*** Interface to current scheduler ***/ -void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read); -void unsubscribe(const Fd &fd); -void unsubscribe_before_close(const Fd &fd); - template <class ActorT, class... Args> -TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args); +TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&...args); template <class ActorT, class... Args> -TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args); +TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args); template <class ActorT> TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1); template <class ActorT> @@ -258,39 +268,38 @@ template <class ActorT> TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr); template <class ActorIdT, class FunctionT, class... ArgsT> -void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { +void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) { using ActorT = typename std::decay_t<ActorIdT>::ActorT; using FunctionClassT = member_function_class_t<FunctionT>; static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); - Scheduler::instance()->send_closure(std::forward<ActorIdT>(actor_id), - create_immediate_closure(function, std::forward<ArgsT>(args)...)); + Scheduler::instance()->send_closure<ActorSendType::Immediate>( + std::forward<ActorIdT>(actor_id), create_immediate_closure(function, std::forward<ArgsT>(args)...)); } template <class ActorIdT, class FunctionT, class... ArgsT> -void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { +void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) { using ActorT = typename std::decay_t<ActorIdT>::ActorT; using FunctionClassT = member_function_class_t<FunctionT>; static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); - Scheduler::instance()->send(std::forward<ActorIdT>(actor_id), - Event::delayed_closure(function, std::forward<ArgsT>(args)...), Send::later); + Scheduler::instance()->send<ActorSendType::Later>(std::forward<ActorIdT>(actor_id), + Event::delayed_closure(function, std::forward<ArgsT>(args)...)); } template <class... ArgsT> -void send_lambda(ActorRef actor_ref, ArgsT &&... args) { - Scheduler::instance()->send_lambda(actor_ref, std::forward<ArgsT>(args)...); +void send_lambda(ActorRef actor_ref, ArgsT &&...args) { + Scheduler::instance()->send_lambda<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...); } template <class... ArgsT> -void send_event(ActorRef actor_ref, ArgsT &&... args) { - Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)...); +void send_event(ActorRef actor_ref, ArgsT &&...args) { + Scheduler::instance()->send<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...); } template <class... ArgsT> -void send_event_later(ActorRef actor_ref, ArgsT &&... args) { - Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., Send::later); +void send_event_later(ActorRef actor_ref, ArgsT &&...args) { + Scheduler::instance()->send<ActorSendType::Later>(actor_ref, std::forward<ArgsT>(args)...); } -void yield_scheduler(); } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp index 479e419d62..38f2fc2e6f 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -13,19 +13,27 @@ #include "td/actor/impl/EventFull.h" #include "td/utils/common.h" +#include "td/utils/ExitGuard.h" #include "td/utils/format.h" #include "td/utils/List.h" #include "td/utils/logging.h" +#include "td/utils/misc.h" +#include "td/utils/MpscPollableQueue.h" #include "td/utils/ObjectPool.h" #include "td/utils/port/thread_local.h" +#include "td/utils/Promise.h" #include "td/utils/ScopeGuard.h" #include "td/utils/Time.h" #include <functional> +#include <iterator> +#include <memory> #include <utility> namespace td { +int VERBOSITY_NAME(actor) = VERBOSITY_NAME(DEBUG) + 10; + TD_THREAD_LOCAL Scheduler *Scheduler::scheduler_; // static zero-initialized TD_THREAD_LOCAL ActorContext *Scheduler::context_; // static zero-initialized @@ -49,6 +57,10 @@ void Scheduler::set_scheduler(Scheduler *scheduler) { scheduler_ = scheduler; } +void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) { + inbound_ = std::move(queues); +} + void Scheduler::ServiceActor::start_up() { #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED CHECK(!inbound_); @@ -56,10 +68,11 @@ void Scheduler::ServiceActor::start_up() { if (!inbound_) { return; } +#if !TD_PORT_WINDOWS auto &fd = inbound_->reader_get_event_fd(); - - fd.get_fd().set_observer(this); - ::td::subscribe(fd.get_fd(), Fd::Read); + Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read()); + subscribed_ = true; +#endif yield(); #endif } @@ -73,7 +86,11 @@ void Scheduler::ServiceActor::loop() { while (ready_n-- > 0) { EventFull event = queue->reader_get_unsafe(); if (event.actor_id().empty()) { - Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr)); + if (event.data().empty()) { + Scheduler::instance()->yield(); + } else { + Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr)); + } } else { VLOG(actor) << "Receive " << event.data(); finish_migrate(event.data()); @@ -84,10 +101,30 @@ void Scheduler::ServiceActor::loop() { yield(); } +void Scheduler::ServiceActor::tear_down() { + if (!subscribed_) { + return; + } +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + CHECK(!inbound_); +#else + if (!inbound_) { + return; + } + auto &fd = inbound_->reader_get_event_fd(); + Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref()); + subscribed_ = false; +#endif +} + /*** SchedlerGuard ***/ -SchedulerGuard::SchedulerGuard(Scheduler *scheduler) : scheduler_(scheduler) { - CHECK(!scheduler_->has_guard_); - scheduler_->has_guard_ = true; +SchedulerGuard::SchedulerGuard(Scheduler *scheduler, bool lock) : scheduler_(scheduler) { + if (lock) { + // the next check can fail if OS killed the scheduler's thread without releasing the guard + CHECK(!scheduler_->has_guard_); + scheduler_->has_guard_ = true; + } + is_locked_ = lock; save_scheduler_ = Scheduler::instance(); Scheduler::set_scheduler(scheduler_); @@ -102,8 +139,10 @@ SchedulerGuard::~SchedulerGuard() { if (is_valid_.get()) { std::swap(save_context_, scheduler_->context()); Scheduler::set_scheduler(save_scheduler_); - CHECK(scheduler_->has_guard_); - scheduler_->has_guard_ = false; + if (is_locked_) { + CHECK(scheduler_->has_guard_); + scheduler_->has_guard_ = false; + } LOG_TAG = save_tag_; } } @@ -132,9 +171,11 @@ EventGuard::~EventGuard() { } info->finish_run(); swap_context(info); - CHECK(info->is_lite() || save_context_ == info->get_context()); + CHECK(!info->need_context() || save_context_ == info->get_context()); #ifdef TD_DEBUG - CHECK(info->is_lite() || save_log_tag2_ == info->get_name().c_str()); + LOG_CHECK(!info->need_context() || save_log_tag2_ == info->get_name().c_str()) + << info->need_context() << " " << info->empty() << " " << info->is_migrating() << " " << save_log_tag2_ << " " + << info->get_name() << " " << scheduler_->close_flag_; #endif if (event_context_.flags & Scheduler::EventContext::Stop) { scheduler_->do_stop_actor(info); @@ -148,7 +189,7 @@ EventGuard::~EventGuard() { void EventGuard::swap_context(ActorInfo *info) { std::swap(scheduler_->event_context_ptr_, event_context_ptr_); - if (info->is_lite()) { + if (!info->need_context()) { return; } @@ -180,11 +221,6 @@ void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<Eve poll_.init(); -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - event_fd_.init(); - subscribe(event_fd_.get_fd(), Fd::Read); -#endif - if (!outbound.empty()) { inbound_queue_ = std::move(outbound[id]); } @@ -214,20 +250,12 @@ void Scheduler::clear() { auto actor_info = ActorInfo::from_list_node(ready_actors_list_.get()); do_stop_actor(actor_info); } - LOG_IF(FATAL, !ready_actors_list_.empty()) << ActorInfo::from_list_node(ready_actors_list_.next)->get_name(); - CHECK(ready_actors_list_.empty()); poll_.clear(); -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - if (!event_fd_.empty()) { - event_fd_.close(); - } -#endif - - if (callback_) { + if (callback_ && !ExitGuard::is_exited()) { // can't move lambda with unique_ptr inside into std::function auto ptr = actor_info_pool_.release(); - callback_->register_at_finish([=]() { delete ptr; }); + callback_->register_at_finish([ptr] { delete ptr; }); } else { actor_info_pool_.reset(); } @@ -236,59 +264,48 @@ void Scheduler::clear() { void Scheduler::do_event(ActorInfo *actor_info, Event &&event) { event_context_ptr_->link_token = event.link_token; auto actor = actor_info->get_actor_unsafe(); + VLOG(actor) << *actor_info << ' ' << event; switch (event.type) { - case Event::Type::Start: { - VLOG(actor) << *actor_info << " Event::Start"; + case Event::Type::Start: actor->start_up(); break; - } - case Event::Type::Stop: { - VLOG(actor) << *actor_info << " Event::Stop"; + case Event::Type::Stop: actor->tear_down(); break; - } - case Event::Type::Yield: { - VLOG(actor) << *actor_info << " Event::Yield"; + case Event::Type::Yield: actor->wakeup(); break; - } - case Event::Type::Hangup: { - auto token = get_link_token(actor); - VLOG(actor) << *actor_info << " Event::Hangup " << tag("token", format::as_hex(token)); - if (token != 0) { + case Event::Type::Hangup: + if (get_link_token(actor) != 0) { actor->hangup_shared(); } else { actor->hangup(); } break; - } - case Event::Type::Timeout: { - VLOG(actor) << *actor_info << " Event::Timeout"; + case Event::Type::Timeout: actor->timeout_expired(); break; - } - case Event::Type::Raw: { - VLOG(actor) << *actor_info << " Event::Raw"; + case Event::Type::Raw: actor->raw_event(event.data); break; - } - case Event::Type::Custom: { - do_custom_event(actor_info, *event.data.custom_event); + case Event::Type::Custom: + event.data.custom_event->run(actor); break; - } - case Event::Type::NoType: { + case Event::Type::NoType: + default: UNREACHABLE(); break; - } } - // can't clear event here. It may be already destroyed during destory_actor + // can't clear event here. It may be already destroyed during destroy_actor } void Scheduler::register_migrated_actor(ActorInfo *actor_info) { VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info) << tag("actor_count", actor_count_); actor_count_++; - CHECK(actor_info->is_migrating()); + LOG_CHECK(actor_info->is_migrating()) << *actor_info << ' ' << actor_count_ << ' ' << sched_id_ << ' ' + << actor_info->migrate_dest() << ' ' << actor_info->is_running() << ' ' + << close_flag_; CHECK(sched_id_ == actor_info->migrate_dest()); // CHECK(!actor_info->is_running()); actor_info->finish_migrate(); @@ -297,8 +314,8 @@ void Scheduler::register_migrated_actor(ActorInfo *actor_info) { } auto it = pending_events_.find(actor_info); if (it != pending_events_.end()) { - actor_info->mailbox_.insert(actor_info->mailbox_.end(), make_move_iterator(begin(it->second)), - make_move_iterator(end(it->second))); + actor_info->mailbox_.insert(actor_info->mailbox_.end(), std::make_move_iterator(it->second.begin()), + std::make_move_iterator(it->second.end())); pending_events_.erase(it); } if (actor_info->mailbox_.empty()) { @@ -323,6 +340,43 @@ void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_i } } +void Scheduler::run_on_scheduler(int32 sched_id, Promise<Unit> action) { + if (sched_id >= 0 && sched_id_ != sched_id) { + class Worker final : public Actor { + public: + explicit Worker(Promise<Unit> action) : action_(std::move(action)) { + } + + private: + Promise<Unit> action_; + + void start_up() final { + action_.set_value(Unit()); + stop(); + } + }; + create_actor_on_scheduler<Worker>("RunOnSchedulerWorker", sched_id, std::move(action)).release(); + return; + } + + action.set_value(Unit()); +} + +void Scheduler::destroy_on_scheduler_impl(int32 sched_id, Promise<Unit> action) { + auto empty_context = std::make_shared<ActorContext>(); + empty_context->this_ptr_ = empty_context; + ActorContext *current_context = context_; + context_ = empty_context.get(); + + const char *current_tag = LOG_TAG; + LOG_TAG = nullptr; + + run_on_scheduler(sched_id, std::move(action)); + + context_ = current_context; + LOG_TAG = current_tag; +} + void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) { if (!actor_info->is_running()) { auto node = actor_info->get_list_node(); @@ -338,9 +392,9 @@ void Scheduler::do_stop_actor(Actor *actor) { } void Scheduler::do_stop_actor(ActorInfo *actor_info) { CHECK(!actor_info->is_migrating()); - CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_; + LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_; ObjectPool<ActorInfo>::OwnerPtr owner_ptr; - if (!actor_info->is_lite()) { + if (actor_info->need_start_up()) { EventGuard guard(this, actor_info); do_event(actor_info, Event::stop()); owner_ptr = actor_info->get_actor_unsafe()->clear(); @@ -349,6 +403,7 @@ void Scheduler::do_stop_actor(ActorInfo *actor_info) { event_context_ptr_->flags = 0; } else { owner_ptr = actor_info->get_actor_unsafe()->clear(); + actor_info->destroy_actor(); } destroy_actor(actor_info); } @@ -382,6 +437,7 @@ void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) { start_migrate_actor(actor->get_info(), dest_sched_id); } + void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info) << tag("actor_count", actor_count_); @@ -396,6 +452,11 @@ void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) cancel_actor_timeout(actor_info); } +double Scheduler::get_actor_timeout(const ActorInfo *actor_info) const { + const HeapNode *heap_node = actor_info->get_heap_node(); + return heap_node->in_heap() ? timeout_queue_.get_key(heap_node) - Time::now() : 0.0; +} + void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) { if (timeout > 1e10) { timeout = 1e10; @@ -403,13 +464,13 @@ void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) { if (timeout < 0) { timeout = 0; } - double expire_at = Time::now() + timeout; - set_actor_timeout_at(actor_info, expire_at); + double expires_at = Time::now() + timeout; + set_actor_timeout_at(actor_info, expires_at); } void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) { HeapNode *heap_node = actor_info->get_heap_node(); - VLOG(actor) << "set actor " << *actor_info << " " << tag("timeout", timeout_at) << timeout_at - Time::now_cached(); + VLOG(actor) << "Set actor " << *actor_info << " timeout in " << timeout_at - Time::now_cached(); if (heap_node->in_heap()) { timeout_queue_.fix(timeout_at, heap_node); } else { @@ -417,21 +478,20 @@ void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) { } } -void Scheduler::run_poll(double timeout) { - // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]"; +void Scheduler::run_poll(Timestamp timeout) { // we can't wait for less than 1ms - poll_.run(static_cast<int32>(timeout * 1000 + 1)); - -#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - if (can_read(event_fd_.get_fd())) { - std::atomic_thread_fence(std::memory_order_acquire); - event_fd_.acquire(); - } + auto timeout_ms = static_cast<int>(clamp(timeout.in(), 0.0, 1000000.0) * 1000 + 1); +#if TD_PORT_WINDOWS + CHECK(inbound_queue_); + inbound_queue_->reader_get_event_fd().wait(timeout_ms); + service_actor_.notify(); +#elif TD_PORT_POSIX + poll_.run(timeout_ms); #endif } void Scheduler::run_mailbox() { - VLOG(actor) << "run mailbox : begin"; + VLOG(actor) << "Run mailbox : begin"; ListNode actors_list = std::move(ready_actors_list_); while (!actors_list.empty()) { ListNode *node = actors_list.get(); @@ -440,7 +500,7 @@ void Scheduler::run_mailbox() { inc_wait_generation(); flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr)); } - VLOG(actor) << "run mailbox : finish " << actor_count_; + VLOG(actor) << "Run mailbox : finish " << actor_count_; //Useful for debug, but O(ActorsCount) check @@ -457,40 +517,54 @@ void Scheduler::run_mailbox() { //LOG(ERROR) << *actor_info; //cnt++; //} - //CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_; + //LOG_CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_; } -double Scheduler::run_timeout() { +Timestamp Scheduler::run_timeout() { double now = Time::now(); + //TODO: use Timestamp().is_in_past() while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { HeapNode *node = timeout_queue_.pop(); ActorInfo *actor_info = ActorInfo::from_heap_node(node); inc_wait_generation(); - send(actor_info->actor_id(), Event::timeout(), Send::immediate); - } - if (timeout_queue_.empty()) { - return 10000; + send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout()); } - double timeout = timeout_queue_.top_key() - now; - // LOG(DEBUG) << "Timeout [cnt:" << timeout_queue_.size() << "] in " << format::as_time(timeout); - return timeout; + return get_timeout(); } -void Scheduler::run_no_guard(double timeout) { +Timestamp Scheduler::run_events(Timestamp timeout) { + Timestamp res; + VLOG(actor) << "Run events " << sched_id_ << " " << tag("pending", pending_events_.size()) + << tag("actors", actor_count_); + do { + run_mailbox(); + res = run_timeout(); + } while (!ready_actors_list_.empty() && !timeout.is_in_past()); + return res; +} + +void Scheduler::run_no_guard(Timestamp timeout) { CHECK(has_guard_); SCOPE_EXIT { yield_flag_ = false; }; - double next_timeout = run_events(); - if (next_timeout < timeout) { - timeout = next_timeout; - } + timeout.relax(run_events(timeout)); if (yield_flag_) { return; } run_poll(timeout); - run_events(); + run_events(timeout); +} + +Timestamp Scheduler::get_timeout() { + if (!ready_actors_list_.empty()) { + return Timestamp::in(0); + } + if (timeout_queue_.empty()) { + return Timestamp::in(10000); + } + return Timestamp::at(timeout_queue_.top_key()); } } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h index 7edf3f1d2d..d4d075785f 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -9,26 +9,22 @@ #include "td/actor/impl/ActorInfo-decl.h" #include "td/actor/impl/Scheduler-decl.h" -#include "td/utils/format.h" +#include "td/utils/common.h" #include "td/utils/Heap.h" #include "td/utils/logging.h" -#include "td/utils/MpscPollableQueue.h" #include "td/utils/ObjectPool.h" -#include "td/utils/port/Fd.h" +#include "td/utils/port/detail/PollableFd.h" +#include "td/utils/port/PollFlags.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" +#include "td/utils/Time.h" #include <atomic> -#include <memory> #include <tuple> #include <utility> namespace td { -/*** ServiceActor ***/ -inline void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) { - inbound_ = std::move(queues); -} - /*** EventGuard ***/ class EventGuard { public: @@ -58,6 +54,9 @@ class EventGuard { inline SchedulerGuard Scheduler::get_guard() { return SchedulerGuard(this); } +inline SchedulerGuard Scheduler::get_const_guard() { + return SchedulerGuard(this, false); +} inline void Scheduler::init() { init(0, {}, nullptr); @@ -71,12 +70,12 @@ inline int32 Scheduler::sched_count() const { } template <class ActorT, class... Args> -ActorOwn<ActorT> Scheduler::create_actor(Slice name, Args &&... args) { +ActorOwn<ActorT> Scheduler::create_actor(Slice name, Args &&...args) { return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id_); } template <class ActorT, class... Args> -ActorOwn<ActorT> Scheduler::create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) { +ActorOwn<ActorT> Scheduler::create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args) { return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id); } @@ -92,29 +91,31 @@ ActorOwn<ActorT> Scheduler::register_actor(Slice name, unique_ptr<ActorT> actor_ template <class ActorT> ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id) { + CHECK(has_guard_); if (sched_id == -1) { sched_id = sched_id_; } #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED sched_id = 0; #endif - CHECK(sched_id == sched_id_ || (0 <= sched_id && sched_id < static_cast<int32>(outbound_queues_.size()))) << sched_id; + LOG_CHECK(sched_id == sched_id_ || (0 <= sched_id && sched_id < static_cast<int32>(outbound_queues_.size()))) + << sched_id; auto info = actor_info_pool_->create_empty(); - VLOG(actor) << "Create actor: " << tag("name", name) << tag("ptr", *info) << tag("context", context()) - << tag("this", this) << tag("actor_count", actor_count_); actor_count_++; auto weak_info = info.get_weak(); auto actor_info = info.get(); - info->init(sched_id_, name, std::move(info), static_cast<Actor *>(actor_ptr), deleter, ActorTraits<ActorT>::is_lite); + actor_info->init(sched_id_, name, std::move(info), static_cast<Actor *>(actor_ptr), deleter, + ActorTraits<ActorT>::need_context, ActorTraits<ActorT>::need_start_up); + VLOG(actor) << "Create actor " << *actor_info << " (actor_count = " << actor_count_ << ')'; ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr); if (sched_id != sched_id_) { - send(actor_id, Event::start(), Send::later_weak); + send<ActorSendType::LaterWeak>(actor_id, Event::start()); do_migrate_actor(actor_info, sched_id); } else { pending_actors_list_.put(weak_info->get_list_node()); - if (!ActorTraits<ActorT>::is_lite) { - send(actor_id, Event::start(), Send::later_weak); + if (ActorTraits<ActorT>::need_start_up) { + send<ActorSendType::LaterWeak>(actor_id, Event::start()); } } @@ -122,7 +123,7 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A } template <class ActorT> -ActorOwn<ActorT> Scheduler::register_existing_actor(std::unique_ptr<ActorT> actor_ptr) { +ActorOwn<ActorT> Scheduler::register_existing_actor(unique_ptr<ActorT> actor_ptr) { CHECK(!actor_ptr->empty()); auto actor_info = actor_ptr->get_info(); CHECK(actor_info->migrate_dest_flag_atomic().first == sched_id_); @@ -130,10 +131,9 @@ ActorOwn<ActorT> Scheduler::register_existing_actor(std::unique_ptr<ActorT> acto } inline void Scheduler::destroy_actor(ActorInfo *actor_info) { - VLOG(actor) << "Destroy actor: " << tag("name", *actor_info) << tag("ptr", actor_info) - << tag("actor_count", actor_count_); + VLOG(actor) << "Destroy actor " << *actor_info << " (actor_count = " << actor_count_ << ')'; - CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_; + LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_; cancel_actor_timeout(actor_info); actor_info->get_list_node()->remove(); // called by ObjectPool @@ -142,11 +142,6 @@ inline void Scheduler::destroy_actor(ActorInfo *actor_info) { CHECK(actor_count_ >= 0); } -inline void Scheduler::do_custom_event(ActorInfo *actor_info, CustomEvent &event) { - VLOG(actor) << *actor_info << " Event::Custom"; - event.run(actor_info->get_actor_unsafe()); -} - template <class RunFuncT, class EventFuncT> void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func) { auto &mailbox = actor_info->mailbox_; @@ -161,13 +156,13 @@ void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, c if (guard.can_run()) { (*run_func)(actor_info); } else { - mailbox.insert(begin(mailbox) + i, (*event_func)()); + mailbox.insert(mailbox.begin() + i, (*event_func)()); } } - mailbox.erase(begin(mailbox), begin(mailbox) + i); + mailbox.erase(mailbox.begin(), mailbox.begin() + i); } -inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) { +inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<Actor> &actor_id, Event &&event) { if (sched_id == sched_id_) { ActorInfo *actor_info = actor_id.get_actor_info(); pending_events_[actor_info].push_back(std::move(event)); @@ -176,21 +171,34 @@ inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<> &actor_ } } +template <class T> +void Scheduler::destroy_on_scheduler(int32 sched_id, T &value) { + if (!value.empty()) { + destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([value = std::move(value)](Unit) { + // destroy value + })); + } +} + +template <class... ArgsT> +void Scheduler::destroy_on_scheduler(int32 sched_id, ArgsT &...values) { + destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([values = std::make_tuple(std::move(values)...)](Unit) { + // destroy values + })); +} + inline void Scheduler::before_tail_send(const ActorId<> &actor_id) { // TODO } inline void Scheduler::inc_wait_generation() { - wait_generation_++; + wait_generation_ += 2; } -template <class RunFuncT, class EventFuncT> -void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, - const EventFuncT &event_func) { - CHECK(has_guard_); +template <ActorSendType send_type, class RunFuncT, class EventFuncT> +void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func) { ActorInfo *actor_info = actor_id.get_actor_info(); if (unlikely(actor_info == nullptr || close_flag_)) { - // LOG(ERROR) << "Invalid actor id"; return; } @@ -199,8 +207,9 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru bool is_migrating; std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic(); bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id; + CHECK(has_guard_ || !on_current_sched); - if (likely(!(flags & Send::later) && !(flags & Send::later_weak) && on_current_sched && !actor_info->is_running() && + if (likely(send_type == ActorSendType::Immediate && on_current_sched && !actor_info->is_running() && !actor_info->must_wait(wait_generation_))) { // run immediately if (likely(actor_info->mailbox_.empty())) { EventGuard guard(this, actor_info); @@ -211,7 +220,7 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru } else { if (on_current_sched) { add_to_mailbox(actor_info, event_func()); - if (flags & Send::later) { + if (send_type == ActorSendType::Later) { actor_info->set_wait_generation(wait_generation_); } } else { @@ -220,57 +229,61 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru } } -template <class EventT> -void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags) { - return send_impl(actor_ref.get(), flags, - [&](ActorInfo *actor_info) { - event_context_ptr_->link_token = actor_ref.token(); - lambda(); - }, - [&]() { - auto event = Event::lambda(std::forward<EventT>(lambda)); - event.set_link_token(actor_ref.token()); - return std::move(event); - }); -} - -template <class EventT> -void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags) { - return send_impl(actor_ref.get(), flags, - [&](ActorInfo *actor_info) { - event_context_ptr_->link_token = actor_ref.token(); - closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe())); - }, - [&]() { - auto event = Event::immediate_closure(std::forward<EventT>(closure)); - event.set_link_token(actor_ref.token()); - return std::move(event); - }); -} - -inline void Scheduler::send(ActorRef actor_ref, Event &&event, Send::Flags flags) { +template <ActorSendType send_type, class EventT> +void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda) { + return send_impl<send_type>( + actor_ref.get(), + [&](ActorInfo *actor_info) { + event_context_ptr_->link_token = actor_ref.token(); + lambda(); + }, + [&] { + auto event = Event::lambda(std::forward<EventT>(lambda)); + event.set_link_token(actor_ref.token()); + return event; + }); +} + +template <ActorSendType send_type, class EventT> +void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) { + return send_impl<send_type>( + actor_ref.get(), + [&](ActorInfo *actor_info) { + event_context_ptr_->link_token = actor_ref.token(); + closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe())); + }, + [&] { + auto event = Event::immediate_closure(std::forward<EventT>(closure)); + event.set_link_token(actor_ref.token()); + return event; + }); +} + +template <ActorSendType send_type> +void Scheduler::send(ActorRef actor_ref, Event &&event) { event.set_link_token(actor_ref.token()); - return send_impl(actor_ref.get(), flags, [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); }, - [&]() { return std::move(event); }); + return send_impl<send_type>( + actor_ref.get(), [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); }, + [&] { return std::move(event); }); } -inline void Scheduler::subscribe(const Fd &fd, Fd::Flags flags) { - poll_.subscribe(fd, flags); +inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) { + instance()->poll_.subscribe(std::move(fd), flags); } -inline void Scheduler::unsubscribe(const Fd &fd) { - poll_.unsubscribe(fd); +inline void Scheduler::unsubscribe(PollableFdRef fd) { + instance()->poll_.unsubscribe(std::move(fd)); } -inline void Scheduler::unsubscribe_before_close(const Fd &fd) { - poll_.unsubscribe_before_close(fd); +inline void Scheduler::unsubscribe_before_close(PollableFdRef fd) { + instance()->poll_.unsubscribe_before_close(std::move(fd)); } inline void Scheduler::yield_actor(Actor *actor) { yield_actor(actor->get_info()); } inline void Scheduler::yield_actor(ActorInfo *actor_info) { - send(actor_info->actor_id(), Event::yield(), Send::later_weak); + send<ActorSendType::LaterWeak>(actor_info->actor_id(), Event::yield()); } inline void Scheduler::stop_actor(Actor *actor) { @@ -285,7 +298,7 @@ inline uint64 Scheduler::get_link_token(Actor *actor) { return get_link_token(actor->get_info()); } inline uint64 Scheduler::get_link_token(ActorInfo *actor_info) { - CHECK(event_context_ptr_->actor_info == actor_info); + LOG_CHECK(event_context_ptr_->actor_info == actor_info) << actor_info->get_name(); return event_context_ptr_->link_token; } @@ -293,8 +306,8 @@ inline void Scheduler::finish_migrate_actor(Actor *actor) { register_migrated_actor(actor->get_info()); } -inline bool Scheduler::has_actor_timeout(const Actor *actor) const { - return has_actor_timeout(actor->get_info()); +inline double Scheduler::get_actor_timeout(const Actor *actor) const { + return get_actor_timeout(actor->get_info()); } inline void Scheduler::set_actor_timeout_in(Actor *actor, double timeout) { set_actor_timeout_in(actor->get_info(), timeout); @@ -306,11 +319,6 @@ inline void Scheduler::cancel_actor_timeout(Actor *actor) { cancel_actor_timeout(actor->get_info()); } -inline bool Scheduler::has_actor_timeout(const ActorInfo *actor_info) const { - const HeapNode *heap_node = actor_info->get_heap_node(); - return heap_node->in_heap(); -} - inline void Scheduler::cancel_actor_timeout(ActorInfo *actor_info) { HeapNode *heap_node = actor_info->get_heap_node(); if (heap_node->in_heap()) { @@ -332,46 +340,23 @@ inline void Scheduler::yield() { inline void Scheduler::wakeup() { std::atomic_thread_fence(std::memory_order_release); #if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED - event_fd_.release(); + inbound_queue_->writer_put({}); #endif } -inline double Scheduler::run_events() { - double res; - VLOG(actor) << "run events " << sched_id_ << " " << tag("pending", pending_events_.size()) - << tag("actors", actor_count_); - do { - run_mailbox(); - res = run_timeout(); - } while (!ready_actors_list_.empty()); - return res; -} - -inline void Scheduler::run(double timeout) { +inline void Scheduler::run(Timestamp timeout) { auto guard = get_guard(); run_no_guard(timeout); } /*** Interface to current scheduler ***/ -inline void subscribe(const Fd &fd, Fd::Flags flags) { - Scheduler::instance()->subscribe(fd, flags); -} - -inline void unsubscribe(const Fd &fd) { - Scheduler::instance()->unsubscribe(fd); -} - -inline void unsubscribe_before_close(const Fd &fd) { - Scheduler::instance()->unsubscribe_before_close(fd); -} - template <class ActorT, class... Args> -ActorOwn<ActorT> create_actor(Slice name, Args &&... args) { +ActorOwn<ActorT> create_actor(Slice name, Args &&...args) { return Scheduler::instance()->create_actor<ActorT>(name, std::forward<Args>(args)...); } template <class ActorT, class... Args> -ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) { +ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args) { return Scheduler::instance()->create_actor_on_scheduler<ActorT>(name, sched_id, std::forward<Args>(args)...); } @@ -390,8 +375,4 @@ ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr) { return Scheduler::instance()->register_existing_actor(std::move(actor_ptr)); } -inline void yield_scheduler() { - Scheduler::instance()->yield(); -} - } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h deleted file mode 100644 index 2cb5cb2127..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h +++ /dev/null @@ -1,117 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/actor/impl2/ActorSignals.h" -#include "td/actor/impl2/ActorState.h" - -#include "td/utils/logging.h" - -#include <atomic> - -namespace td { -namespace actor2 { -class ActorLocker { - public: - struct Options { - Options() { - } - bool can_execute_paused = false; - bool is_shared = true; - Options &with_can_execute_paused(bool new_can_execute_paused) { - can_execute_paused = new_can_execute_paused; - return *this; - } - Options &with_is_shared(bool new_is_shared) { - is_shared = new_is_shared; - return *this; - } - }; - explicit ActorLocker(ActorState *state, Options options = {}) - : state_(state), flags_(state->get_flags_unsafe()), new_flags_{}, options_{options} { - } - bool try_lock() { - CHECK(!own_lock()); - while (!can_try_add_signals()) { - new_flags_ = flags_; - new_flags_.set_locked(true); - new_flags_.clear_signals(); - if (state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel)) { - own_lock_ = true; - return true; - } - } - return false; - } - bool try_unlock(ActorState::Flags flags) { - CHECK(!flags.is_locked()); - CHECK(own_lock()); - // can't unlock with signals set - //CHECK(!flags.has_signals()); - - flags_ = flags; - //try unlock - if (state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel)) { - own_lock_ = false; - return true; - } - - // read all signals - flags.set_locked(true); - flags.clear_signals(); - do { - flags_.add_signals(new_flags_.get_signals()); - } while (!state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel)); - new_flags_ = flags; - return false; - } - - bool try_add_signals(ActorSignals signals) { - CHECK(!own_lock()); - CHECK(can_try_add_signals()); - new_flags_ = flags_; - new_flags_.add_signals(signals); - return state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel); - } - bool add_signals(ActorSignals signals) { - CHECK(!own_lock()); - while (true) { - if (can_try_add_signals()) { - if (try_add_signals(signals)) { - return false; - } - } else { - if (try_lock()) { - flags_.add_signals(signals); - return true; - } - } - } - } - bool own_lock() const { - return own_lock_; - } - ActorState::Flags flags() const { - return flags_; - } - bool can_execute() const { - return flags_.is_shared() == options_.is_shared && (options_.can_execute_paused || !flags_.is_pause()); - } - - private: - ActorState *state_{nullptr}; - ActorState::Flags flags_; - ActorState::Flags new_flags_; - bool own_lock_{false}; - Options options_; - - bool can_try_add_signals() const { - return flags_.is_locked() || (flags_.is_in_queue() && !can_execute()); - } -}; -} // namespace actor2 -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h deleted file mode 100644 index b7a7483022..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h +++ /dev/null @@ -1,84 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/utils/common.h" - -namespace td { -namespace actor2 { -class ActorSignals { - public: - ActorSignals() = default; - uint32 raw() const { - return raw_; - } - bool empty() const { - return raw_ == 0; - } - bool has_signal(uint32 signal) const { - return (raw_ & (1u << signal)) != 0; - } - void add_signal(uint32 signal) { - raw_ |= (1u << signal); - } - void add_signals(ActorSignals signals) { - raw_ |= signals.raw(); - } - void clear_signal(uint32 signal) { - raw_ &= ~(1u << signal); - } - uint32 first_signal() { - if (!raw_) { - return 0; - } -#if TD_MSVC - int res = 0; - int bit = 1; - while ((raw_ & bit) == 0) { - res++; - bit <<= 1; - } - return res; -#else - return __builtin_ctz(raw_); -#endif - } - enum Signal : uint32 { - // Signals in order of priority - Wakeup = 1, - Alarm = 2, - Kill = 3, // immediate kill - Io = 4, // move to io thread - Cpu = 5, // move to cpu thread - StartUp = 6, - TearDown = 7, - // Two signals for mpmc queue logic - // - // PopSignal is set after actor is popped from queue - // When processed it should set InQueue and Pause flags to false. - // - // MessagesSignal is set after new messages was added to actor - // If owner of actor wish to delay message handling, she should set InQueue flag to true and - // add actor into mpmc queue. - Pop = 8, // got popped from queue - Message = 9, // got new message - }; - - static ActorSignals one(uint32 signal) { - ActorSignals res; - res.add_signal(signal); - return res; - } - - private: - uint32 raw_{0}; - friend class ActorState; - explicit ActorSignals(uint32 raw) : raw_(raw) { - } -}; -} // namespace actor2 -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorState.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorState.h deleted file mode 100644 index 02ead6bcf6..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorState.h +++ /dev/null @@ -1,166 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/actor/impl2/ActorSignals.h" -#include "td/actor/impl2/SchedulerId.h" - -#include "td/utils/common.h" - -#include <atomic> - -namespace td { -namespace actor2 { -class ActorState { - public: - class Flags { - public: - Flags() = default; - uint32 raw() const { - return raw_; - } - uint32 &raw_ref() { - return raw_; - } - SchedulerId get_scheduler_id() const { - return SchedulerId{static_cast<uint8>(raw_ & SchedulerMask)}; - } - void set_scheduler_id(SchedulerId id) { - raw_ = (raw_ & ~SchedulerMask) | id.value(); - } - - bool is_shared() const { - return check_flag(SharedFlag); - } - void set_shared(bool shared) { - set_flag(SharedFlag, shared); - } - - bool is_locked() const { - return check_flag(LockFlag); - } - void set_locked(bool locked) { - set_flag(LockFlag, locked); - } - - bool is_migrate() const { - return check_flag(MigrateFlag); - } - void set_migrate(bool migrate) { - set_flag(MigrateFlag, migrate); - } - - bool is_pause() const { - return check_flag(PauseFlag); - } - void set_pause(bool pause) { - set_flag(PauseFlag, pause); - } - - bool is_closed() const { - return check_flag(ClosedFlag); - } - void set_closed(bool closed) { - set_flag(ClosedFlag, closed); - } - - bool is_in_queue() const { - return check_flag(InQueueFlag); - } - void set_in_queue(bool in_queue) { - set_flag(InQueueFlag, in_queue); - } - - bool has_signals() const { - return check_flag(SignalMask); - } - void clear_signals() { - set_flag(SignalMask, false); - } - void set_signals(ActorSignals signals) { - raw_ = (raw_ & ~SignalMask) | (signals.raw() << SignalOffset); - } - void add_signals(ActorSignals signals) { - raw_ = raw_ | (signals.raw() << SignalOffset); - } - ActorSignals get_signals() const { - return ActorSignals{(raw_ & SignalMask) >> SignalOffset}; - } - - private: - uint32 raw_{0}; - - friend class ActorState; - Flags(uint32 raw) : raw_(raw) { - } - - bool check_flag(uint32 mask) const { - return (raw_ & mask) != 0; - } - void set_flag(uint32 mask, bool flag) { - raw_ = (raw_ & ~mask) | (flag * mask); - } - }; - - Flags get_flags_unsafe() { - return Flags(state_.load(std::memory_order_relaxed)); - } - void set_flags_unsafe(Flags flags) { - state_.store(flags.raw(), std::memory_order_relaxed); - } - - private: - friend class ActorLocker; - std::atomic<uint32> state_{0}; - enum : uint32 { - SchedulerMask = 255, - - // Actors can be shared or not. - // If actor is shared, than any thread may try to lock it - // If actor is not shared, than it is owned by its scheduler, and only - // its scheduler is allowed to access it - // This flag may NOT change during the lifetime of an actor - SharedFlag = 1 << 9, - - // Only shared actors need lock - // Lock if somebody is going to unlock it eventually. - // For example actor is locked, when some scheduler is executing its mailbox - // Or it is locked when it is in Mpmc queue, so someone will pop it eventually. - LockFlag = 1 << 10, - - // While actor is migrating from one scheduler to another no one is allowed to change it - // Could not be set for shared actors. - MigrateFlag = 1 << 11, - - // While set all messages are delayed - // Dropped from flush_maibox - // PauseFlag => InQueueFlag - PauseFlag = 1 << 12, - - ClosedFlag = 1 << 13, - - InQueueFlag = 1 << 14, - - // Signals - SignalOffset = 15, - Signal = 1 << SignalOffset, - WakeupSignalFlag = Signal << ActorSignals::Wakeup, - AlarmSignalFlag = Signal << ActorSignals::Alarm, - KillSignalFlag = Signal << ActorSignals::Kill, // immediate kill - IoSignalFlag = Signal << ActorSignals::Io, // move to io thread - CpuSignalFlag = Signal << ActorSignals::Cpu, // move to cpu thread - StartUpSignalFlag = Signal << ActorSignals::StartUp, - TearDownSignalFlag = Signal << ActorSignals::TearDown, - MessageSignalFlag = Signal << ActorSignals::Message, - PopSignalFlag = Signal << ActorSignals::Pop, - - SignalMask = WakeupSignalFlag | AlarmSignalFlag | KillSignalFlag | IoSignalFlag | CpuSignalFlag | - StartUpSignalFlag | TearDownSignalFlag | MessageSignalFlag | PopSignalFlag - }; -}; -} // namespace actor2 -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp deleted file mode 100644 index 720bf6bc4f..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp +++ /dev/null @@ -1,11 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include "td/actor/impl2/Scheduler.h" - -namespace td { -namespace actor2 {} -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.h deleted file mode 100644 index 9d5783b165..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.h +++ /dev/null @@ -1,1508 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/actor/impl2/ActorLocker.h" -#include "td/actor/impl2/SchedulerId.h" - -#include "td/utils/Closure.h" -#include "td/utils/common.h" -#include "td/utils/format.h" -#include "td/utils/Heap.h" -#include "td/utils/List.h" -#include "td/utils/logging.h" -#include "td/utils/MpmcQueue.h" -#include "td/utils/MpmcWaiter.h" -#include "td/utils/MpscLinkQueue.h" -#include "td/utils/MpscPollableQueue.h" -#include "td/utils/port/Fd.h" -#include "td/utils/port/Poll.h" -#include "td/utils/port/thread.h" -#include "td/utils/port/thread_local.h" -#include "td/utils/ScopeGuard.h" -#include "td/utils/SharedObjectPool.h" -#include "td/utils/Slice.h" -#include "td/utils/Time.h" -#include "td/utils/type_traits.h" - -#include <atomic> -#include <condition_variable> -#include <limits> -#include <memory> -#include <mutex> -#include <type_traits> -#include <utility> - -namespace td { -namespace actor2 { -class Actor; - -template <class Impl> -class Context { - public: - static Impl *get() { - return context_; - } - class Guard { - public: - explicit Guard(Impl *new_context) { - old_context_ = context_; - context_ = new_context; - } - ~Guard() { - context_ = old_context_; - } - Guard(const Guard &) = delete; - Guard &operator=(const Guard &) = delete; - Guard(Guard &&) = delete; - Guard &operator=(Guard &&) = delete; - - private: - Impl *old_context_; - }; - - private: - static TD_THREAD_LOCAL Impl *context_; -}; - -template <class Impl> -TD_THREAD_LOCAL Impl *Context<Impl>::context_; - -enum : uint64 { EmptyLinkToken = std::numeric_limits<uint64>::max() }; - -class ActorExecuteContext : public Context<ActorExecuteContext> { - public: - explicit ActorExecuteContext(Actor *actor, Timestamp alarm_timestamp = Timestamp::never()) - : actor_(actor), alarm_timestamp_(alarm_timestamp) { - } - Actor &actor() const { - CHECK(actor_); - return *actor_; - } - bool has_flags() const { - return flags_ != 0; - } - void set_stop() { - flags_ |= 1 << Stop; - } - bool get_stop() const { - return (flags_ & (1 << Stop)) != 0; - } - void set_pause() { - flags_ |= 1 << Pause; - } - bool get_pause() const { - return (flags_ & (1 << Pause)) != 0; - } - void clear_actor() { - actor_ = nullptr; - } - void set_link_token(uint64 link_token) { - link_token_ = link_token; - } - uint64 get_link_token() const { - return link_token_; - } - Timestamp &alarm_timestamp() { - flags_ |= 1 << Alarm; - return alarm_timestamp_; - } - bool get_alarm_flag() const { - return (flags_ & (1 << Alarm)) != 0; - } - Timestamp get_alarm_timestamp() const { - return alarm_timestamp_; - } - - private: - Actor *actor_; - uint32 flags_{0}; - uint64 link_token_{EmptyLinkToken}; - Timestamp alarm_timestamp_; - enum { Stop, Pause, Alarm }; -}; - -class ActorMessageImpl : private MpscLinkQueueImpl::Node { - public: - ActorMessageImpl() = default; - ActorMessageImpl(const ActorMessageImpl &) = delete; - ActorMessageImpl &operator=(const ActorMessageImpl &) = delete; - ActorMessageImpl(ActorMessageImpl &&other) = delete; - ActorMessageImpl &operator=(ActorMessageImpl &&other) = delete; - virtual ~ActorMessageImpl() = default; - virtual void run() = 0; - //virtual void run_anonymous() = 0; - - // ActorMessage <--> MpscLintQueue::Node - // Each actor's mailbox will be a queue - static ActorMessageImpl *from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) { - return static_cast<ActorMessageImpl *>(node); - } - MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() { - return static_cast<MpscLinkQueueImpl::Node *>(this); - } - - uint64 link_token_{EmptyLinkToken}; - bool is_big_{false}; -}; - -class ActorMessage { - public: - ActorMessage() = default; - explicit ActorMessage(std::unique_ptr<ActorMessageImpl> impl) : impl_(std::move(impl)) { - } - void run() { - CHECK(impl_); - impl_->run(); - } - explicit operator bool() { - return bool(impl_); - } - friend class ActorMailbox; - - void set_link_token(uint64 link_token) { - impl_->link_token_ = link_token; - } - uint64 get_link_token() const { - return impl_->link_token_; - } - bool is_big() const { - return impl_->is_big_; - } - void set_big() { - impl_->is_big_ = true; - } - - private: - std::unique_ptr<ActorMessageImpl> impl_; - - template <class T> - friend class td::MpscLinkQueue; - - static ActorMessage from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) { - return ActorMessage(std::unique_ptr<ActorMessageImpl>(ActorMessageImpl::from_mpsc_link_queue_node(node))); - } - MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() { - return impl_.release()->to_mpsc_link_queue_node(); - } -}; - -class ActorMailbox { - public: - ActorMailbox() = default; - ActorMailbox(const ActorMailbox &) = delete; - ActorMailbox &operator=(const ActorMailbox &) = delete; - ActorMailbox(ActorMailbox &&other) = delete; - ActorMailbox &operator=(ActorMailbox &&other) = delete; - ~ActorMailbox() { - pop_all(); - while (reader_.read()) { - // skip - } - } - class Reader; - void push(ActorMessage message) { - queue_.push(std::move(message)); - } - void push_unsafe(ActorMessage message) { - queue_.push_unsafe(std::move(message)); - } - - td::MpscLinkQueue<ActorMessage>::Reader &reader() { - return reader_; - } - - void pop_all() { - queue_.pop_all(reader_); - } - void pop_all_unsafe() { - queue_.pop_all_unsafe(reader_); - } - - private: - td::MpscLinkQueue<ActorMessage> queue_; - td::MpscLinkQueue<ActorMessage>::Reader reader_; -}; - -class ActorInfo - : private HeapNode - , private ListNode { - public: - ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name) - : actor_(std::move(actor)), name_(name.begin(), name.size()) { - state_.set_flags_unsafe(state_flags); - } - - bool has_actor() const { - return bool(actor_); - } - Actor &actor() { - CHECK(has_actor()); - return *actor_; - } - Actor *actor_ptr() const { - return actor_.get(); - } - void destroy_actor() { - actor_.reset(); - } - ActorState &state() { - return state_; - } - ActorMailbox &mailbox() { - return mailbox_; - } - CSlice get_name() const { - return name_; - } - - HeapNode *as_heap_node() { - return this; - } - static ActorInfo *from_heap_node(HeapNode *node) { - return static_cast<ActorInfo *>(node); - } - - Timestamp &alarm_timestamp() { - return alarm_timestamp_; - } - - private: - std::unique_ptr<Actor> actor_; - ActorState state_; - ActorMailbox mailbox_; - std::string name_; - Timestamp alarm_timestamp_; -}; - -using ActorInfoPtr = SharedObjectPool<ActorInfo>::Ptr; - -class Actor { - public: - Actor() = default; - Actor(const Actor &) = delete; - Actor &operator=(const Actor &) = delete; - Actor(Actor &&other) = delete; - Actor &operator=(Actor &&other) = delete; - virtual ~Actor() = default; - - void set_actor_info_ptr(ActorInfoPtr actor_info_ptr) { - actor_info_ptr_ = std::move(actor_info_ptr); - } - ActorInfoPtr get_actor_info_ptr() { - return actor_info_ptr_; - } - - protected: - // Signal handlers - virtual void start_up(); // StartUp signal handler - virtual void tear_down(); // TearDown signal handler (or Kill) - virtual void hang_up(); // HangUp signal handler - virtual void wake_up(); // WakeUp signal handler - virtual void alarm(); // Alarm signal handler - - friend class ActorMessageHangup; - - // Event handlers - //virtual void hangup_shared(); - // TODO: raw event? - - virtual void loop(); // default handler - - // Useful functions - void yield(); // send wakeup signal to itself - void stop(); // send Kill signal to itself - Timestamp &alarm_timestamp() { - return ActorExecuteContext::get()->alarm_timestamp(); - } - Timestamp get_alarm_timestamp() { - return ActorExecuteContext::get()->get_alarm_timestamp(); - } - - CSlice get_name() { - return actor_info_ptr_->get_name(); - } - - // Inteface to scheduler - // Query will be just passed to current scheduler - // Timeout functions - //bool has_timeout() const; - //void set_timeout_in(double timeout_in); - //void set_timeout_at(double timeout_at); - //void cancel_timeout(); - //uint64 get_link_token(); // get current request's link_token - //set context that will be inherited by all childrens - //void set_context(std::shared_ptr<ActorContext> context); - - //ActorShared<> actor_shared(); // ActorShared to itself - //template <class SelfT> - //ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1)); // ActorShared with type - - // Create EventFull to itself - //template <class FuncT, class... ArgsT> - //auto self_closure(FuncT &&func, ArgsT &&... args); - //template <class SelfT, class FuncT, class... ArgsT> - //auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args); - //template <class LambdaT> - //auto self_lambda(LambdaT &&lambda); - - //void do_stop(); // process Kill signal immediately - - private: - friend class ActorExecutor; - ActorInfoPtr actor_info_ptr_; -}; -// Signal handlers -inline void Actor::start_up() { - yield(); -} -inline void Actor::tear_down() { - // noop -} -inline void Actor::hang_up() { - stop(); -} -inline void Actor::wake_up() { - loop(); -} -inline void Actor::alarm() { - loop(); -} - -inline void Actor::loop() { - // noop -} - -// Useful functions -inline void Actor::yield() { - // TODO -} -inline void Actor::stop() { - ActorExecuteContext::get()->set_stop(); -} - -class ActorInfoCreator { - public: - class Options { - public: - Options() = default; - - Options &with_name(Slice new_name) { - name = new_name; - return *this; - } - - Options &on_scheduler(SchedulerId new_scheduler_id) { - scheduler_id = new_scheduler_id; - return *this; - } - bool has_scheduler() const { - return scheduler_id.is_valid(); - } - Options &with_poll() { - is_shared = false; - return *this; - } - - private: - friend class ActorInfoCreator; - Slice name; - SchedulerId scheduler_id; - bool is_shared{true}; - bool in_queue{true}; - //TODO: rename - }; - - //Create unlocked actor. One must send StartUp signal immediately. - ActorInfoPtr create(std::unique_ptr<Actor> actor, const Options &args) { - ActorState::Flags flags; - flags.set_scheduler_id(args.scheduler_id); - flags.set_shared(args.is_shared); - flags.set_in_queue(args.in_queue); - flags.set_signals(ActorSignals::one(ActorSignals::StartUp)); - - auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name); - actor_info_ptr->actor().set_actor_info_ptr(actor_info_ptr); - return actor_info_ptr; - } - - ActorInfoCreator() = default; - ActorInfoCreator(const ActorInfoCreator &) = delete; - ActorInfoCreator &operator=(const ActorInfoCreator &) = delete; - ActorInfoCreator(ActorInfoCreator &&other) = delete; - ActorInfoCreator &operator=(ActorInfoCreator &&other) = delete; - ~ActorInfoCreator() { - pool_.for_each([](auto &actor_info) { actor_info.destroy_actor(); }); - } - - private: - SharedObjectPool<ActorInfo> pool_; -}; - -using ActorOptions = ActorInfoCreator::Options; - -class SchedulerDispatcher { - public: - virtual SchedulerId get_scheduler_id() const = 0; - virtual void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) = 0; - virtual void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) = 0; - - SchedulerDispatcher() = default; - SchedulerDispatcher(const SchedulerDispatcher &) = delete; - SchedulerDispatcher &operator=(const SchedulerDispatcher &) = delete; - SchedulerDispatcher(SchedulerDispatcher &&other) = delete; - SchedulerDispatcher &operator=(SchedulerDispatcher &&other) = delete; - virtual ~SchedulerDispatcher() = default; -}; - -class ActorExecutor { - public: - struct Options { - Options &with_from_queue() { - from_queue = true; - return *this; - } - Options &with_has_poll(bool new_has_poll) { - this->has_poll = new_has_poll; - return *this; - } - bool from_queue{false}; - bool has_poll{false}; - }; - ActorExecutor(ActorInfo &actor_info, SchedulerDispatcher &dispatcher, Options options) - : actor_info_(actor_info), dispatcher_(dispatcher), options_(options) { - //LOG(ERROR) << "START " << actor_info_.get_name() << " " << tag("from_queue", from_queue); - start(); - } - ActorExecutor(const ActorExecutor &) = delete; - ActorExecutor &operator=(const ActorExecutor &) = delete; - ActorExecutor(ActorExecutor &&other) = delete; - ActorExecutor &operator=(ActorExecutor &&other) = delete; - ~ActorExecutor() { - //LOG(ERROR) << "FINISH " << actor_info_.get_name() << " " << tag("own_lock", actor_locker_.own_lock()); - finish(); - } - - // our best guess if actor is closed or not - bool can_send() { - return !flags().is_closed(); - } - - bool can_send_immediate() { - return actor_locker_.own_lock() && !actor_execute_context_.has_flags() && actor_locker_.can_execute(); - } - - template <class F> - void send_immediate(F &&f, uint64 link_token) { - CHECK(can_send_immediate()); - if (!can_send()) { - return; - } - actor_execute_context_.set_link_token(link_token); - f(); - } - void send_immediate(ActorMessage message) { - CHECK(can_send_immediate()); - if (message.is_big()) { - actor_info_.mailbox().reader().delay(std::move(message)); - pending_signals_.add_signal(ActorSignals::Message); - actor_execute_context_.set_pause(); - return; - } - actor_execute_context_.set_link_token(message.get_link_token()); - message.run(); - } - void send_immediate(ActorSignals signals) { - CHECK(can_send_immediate()); - while (flush_one_signal(signals) && can_send_immediate()) { - } - pending_signals_.add_signals(signals); - } - - void send(ActorMessage message) { - if (!can_send()) { - return; - } - if (can_send_immediate()) { - return send_immediate(std::move(message)); - } - actor_info_.mailbox().push(std::move(message)); - pending_signals_.add_signal(ActorSignals::Message); - } - - void send(ActorSignals signals) { - if (!can_send()) { - return; - } - - pending_signals_.add_signals(signals); - } - - private: - ActorInfo &actor_info_; - SchedulerDispatcher &dispatcher_; - Options options_; - ActorLocker actor_locker_{ - &actor_info_.state(), - ActorLocker::Options().with_can_execute_paused(options_.from_queue).with_is_shared(!options_.has_poll)}; - - ActorExecuteContext actor_execute_context_{actor_info_.actor_ptr(), actor_info_.alarm_timestamp()}; - ActorExecuteContext::Guard guard{&actor_execute_context_}; - - ActorState::Flags flags_; - ActorSignals pending_signals_; - - ActorState::Flags &flags() { - return flags_; - } - - void start() { - if (!can_send()) { - return; - } - - ActorSignals signals; - SCOPE_EXIT { - pending_signals_.add_signals(signals); - }; - - if (options_.from_queue) { - signals.add_signal(ActorSignals::Pop); - } - - actor_locker_.try_lock(); - flags_ = actor_locker_.flags(); - - if (!actor_locker_.own_lock()) { - return; - } - - if (options_.from_queue) { - flags().set_pause(false); - } - if (!actor_locker_.can_execute()) { - CHECK(!options_.from_queue); - return; - } - - signals.add_signals(flags().get_signals()); - actor_info_.mailbox().pop_all(); - - while (!actor_execute_context_.has_flags() && flush_one(signals)) { - } - } - - void finish() { - if (!actor_locker_.own_lock()) { - if (!pending_signals_.empty() && actor_locker_.add_signals(pending_signals_)) { - flags_ = actor_locker_.flags(); - } else { - return; - } - } - CHECK(actor_locker_.own_lock()); - - if (actor_execute_context_.has_flags()) { - if (actor_execute_context_.get_stop()) { - if (actor_info_.alarm_timestamp()) { - dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), Timestamp::never()); - } - flags_.set_closed(true); - actor_info_.actor().tear_down(); - actor_info_.destroy_actor(); - return; - } - if (actor_execute_context_.get_pause()) { - flags_.set_pause(true); - } - if (actor_execute_context_.get_alarm_flag()) { - auto old_timestamp = actor_info_.alarm_timestamp(); - auto new_timestamp = actor_execute_context_.get_alarm_timestamp(); - if (!(old_timestamp == new_timestamp)) { - actor_info_.alarm_timestamp() = new_timestamp; - dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), new_timestamp); - } - } - } - flags_.set_signals(pending_signals_); - - bool add_to_queue = false; - while (true) { - // Drop InQueue flag if has pop signal - // Can't delay this signal - auto signals = flags().get_signals(); - if (signals.has_signal(ActorSignals::Pop)) { - signals.clear_signal(ActorSignals::Pop); - flags().set_signals(signals); - flags().set_in_queue(false); - } - - if (flags().has_signals() && !flags().is_in_queue()) { - add_to_queue = true; - flags().set_in_queue(true); - } - if (actor_locker_.try_unlock(flags())) { - if (add_to_queue) { - dispatcher_.add_to_queue(actor_info_.actor().get_actor_info_ptr(), flags().get_scheduler_id(), - !flags().is_shared()); - } - break; - } - flags_ = actor_locker_.flags(); - } - } - - bool flush_one(ActorSignals &signals) { - return flush_one_signal(signals) || flush_one_message(); - } - - bool flush_one_signal(ActorSignals &signals) { - auto signal = signals.first_signal(); - if (!signal) { - return false; - } - switch (signal) { - case ActorSignals::Wakeup: - actor_info_.actor().wake_up(); - break; - case ActorSignals::Alarm: - if (actor_execute_context_.get_alarm_timestamp().is_in_past()) { - actor_execute_context_.alarm_timestamp() = Timestamp::never(); - actor_info_.actor().alarm(); - } - break; - case ActorSignals::Kill: - actor_execute_context_.set_stop(); - break; - case ActorSignals::StartUp: - actor_info_.actor().start_up(); - break; - case ActorSignals::TearDown: - actor_info_.actor().tear_down(); - break; - case ActorSignals::Pop: - flags().set_in_queue(false); - break; - - case ActorSignals::Message: - break; - case ActorSignals::Io: - case ActorSignals::Cpu: - LOG(FATAL) << "TODO"; - default: - UNREACHABLE(); - } - signals.clear_signal(signal); - return true; - } - - bool flush_one_message() { - auto message = actor_info_.mailbox().reader().read(); - if (!message) { - return false; - } - if (message.is_big() && !options_.from_queue) { - actor_info_.mailbox().reader().delay(std::move(message)); - pending_signals_.add_signal(ActorSignals::Message); - actor_execute_context_.set_pause(); - return false; - } - - actor_execute_context_.set_link_token(message.get_link_token()); - message.run(); - return true; - } -}; - -using SchedulerMessage = ActorInfoPtr; - -struct WorkerInfo { - enum class Type { Io, Cpu } type{Type::Io}; - WorkerInfo() = default; - explicit WorkerInfo(Type type) : type(type) { - } - ActorInfoCreator actor_info_creator; -}; - -struct SchedulerInfo { - SchedulerId id; - // will be read by all workers is any thread - std::unique_ptr<MpmcQueue<SchedulerMessage>> cpu_queue; - std::unique_ptr<MpmcWaiter> cpu_queue_waiter; - // only scheduler itself may read from io_queue_ - std::unique_ptr<MpscPollableQueue<SchedulerMessage>> io_queue; - size_t cpu_threads_count{0}; - - std::unique_ptr<WorkerInfo> io_worker; - std::vector<std::unique_ptr<WorkerInfo>> cpu_workers; -}; - -struct SchedulerGroupInfo { - explicit SchedulerGroupInfo(size_t n) : schedulers(n) { - } - std::atomic<bool> is_stop_requested{false}; - - int active_scheduler_count{0}; - std::mutex active_scheduler_count_mutex; - std::condition_variable active_scheduler_count_condition_variable; - - std::vector<SchedulerInfo> schedulers; -}; - -class SchedulerContext - : public Context<SchedulerContext> - , public SchedulerDispatcher { - public: - // DispatcherInterface - SchedulerDispatcher &dispatcher() { - return *this; - } - - // ActorCreator Interface - virtual ActorInfoCreator &get_actor_info_creator() = 0; - - // Poll interface - virtual bool has_poll() = 0; - virtual Poll &get_poll() = 0; - - // Timeout interface - virtual bool has_heap() = 0; - virtual KHeap<double> &get_heap() = 0; - - // Stop all schedulers - virtual bool is_stop_requested() = 0; - virtual void stop() = 0; -}; - -#if !TD_THREAD_UNSUPPORTED -class Scheduler { - public: - Scheduler(std::shared_ptr<SchedulerGroupInfo> scheduler_group_info, SchedulerId id, size_t cpu_threads_count) - : scheduler_group_info_(std::move(scheduler_group_info)), cpu_threads_(cpu_threads_count) { - scheduler_group_info_->active_scheduler_count++; - info_ = &scheduler_group_info_->schedulers.at(id.value()); - info_->id = id; - if (cpu_threads_count != 0) { - info_->cpu_threads_count = cpu_threads_count; - info_->cpu_queue = std::make_unique<MpmcQueue<SchedulerMessage>>(1024, max_thread_count()); - info_->cpu_queue_waiter = std::make_unique<MpmcWaiter>(); - } - info_->io_queue = std::make_unique<MpscPollableQueue<SchedulerMessage>>(); - info_->io_queue->init(); - - info_->cpu_workers.resize(cpu_threads_count); - for (auto &worker : info_->cpu_workers) { - worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Cpu); - } - info_->io_worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Io); - - poll_.init(); - io_worker_ = std::make_unique<IoWorker>(*info_->io_queue); - } - - Scheduler(const Scheduler &) = delete; - Scheduler &operator=(const Scheduler &) = delete; - Scheduler(Scheduler &&other) = delete; - Scheduler &operator=(Scheduler &&other) = delete; - ~Scheduler() { - // should stop - stop(); - do_stop(); - } - - void start() { - for (size_t i = 0; i < cpu_threads_.size(); i++) { - cpu_threads_[i] = td::thread([this, i] { - this->run_in_context_impl(*this->info_->cpu_workers[i], - [this] { CpuWorker(*info_->cpu_queue, *info_->cpu_queue_waiter).run(); }); - }); - } - this->run_in_context([this] { this->io_worker_->start_up(); }); - } - - template <class F> - void run_in_context(F &&f) { - run_in_context_impl(*info_->io_worker, std::forward<F>(f)); - } - - bool run(double timeout) { - bool res; - run_in_context_impl(*info_->io_worker, [this, timeout, &res] { - if (SchedulerContext::get()->is_stop_requested()) { - res = false; - } else { - res = io_worker_->run_once(timeout); - } - if (!res) { - io_worker_->tear_down(); - } - }); - if (!res) { - do_stop(); - } - return res; - } - - // Just syntactic sugar - void stop() { - run_in_context([] { SchedulerContext::get()->stop(); }); - } - - SchedulerId get_scheduler_id() const { - return info_->id; - } - - private: - std::shared_ptr<SchedulerGroupInfo> scheduler_group_info_; - SchedulerInfo *info_; - std::vector<td::thread> cpu_threads_; - bool is_stopped_{false}; - Poll poll_; - KHeap<double> heap_; - class IoWorker; - std::unique_ptr<IoWorker> io_worker_; - - class SchedulerContextImpl : public SchedulerContext { - public: - SchedulerContextImpl(WorkerInfo *worker, SchedulerInfo *scheduler, SchedulerGroupInfo *scheduler_group, Poll *poll, - KHeap<double> *heap) - : worker_(worker), scheduler_(scheduler), scheduler_group_(scheduler_group), poll_(poll), heap_(heap) { - } - - SchedulerId get_scheduler_id() const override { - return scheduler()->id; - } - void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) override { - if (!scheduler_id.is_valid()) { - scheduler_id = scheduler()->id; - } - auto &info = scheduler_group()->schedulers.at(scheduler_id.value()); - if (need_poll) { - info.io_queue->writer_put(std::move(actor_info_ptr)); - } else { - info.cpu_queue->push(std::move(actor_info_ptr), get_thread_id()); - info.cpu_queue_waiter->notify(); - } - } - - ActorInfoCreator &get_actor_info_creator() override { - return worker()->actor_info_creator; - } - - bool has_poll() override { - return poll_ != nullptr; - } - Poll &get_poll() override { - CHECK(has_poll()); - return *poll_; - } - - bool has_heap() override { - return heap_ != nullptr; - } - KHeap<double> &get_heap() override { - CHECK(has_heap()); - return *heap_; - } - - void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override { - // we are in PollWorker - CHECK(has_heap()); - auto &heap = get_heap(); - auto *heap_node = actor_info_ptr->as_heap_node(); - if (timestamp) { - if (heap_node->in_heap()) { - heap.fix(timestamp.at(), heap_node); - } else { - heap.insert(timestamp.at(), heap_node); - } - } else { - if (heap_node->in_heap()) { - heap.erase(heap_node); - } - } - - // TODO: do something in plain worker - } - - bool is_stop_requested() override { - return scheduler_group()->is_stop_requested; - } - - void stop() override { - bool expect_false = false; - // Trying to set close_flag_ to true with CAS - auto &group = *scheduler_group(); - if (!group.is_stop_requested.compare_exchange_strong(expect_false, true)) { - return; - } - - // Notify all workers of all schedulers - for (auto &scheduler_info : group.schedulers) { - scheduler_info.io_queue->writer_put({}); - for (size_t i = 0; i < scheduler_info.cpu_threads_count; i++) { - scheduler_info.cpu_queue->push({}, get_thread_id()); - scheduler_info.cpu_queue_waiter->notify(); - } - } - } - - private: - WorkerInfo *worker() const { - return worker_; - } - SchedulerInfo *scheduler() const { - return scheduler_; - } - SchedulerGroupInfo *scheduler_group() const { - return scheduler_group_; - } - - WorkerInfo *worker_; - SchedulerInfo *scheduler_; - SchedulerGroupInfo *scheduler_group_; - Poll *poll_; - - KHeap<double> *heap_; - }; - - template <class F> - void run_in_context_impl(WorkerInfo &worker_info, F &&f) { - bool is_io_worker = worker_info.type == WorkerInfo::Type::Io; - SchedulerContextImpl context(&worker_info, info_, scheduler_group_info_.get(), is_io_worker ? &poll_ : nullptr, - is_io_worker ? &heap_ : nullptr); - SchedulerContext::Guard guard(&context); - f(); - } - - class CpuWorker { - public: - CpuWorker(MpmcQueue<SchedulerMessage> &queue, MpmcWaiter &waiter) : queue_(queue), waiter_(waiter) { - } - void run() { - auto thread_id = get_thread_id(); - auto &dispatcher = SchedulerContext::get()->dispatcher(); - - int yields = 0; - while (true) { - SchedulerMessage message; - if (queue_.try_pop(message, thread_id)) { - if (!message) { - return; - } - ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue()); - yields = waiter_.stop_wait(yields, thread_id); - } else { - yields = waiter_.wait(yields, thread_id); - } - } - } - - private: - MpmcQueue<SchedulerMessage> &queue_; - MpmcWaiter &waiter_; - }; - - class IoWorker { - public: - explicit IoWorker(MpscPollableQueue<SchedulerMessage> &queue) : queue_(queue) { - } - - void start_up() { - auto &poll = SchedulerContext::get()->get_poll(); - poll.subscribe(queue_.reader_get_event_fd().get_fd(), Fd::Flag::Read); - } - void tear_down() { - auto &poll = SchedulerContext::get()->get_poll(); - poll.unsubscribe(queue_.reader_get_event_fd().get_fd()); - } - - bool run_once(double timeout) { - auto &dispatcher = SchedulerContext::get()->dispatcher(); - auto &poll = SchedulerContext::get()->get_poll(); - auto &heap = SchedulerContext::get()->get_heap(); - - auto now = Time::now(); // update Time::now_cached() - while (!heap.empty() && heap.top_key() <= now) { - auto *heap_node = heap.pop(); - auto *actor_info = ActorInfo::from_heap_node(heap_node); - - ActorExecutor executor(*actor_info, dispatcher, ActorExecutor::Options().with_has_poll(true)); - if (executor.can_send_immediate()) { - executor.send_immediate(ActorSignals::one(ActorSignals::Alarm)); - } else { - executor.send(ActorSignals::one(ActorSignals::Alarm)); - } - } - - const int size = queue_.reader_wait_nonblock(); - for (int i = 0; i < size; i++) { - auto message = queue_.reader_get_unsafe(); - if (!message) { - return false; - } - ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue().with_has_poll(true)); - } - queue_.reader_flush(); - - bool can_sleep = size == 0 && timeout != 0; - int32 timeout_ms = 0; - if (can_sleep) { - auto wakeup_timestamp = Timestamp::in(timeout); - if (!heap.empty()) { - wakeup_timestamp.relax(Timestamp::at(heap.top_key())); - } - timeout_ms = static_cast<int>(wakeup_timestamp.in() * 1000) + 1; - if (timeout_ms < 0) { - timeout_ms = 0; - } - //const int thirty_seconds = 30 * 1000; - //if (timeout_ms > thirty_seconds) { - //timeout_ms = thirty_seconds; - //} - } - poll.run(timeout_ms); - return true; - } - - private: - MpscPollableQueue<SchedulerMessage> &queue_; - }; - - void do_stop() { - if (is_stopped_) { - return; - } - // wait other threads to finish - for (auto &thread : cpu_threads_) { - thread.join(); - } - // Can't do anything else, other schedulers may send queries to this one. - // Must wait till every scheduler is stopped first.. - is_stopped_ = true; - - io_worker_.reset(); - poll_.clear(); - - std::unique_lock<std::mutex> lock(scheduler_group_info_->active_scheduler_count_mutex); - scheduler_group_info_->active_scheduler_count--; - scheduler_group_info_->active_scheduler_count_condition_variable.notify_all(); - } - - public: - static void close_scheduler_group(SchedulerGroupInfo &group_info) { - LOG(ERROR) << "close scheduler group"; - // Cannot close scheduler group before somebody asked to stop them - CHECK(group_info.is_stop_requested); - { - std::unique_lock<std::mutex> lock(group_info.active_scheduler_count_mutex); - group_info.active_scheduler_count_condition_variable.wait(lock, - [&] { return group_info.active_scheduler_count == 0; }); - } - - // Drain all queues - // Just to destroy all elements should be ok. - for (auto &scheduler_info : group_info.schedulers) { - // Drain io queue - auto &io_queue = *scheduler_info.io_queue; - while (true) { - int n = io_queue.reader_wait_nonblock(); - if (n == 0) { - break; - } - while (n-- > 0) { - auto message = io_queue.reader_get_unsafe(); - // message's destructor is called - } - } - scheduler_info.io_queue.reset(); - - // Drain cpu queue - auto &cpu_queue = *scheduler_info.cpu_queue; - while (true) { - SchedulerMessage message; - if (!cpu_queue.try_pop(message, get_thread_id())) { - break; - } - // message's destructor is called - } - scheduler_info.cpu_queue.reset(); - - // Do not destroy worker infos. run_in_context will crash if they are empty - } - } -}; - -// Actor messages -template <class LambdaT> -class ActorMessageLambda : public ActorMessageImpl { - public: - template <class FromLambdaT> - explicit ActorMessageLambda(FromLambdaT &&lambda) : lambda_(std::forward<FromLambdaT>(lambda)) { - } - void run() override { - lambda_(); - } - - private: - LambdaT lambda_; -}; - -class ActorMessageHangup : public ActorMessageImpl { - public: - void run() override { - ActorExecuteContext::get()->actor().hang_up(); - } -}; - -class ActorMessageCreator { - public: - template <class F> - static ActorMessage lambda(F &&f) { - return ActorMessage(std::make_unique<ActorMessageLambda<F>>(std::forward<F>(f))); - } - - static ActorMessage hangup() { - return ActorMessage(std::make_unique<ActorMessageHangup>()); - } - - // Use faster allocation? -}; - -// SYNTAX SHUGAR -namespace detail { -struct ActorRef { - ActorRef(ActorInfo &actor_info, uint64 link_token = EmptyLinkToken) : actor_info(actor_info), link_token(link_token) { - } - - ActorInfo &actor_info; - uint64 link_token; -}; - -template <class T> -T ¤t_actor() { - return static_cast<T &>(ActorExecuteContext::get()->actor()); -} - -void send_message(ActorInfo &actor_info, ActorMessage message) { - ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options()); - executor.send(std::move(message)); -} - -void send_message(ActorRef actor_ref, ActorMessage message) { - message.set_link_token(actor_ref.link_token); - send_message(actor_ref.actor_info, std::move(message)); -} -void send_message_later(ActorInfo &actor_info, ActorMessage message) { - ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options()); - executor.send(std::move(message)); -} - -void send_message_later(ActorRef actor_ref, ActorMessage message) { - message.set_link_token(actor_ref.link_token); - send_message_later(actor_ref.actor_info, std::move(message)); -} - -template <class ExecuteF, class ToMessageF> -void send_immediate(ActorRef actor_ref, ExecuteF &&execute, ToMessageF &&to_message) { - auto &scheduler_context = *SchedulerContext::get(); - ActorExecutor executor(actor_ref.actor_info, scheduler_context.dispatcher(), - ActorExecutor::Options().with_has_poll(scheduler_context.has_poll())); - if (executor.can_send_immediate()) { - return executor.send_immediate(execute, actor_ref.link_token); - } - auto message = to_message(); - message.set_link_token(actor_ref.link_token); - executor.send(std::move(message)); -} - -template <class F> -void send_lambda(ActorRef actor_ref, F &&lambda) { - send_immediate(actor_ref, lambda, [&lambda]() mutable { return ActorMessageCreator::lambda(std::move(lambda)); }); -} -template <class F> -void send_lambda_later(ActorRef actor_ref, F &&lambda) { - send_message_later(actor_ref, ActorMessageCreator::lambda(std::move(lambda))); -} - -template <class ClosureT> -void send_closure_impl(ActorRef actor_ref, ClosureT &&closure) { - using ActorType = typename ClosureT::ActorType; - send_immediate(actor_ref, [&closure]() mutable { closure.run(¤t_actor<ActorType>()); }, - [&closure]() mutable { - return ActorMessageCreator::lambda([closure = to_delayed_closure(std::move(closure))]() mutable { - closure.run(¤t_actor<ActorType>()); - }); - }); -} - -template <class... ArgsT> -void send_closure(ActorRef actor_ref, ArgsT &&... args) { - send_closure_impl(actor_ref, create_immediate_closure(std::forward<ArgsT>(args)...)); -} - -template <class ClosureT> -void send_closure_later_impl(ActorRef actor_ref, ClosureT &&closure) { - using ActorType = typename ClosureT::ActorType; - send_message_later(actor_ref, ActorMessageCreator::lambda([closure = std::move(closure)]() mutable { - closure.run(¤t_actor<ActorType>()); - })); -} - -template <class... ArgsT> -void send_closure_later(ActorRef actor_ref, ArgsT &&... args) { - send_closure_later_impl(actor_ref, create_delayed_closure(std::forward<ArgsT>(args)...)); -} - -void register_actor_info_ptr(ActorInfoPtr actor_info_ptr) { - auto state = actor_info_ptr->state().get_flags_unsafe(); - SchedulerContext::get()->add_to_queue(std::move(actor_info_ptr), state.get_scheduler_id(), !state.is_shared()); -} - -template <class T, class... ArgsT> -ActorInfoPtr create_actor(ActorOptions &options, ArgsT &&... args) { - auto *scheduler_context = SchedulerContext::get(); - if (!options.has_scheduler()) { - options.on_scheduler(scheduler_context->get_scheduler_id()); - } - auto res = - scheduler_context->get_actor_info_creator().create(std::make_unique<T>(std::forward<ArgsT>(args)...), options); - register_actor_info_ptr(res); - return res; -} -} // namespace detail - -// Essentially ActorInfoWeakPtr with Type -template <class ActorType = Actor> -class ActorId { - public: - using ActorT = ActorType; - ActorId() = default; - ActorId(const ActorId &) = default; - ActorId &operator=(const ActorId &) = default; - ActorId(ActorId &&other) = default; - ActorId &operator=(ActorId &&other) = default; - - // allow only conversion from child to parent - template <class ToActorType, class = std::enable_if_t<std::is_base_of<ToActorType, ActorType>::value>> - explicit operator ActorId<ToActorType>() const { - return ActorId<ToActorType>(ptr_); - } - - const ActorInfoPtr &actor_info_ptr() const { - return ptr_; - } - - ActorInfo &actor_info() const { - CHECK(ptr_); - return *ptr_; - } - bool empty() const { - return !ptr_; - } - - template <class... ArgsT> - static ActorId<ActorType> create(ActorOptions &options, ArgsT &&... args) { - return ActorId<ActorType>(detail::create_actor<ActorType>(options, std::forward<ArgsT>(args)...)); - } - - detail::ActorRef as_actor_ref() const { - CHECK(!empty()); - return detail::ActorRef(*actor_info_ptr()); - } - - private: - ActorInfoPtr ptr_; - - explicit ActorId(ActorInfoPtr ptr) : ptr_(std::move(ptr)) { - } - - template <class SelfT> - friend ActorId<SelfT> actor_id(SelfT *self); -}; - -template <class ActorType = Actor> -class ActorOwn { - public: - using ActorT = ActorType; - ActorOwn() = default; - explicit ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) { - } - template <class OtherActorType> - explicit ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) { - } - template <class OtherActorType> - explicit ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) { - } - template <class OtherActorType> - ActorOwn &operator=(ActorOwn<OtherActorType> &&other) { - reset(other.release()); - } - ActorOwn(ActorOwn &&other) : id_(other.release()) { - } - ActorOwn &operator=(ActorOwn &&other) { - reset(other.release()); - } - ActorOwn(const ActorOwn &) = delete; - ActorOwn &operator=(const ActorOwn &) = delete; - ~ActorOwn() { - reset(); - } - - bool empty() const { - return id_.empty(); - } - bool is_alive() const { - return id_.is_alive(); - } - ActorId<ActorType> get() const { - return id_; - } - ActorId<ActorType> release() { - return std::move(id_); - } - void reset(ActorId<ActorType> other = ActorId<ActorType>()) { - static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type"); - hangup(); - id_ = std::move(other); - } - const ActorId<ActorType> *operator->() const { - return &id_; - } - - detail::ActorRef as_actor_ref() const { - CHECK(!empty()); - return detail::ActorRef(*id_.actor_info_ptr(), 0); - } - - private: - ActorId<ActorType> id_; - void hangup() const { - if (empty()) { - return; - } - detail::send_message(as_actor_ref(), ActorMessageCreator::hangup()); - } -}; - -template <class ActorType = Actor> -class ActorShared { - public: - using ActorT = ActorType; - ActorShared() = default; - template <class OtherActorType> - ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) { - CHECK(token_ != 0); - } - template <class OtherActorType> - ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) { - } - template <class OtherActorType> - ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(other.token()) { - } - template <class OtherActorType> - ActorShared &operator=(ActorShared<OtherActorType> &&other) { - reset(other.release(), other.token()); - } - ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token()) { - } - ActorShared &operator=(ActorShared &&other) { - reset(other.release(), other.token()); - } - ActorShared(const ActorShared &) = delete; - ActorShared &operator=(const ActorShared &) = delete; - ~ActorShared() { - reset(); - } - - uint64 token() const { - return token_; - } - bool empty() const { - return id_.empty(); - } - bool is_alive() const { - return id_.is_alive(); - } - ActorId<ActorType> get() const { - return id_; - } - ActorId<ActorType> release(); - void reset(ActorId<ActorType> other = ActorId<ActorType>(), uint64 link_token = EmptyLinkToken) { - static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type"); - hangup(); - id_ = other; - token_ = link_token; - } - const ActorId<ActorType> *operator->() const { - return &id_; - } - - detail::ActorRef as_actor_ref() const { - CHECK(!empty()); - return detail::ActorRef(*id_.actor_info_ptr(), token_); - } - - private: - ActorId<ActorType> id_; - uint64 token_; - - void hangup() const { - } -}; - -// common interface -template <class SelfT> -ActorId<SelfT> actor_id(SelfT *self) { - CHECK(self); - CHECK(static_cast<Actor *>(self) == &ActorExecuteContext::get()->actor()); - return ActorId<SelfT>(ActorExecuteContext::get()->actor().get_actor_info_ptr()); -} - -inline ActorId<> actor_id() { - return actor_id(&ActorExecuteContext::get()->actor()); -} - -template <class T, class... ArgsT> -ActorOwn<T> create_actor(ActorOptions options, ArgsT &&... args) { - return ActorOwn<T>(ActorId<T>::create(options, std::forward<ArgsT>(args)...)); -} - -template <class T, class... ArgsT> -ActorOwn<T> create_actor(Slice name, ArgsT &&... args) { - return ActorOwn<T>(ActorId<T>::create(ActorOptions().with_name(name), std::forward<ArgsT>(args)...)); -} - -template <class ActorIdT, class FunctionT, class... ArgsT> -void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { - using ActorT = typename std::decay_t<ActorIdT>::ActorT; - using FunctionClassT = member_function_class_t<FunctionT>; - static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); - - ActorIdT id = std::forward<ActorIdT>(actor_id); - detail::send_closure(id.as_actor_ref(), function, std::forward<ArgsT>(args)...); -} - -template <class ActorIdT, class FunctionT, class... ArgsT> -void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { - using ActorT = typename std::decay_t<ActorIdT>::ActorT; - using FunctionClassT = member_function_class_t<FunctionT>; - static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); - - ActorIdT id = std::forward<ActorIdT>(actor_id); - detail::send_closure_later(id.as_actor_ref(), function, std::forward<ArgsT>(args)...); -} - -template <class ActorIdT, class... ArgsT> -void send_lambda(ActorIdT &&actor_id, ArgsT &&... args) { - ActorIdT id = std::forward<ActorIdT>(actor_id); - detail::send_lambda(id.as_actor_ref(), std::forward<ArgsT>(args)...); -} - -#endif //!TD_THREAD_UNSUPPORTED -} // namespace actor2 -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h deleted file mode 100644 index 5850f1a94c..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h +++ /dev/null @@ -1,32 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#pragma once - -#include "td/utils/common.h" -#include "td/utils/logging.h" - -namespace td { -namespace actor2 { -class SchedulerId { - public: - SchedulerId() : id_(-1) { - } - explicit SchedulerId(uint8 id) : id_(id) { - } - bool is_valid() const { - return id_ >= 0; - } - uint8 value() const { - CHECK(is_valid()); - return static_cast<uint8>(id_); - } - - private: - int32 id_{0}; -}; -} // namespace actor2 -} // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp index f4267f2818..0720f0ed6f 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp @@ -1,38 +1,39 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - -#include "td/actor/Timeout.h" +#include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" +#include "td/actor/MultiTimeout.h" -using namespace td; +#include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/Random.h" +#include "td/utils/tests.h" TEST(MultiTimeout, bug) { - ConcurrentScheduler sched; - int threads_n = 0; - sched.init(threads_n); + td::ConcurrentScheduler sched(0, 0); sched.start(); - std::unique_ptr<MultiTimeout> multi_timeout; + td::unique_ptr<td::MultiTimeout> multi_timeout; struct Data { - MultiTimeout *multi_timeout; + td::MultiTimeout *multi_timeout; }; Data data; { - auto guard = sched.get_current_guard(); - multi_timeout = std::make_unique<MultiTimeout>(); + auto guard = sched.get_main_guard(); + multi_timeout = td::make_unique<td::MultiTimeout>("MultiTimeout"); data.multi_timeout = multi_timeout.get(); - multi_timeout->set_callback([](void *void_data, int64 key) { + multi_timeout->set_callback([](void *void_data, td::int64 key) { auto &data = *static_cast<Data *>(void_data); if (key == 1) { data.multi_timeout->cancel_timeout(key + 1); data.multi_timeout->set_timeout_in(key + 2, 1); } else { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }); multi_timeout->set_callback_data(&data); @@ -45,3 +46,67 @@ TEST(MultiTimeout, bug) { } sched.finish(); } + +class TimeoutManager final : public td::Actor { + static td::int32 count; + + public: + TimeoutManager() { + count++; + + test_timeout_.set_callback(on_test_timeout_callback); + test_timeout_.set_callback_data(static_cast<void *>(this)); + } + TimeoutManager(const TimeoutManager &) = delete; + TimeoutManager &operator=(const TimeoutManager &) = delete; + TimeoutManager(TimeoutManager &&) = delete; + TimeoutManager &operator=(TimeoutManager &&) = delete; + ~TimeoutManager() final { + count--; + LOG(INFO) << "Destroy TimeoutManager"; + } + + static void on_test_timeout_callback(void *timeout_manager_ptr, td::int64 id) { + CHECK(count >= 0); + if (count == 0) { + LOG(ERROR) << "Receive timeout after manager was closed"; + return; + } + + auto manager = static_cast<TimeoutManager *>(timeout_manager_ptr); + send_closure_later(manager->actor_id(manager), &TimeoutManager::test_timeout); + } + + void test_timeout() { + CHECK(count > 0); + // we must yield scheduler, so run_main breaks immediately, if timeouts are handled immediately + td::Scheduler::instance()->yield(); + } + + td::MultiTimeout test_timeout_{"TestTimeout"}; +}; + +td::int32 TimeoutManager::count; + +TEST(MultiTimeout, Destroy) { + td::ConcurrentScheduler sched(0, 0); + + auto timeout_manager = sched.create_actor_unsafe<TimeoutManager>(0, "TimeoutManager"); + TimeoutManager *manager = timeout_manager.get().get_actor_unsafe(); + sched.start(); + int cnt = 100; + while (sched.run_main(cnt == 100 || cnt <= 0 ? 0.001 : 10)) { + auto guard = sched.get_main_guard(); + cnt--; + if (cnt > 0) { + for (int i = 0; i < 2; i++) { + manager->test_timeout_.set_timeout_in(td::Random::fast(0, 1000000000), td::Random::fast(2, 5) / 1000.0); + } + } else if (cnt == 0) { + timeout_manager.reset(); + } else if (cnt == -10) { + td::Scheduler::instance()->finish(); + } + } + sched.finish(); +} diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp deleted file mode 100644 index 9185fe8858..0000000000 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp +++ /dev/null @@ -1,535 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include "td/actor/impl2/ActorLocker.h" -#include "td/actor/impl2/Scheduler.h" - -#include "td/utils/format.h" -#include "td/utils/logging.h" -#include "td/utils/port/thread.h" -#include "td/utils/Slice.h" -#include "td/utils/StringBuilder.h" -#include "td/utils/tests.h" -#include "td/utils/Time.h" - -#include <array> -#include <atomic> -#include <deque> -#include <memory> - -using td::actor2::ActorLocker; -using td::actor2::ActorSignals; -using td::actor2::ActorState; -using td::actor2::SchedulerId; - -TEST(Actor2, signals) { - ActorSignals signals; - signals.add_signal(ActorSignals::Wakeup); - signals.add_signal(ActorSignals::Cpu); - signals.add_signal(ActorSignals::Kill); - signals.clear_signal(ActorSignals::Cpu); - - bool was_kill = false; - bool was_wakeup = false; - while (!signals.empty()) { - auto s = signals.first_signal(); - if (s == ActorSignals::Kill) { - was_kill = true; - } else if (s == ActorSignals::Wakeup) { - was_wakeup = true; - } else { - UNREACHABLE(); - } - signals.clear_signal(s); - } - CHECK(was_kill && was_wakeup); -} - -TEST(Actors2, flags) { - ActorState::Flags flags; - CHECK(!flags.is_locked()); - flags.set_locked(true); - CHECK(flags.is_locked()); - flags.set_locked(false); - CHECK(!flags.is_locked()); - flags.set_pause(true); - - flags.set_scheduler_id(SchedulerId{123}); - - auto signals = flags.get_signals(); - CHECK(signals.empty()); - signals.add_signal(ActorSignals::Cpu); - signals.add_signal(ActorSignals::Kill); - CHECK(signals.has_signal(ActorSignals::Cpu)); - CHECK(signals.has_signal(ActorSignals::Kill)); - flags.set_signals(signals); - CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw(); - - auto wakeup = ActorSignals{}; - wakeup.add_signal(ActorSignals::Wakeup); - - flags.add_signals(wakeup); - signals.add_signal(ActorSignals::Wakeup); - CHECK(flags.get_signals().raw() == signals.raw()); - - flags.clear_signals(); - CHECK(flags.get_signals().empty()); - - CHECK(flags.get_scheduler_id().value() == 123); - CHECK(flags.is_pause()); -} - -TEST(Actor2, locker) { - ActorState state; - - ActorSignals kill_signal; - kill_signal.add_signal(ActorSignals::Kill); - - ActorSignals wakeup_signal; - kill_signal.add_signal(ActorSignals::Wakeup); - - ActorSignals cpu_signal; - kill_signal.add_signal(ActorSignals::Cpu); - - { - ActorLocker lockerA(&state); - ActorLocker lockerB(&state); - ActorLocker lockerC(&state); - - CHECK(lockerA.try_lock()); - CHECK(lockerA.own_lock()); - auto flagsA = lockerA.flags(); - CHECK(lockerA.try_unlock(flagsA)); - CHECK(!lockerA.own_lock()); - - CHECK(lockerA.try_lock()); - CHECK(!lockerB.try_lock()); - CHECK(!lockerC.try_lock()); - - CHECK(lockerB.try_add_signals(kill_signal)); - CHECK(!lockerC.try_add_signals(wakeup_signal)); - CHECK(lockerC.try_add_signals(wakeup_signal)); - CHECK(!lockerC.add_signals(cpu_signal)); - CHECK(!lockerA.flags().has_signals()); - CHECK(!lockerA.try_unlock(lockerA.flags())); - { - auto flags = lockerA.flags(); - auto signals = flags.get_signals(); - bool was_kill = false; - bool was_wakeup = false; - bool was_cpu = false; - while (!signals.empty()) { - auto s = signals.first_signal(); - if (s == ActorSignals::Kill) { - was_kill = true; - } else if (s == ActorSignals::Wakeup) { - was_wakeup = true; - } else if (s == ActorSignals::Cpu) { - was_cpu = true; - } else { - UNREACHABLE(); - } - signals.clear_signal(s); - } - CHECK(was_kill && was_wakeup && was_cpu); - flags.clear_signals(); - CHECK(lockerA.try_unlock(flags)); - } - } - - { - ActorLocker lockerB(&state); - CHECK(lockerB.try_lock()); - CHECK(lockerB.try_unlock(lockerB.flags())); - CHECK(lockerB.add_signals(kill_signal)); - CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); - auto flags = lockerB.flags(); - flags.clear_signals(); - ActorLocker lockerA(&state); - CHECK(!lockerA.add_signals(kill_signal)); - CHECK(!lockerB.try_unlock(flags)); - CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal! - CHECK(!lockerB.try_unlock(flags)); - CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); - CHECK(lockerB.try_unlock(flags)); - } - - { - ActorLocker lockerA(&state); - CHECK(lockerA.try_lock()); - auto flags = lockerA.flags(); - flags.set_pause(true); - CHECK(lockerA.try_unlock(flags)); - //We have to lock, though we can't execute. - CHECK(lockerA.add_signals(wakeup_signal)); - } -} - -#if !TD_THREAD_UNSUPPORTED -TEST(Actor2, locker_stress) { - ActorState state; - - constexpr size_t threads_n = 5; - auto stage = [&](std::atomic<int> &value, int need) { - value.fetch_add(1, std::memory_order_release); - while (value.load(std::memory_order_acquire) < need) { - td::this_thread::yield(); - } - }; - - struct Node { - std::atomic<td::uint32> request{0}; - td::uint32 response = 0; - char pad[64]; - }; - std::array<Node, threads_n> nodes; - auto do_work = [&]() { - for (auto &node : nodes) { - auto query = node.request.load(std::memory_order_acquire); - if (query) { - node.response = query * query; - node.request.store(0, std::memory_order_relaxed); - } - } - }; - - std::atomic<int> begin{0}; - std::atomic<int> ready{0}; - std::atomic<int> check{0}; - std::atomic<int> finish{0}; - std::vector<td::thread> threads; - for (size_t i = 0; i < threads_n; i++) { - threads.push_back(td::thread([&, id = i] { - for (size_t i = 1; i < 1000000; i++) { - ActorLocker locker(&state); - auto need = static_cast<int>(threads_n * i); - auto query = static_cast<td::uint32>(id + need); - stage(begin, need); - nodes[id].request = 0; - nodes[id].response = 0; - stage(ready, need); - if (locker.try_lock()) { - nodes[id].response = query * query; - } else { - auto cpu = ActorSignals::one(ActorSignals::Cpu); - nodes[id].request.store(query, std::memory_order_release); - locker.add_signals(cpu); - } - while (locker.own_lock()) { - auto flags = locker.flags(); - auto signals = flags.get_signals(); - if (!signals.empty()) { - do_work(); - } - flags.clear_signals(); - locker.try_unlock(flags); - } - - stage(check, need); - if (id == 0) { - CHECK(locker.add_signals(ActorSignals{})); - CHECK(!locker.flags().has_signals()); - CHECK(locker.try_unlock(locker.flags())); - for (size_t thread_id = 0; thread_id < threads_n; thread_id++) { - CHECK(nodes[thread_id].response == - static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need)) - << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " " - << nodes[thread_id].request.load(); - } - } - } - })); - } - for (auto &thread : threads) { - thread.join(); - } -} - -namespace { -const size_t BUF_SIZE = 1024 * 1024; -char buf[BUF_SIZE]; -td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); -} // namespace - -TEST(Actor2, executor_simple) { - using namespace td; - using namespace td::actor2; - struct Dispatcher : public SchedulerDispatcher { - void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override { - queue.push_back(std::move(ptr)); - } - void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override { - UNREACHABLE(); - } - SchedulerId get_scheduler_id() const override { - return SchedulerId{0}; - } - std::deque<ActorInfoPtr> queue; - }; - Dispatcher dispatcher; - - class TestActor : public Actor { - public: - void close() { - stop(); - } - - private: - void start_up() override { - sb << "StartUp"; - } - void tear_down() override { - sb << "TearDown"; - } - }; - ActorInfoCreator actor_info_creator; - auto actor = actor_info_creator.create( - std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor")); - dispatcher.add_to_queue(actor, SchedulerId{0}, false); - - { - ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); - CHECK(executor.can_send()); - CHECK(executor.can_send_immediate()); - CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice(); - sb.clear(); - executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); - CHECK(sb.as_cslice() == "A") << sb.as_cslice(); - sb.clear(); - auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; }); - big_message.set_big(); - executor.send(std::move(big_message)); - CHECK(sb.as_cslice() == "") << sb.as_cslice(); - executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); - CHECK(sb.as_cslice() == "") << sb.as_cslice(); - } - CHECK(dispatcher.queue.size() == 1); - { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); } - CHECK(dispatcher.queue.size() == 1); - dispatcher.queue.clear(); - CHECK(sb.as_cslice() == "bigA") << sb.as_cslice(); - sb.clear(); - { - ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); - executor.send( - ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); - } - CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice(); - sb.clear(); - CHECK(!actor->has_actor()); - { - ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); - executor.send( - ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); - } - CHECK(dispatcher.queue.empty()); - CHECK(sb.as_cslice() == ""); -} - -using namespace td::actor2; -using td::uint32; -static std::atomic<int> cnt; -class Worker : public Actor { - public: - void query(uint32 x, ActorInfoPtr master); - void close() { - stop(); - } -}; -class Master : public Actor { - public: - void on_result(uint32 x, uint32 y) { - loop(); - } - - private: - uint32 l = 0; - uint32 r = 100000; - ActorInfoPtr worker; - void start_up() override { - worker = detail::create_actor<Worker>(ActorOptions().with_name("Master")); - loop(); - } - void loop() override { - l++; - if (l == r) { - if (!--cnt) { - SchedulerContext::get()->stop(); - } - detail::send_closure(*worker, &Worker::close); - stop(); - return; - } - detail::send_lambda(*worker, - [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); }); - } -}; - -void Worker::query(uint32 x, ActorInfoPtr master) { - auto y = x; - for (int i = 0; i < 100; i++) { - y = y * y; - } - detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); }); -} - -TEST(Actor2, scheduler_simple) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 2}; - scheduler.start(); - scheduler.run_in_context([] { - cnt = 10; - for (int i = 0; i < 10; i++) { - detail::create_actor<Master>(ActorOptions().with_name("Master")); - } - }); - while (scheduler.run(1000)) { - } - Scheduler::close_scheduler_group(*group_info); -} - -TEST(Actor2, actor_id_simple) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 2}; - sb.clear(); - scheduler.start(); - - scheduler.run_in_context([] { - class A : public Actor { - public: - A(int value) : value_(value) { - sb << "A" << value_; - } - void hello() { - sb << "hello"; - } - ~A() { - sb << "~A"; - if (--cnt <= 0) { - SchedulerContext::get()->stop(); - } - } - - private: - int value_; - }; - cnt = 1; - auto id = create_actor<A>("A", 123); - CHECK(sb.as_cslice() == "A123"); - sb.clear(); - send_closure(id, &A::hello); - }); - while (scheduler.run(1000)) { - } - CHECK(sb.as_cslice() == "hello~A"); - Scheduler::close_scheduler_group(*group_info); - sb.clear(); -} - -TEST(Actor2, actor_creation) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 1}; - scheduler.start(); - - scheduler.run_in_context([]() mutable { - class B; - class A : public Actor { - public: - void f() { - check(); - stop(); - } - - private: - void start_up() override { - check(); - create_actor<B>("Simple", actor_id(this)).release(); - } - - void check() { - auto &context = *SchedulerContext::get(); - CHECK(context.has_poll()); - context.get_poll(); - } - - void tear_down() override { - if (--cnt <= 0) { - SchedulerContext::get()->stop(); - } - } - }; - - class B : public Actor { - public: - B(ActorId<A> a) : a_(a) { - } - - private: - void start_up() override { - auto &context = *SchedulerContext::get(); - CHECK(!context.has_poll()); - send_closure(a_, &A::f); - stop(); - } - void tear_down() override { - if (--cnt <= 0) { - SchedulerContext::get()->stop(); - } - } - ActorId<A> a_; - }; - cnt = 2; - create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release(); - }); - while (scheduler.run(1000)) { - } - scheduler.stop(); - Scheduler::close_scheduler_group(*group_info); -} - -TEST(Actor2, actor_timeout_simple) { - auto group_info = std::make_shared<SchedulerGroupInfo>(1); - Scheduler scheduler{group_info, SchedulerId{0}, 2}; - sb.clear(); - scheduler.start(); - - scheduler.run_in_context([] { - class A : public Actor { - public: - void start_up() override { - set_timeout(); - } - void alarm() override { - double diff = td::Time::now() - expected_timeout_; - CHECK(-0.001 < diff && diff < 0.1) << diff; - if (cnt_-- > 0) { - set_timeout(); - } else { - stop(); - } - } - - void tear_down() override { - SchedulerContext::get()->stop(); - } - - private: - double expected_timeout_; - int cnt_ = 5; - void set_timeout() { - auto wakeup_timestamp = td::Timestamp::in(0.1); - expected_timeout_ = wakeup_timestamp.at(); - alarm_timestamp() = wakeup_timestamp; - } - }; - create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release(); - }); - while (scheduler.run(1000)) { - } - Scheduler::close_scheduler_group(*group_info); - sb.clear(); -} -#endif //!TD_THREAD_UNSUPPORTED diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp index ffceacc595..628b74a94c 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp @@ -1,35 +1,32 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" #include "td/actor/PromiseFuture.h" +#include "td/utils/common.h" #include "td/utils/logging.h" #include "td/utils/Random.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/tests.h" #include <limits> #include <map> +#include <memory> #include <utility> -using namespace td; - -REGISTER_TESTS(actors_main); - -namespace { - template <class ContainerT> static typename ContainerT::value_type &rand_elem(ContainerT &cont) { CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max())); - return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)]; + return cont[td::Random::fast(0, static_cast<int>(cont.size()) - 1)]; } -static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) { - uint32 res = 1; +static td::uint32 fast_pow_mod_uint32(td::uint32 x, td::uint32 p) { + td::uint32 res = 1; while (p) { if (p & 1) { res *= x; @@ -40,25 +37,25 @@ static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) { return res; } -static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) { - uint32 res = 1; - for (uint32 i = 0; i < p; i++) { +static td::uint32 slow_pow_mod_uint32(td::uint32 x, td::uint32 p) { + td::uint32 res = 1; + for (td::uint32 i = 0; i < p; i++) { res *= x; } return res; } -struct Query { - uint32 query_id; - uint32 result; - std::vector<int> todo; - Query() = default; - Query(const Query &) = delete; - Query &operator=(const Query &) = delete; - Query(Query &&) = default; - Query &operator=(Query &&) = default; - ~Query() { - CHECK(todo.empty()) << "Query lost"; +struct ActorQuery { + td::uint32 query_id{}; + td::uint32 result{}; + td::vector<int> todo; + ActorQuery() = default; + ActorQuery(const ActorQuery &) = delete; + ActorQuery &operator=(const ActorQuery &) = delete; + ActorQuery(ActorQuery &&) = default; + ActorQuery &operator=(ActorQuery &&) = default; + ~ActorQuery() { + LOG_CHECK(todo.empty()) << "ActorQuery lost"; } int next_pow() { CHECK(!todo.empty()); @@ -71,25 +68,25 @@ struct Query { } }; -static uint32 fast_calc(Query &q) { - uint32 result = q.result; +static td::uint32 fast_calc(ActorQuery &q) { + td::uint32 result = q.result; for (auto x : q.todo) { result = fast_pow_mod_uint32(result, x); } return result; } -class Worker final : public Actor { +class Worker final : public td::Actor { public: explicit Worker(int threads_n) : threads_n_(threads_n) { } - void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) { - uint32 result = slow_pow_mod_uint32(x, p); + void query(td::PromiseActor<td::uint32> &&promise, td::uint32 x, td::uint32 p) { + td::uint32 result = slow_pow_mod_uint32(x, p); promise.set_value(std::move(result)); (void)threads_n_; - // if (threads_n_ > 1 && Random::fast(0, 9) == 0) { - // migrate(Random::fast(2, threads_n)); + // if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) { + // migrate(td::Random::fast(2, threads_n)); //} } @@ -97,7 +94,7 @@ class Worker final : public Actor { int threads_n_; }; -class QueryActor final : public Actor { +class QueryActor final : public td::Actor { public: class Callback { public: @@ -107,44 +104,46 @@ class QueryActor final : public Actor { Callback(Callback &&) = delete; Callback &operator=(Callback &&) = delete; virtual ~Callback() = default; - virtual void on_result(Query &&query) = 0; + virtual void on_result(ActorQuery &&query) = 0; virtual void on_closed() = 0; }; explicit QueryActor(int threads_n) : threads_n_(threads_n) { } - void set_callback(std::unique_ptr<Callback> callback) { + void set_callback(td::unique_ptr<Callback> callback) { callback_ = std::move(callback); } - void set_workers(std::vector<ActorId<Worker>> workers) { + void set_workers(td::vector<td::ActorId<Worker>> workers) { workers_ = std::move(workers); } - void query(Query &&query) { - uint32 x = query.result; - uint32 p = query.next_pow(); - if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) { + void query(ActorQuery &&query) { + td::uint32 x = query.result; + td::uint32 p = query.next_pow(); + if (td::Random::fast(0, 3) && (p <= 1000 || workers_.empty())) { query.result = slow_pow_mod_uint32(x, p); callback_->on_result(std::move(query)); } else { - auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p); + auto future = td::Random::fast(0, 3) == 0 + ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p) + : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p); if (future.is_ready()) { query.result = future.move_as_ok(); callback_->on_result(std::move(query)); } else { - future.set_event(EventCreator::raw(actor_id(), query.query_id)); + future.set_event(td::EventCreator::raw(actor_id(), query.query_id)); auto query_id = query.query_id; - pending_.insert(std::make_pair(query_id, std::make_pair(std::move(future), std::move(query)))); + pending_.emplace(query_id, std::make_pair(std::move(future), std::move(query))); } } - if (threads_n_ > 1 && Random::fast(0, 9) == 0) { - migrate(Random::fast(2, threads_n_)); + if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) { + migrate(td::Random::fast(2, threads_n_)); } } - void raw_event(const Event::Raw &event) override { - uint32 id = event.u32; + void raw_event(const td::Event::Raw &event) final { + td::uint32 id = event.u32; auto it = pending_.find(id); auto future = std::move(it->second.first); auto query = std::move(it->second.second); @@ -159,44 +158,44 @@ class QueryActor final : public Actor { stop(); } - void on_start_migrate(int32 sched_id) override { + void on_start_migrate(td::int32 sched_id) final { for (auto &it : pending_) { start_migrate(it.second.first, sched_id); } } - void on_finish_migrate() override { + void on_finish_migrate() final { for (auto &it : pending_) { finish_migrate(it.second.first); } } private: - unique_ptr<Callback> callback_; - std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_; - std::vector<ActorId<Worker>> workers_; + td::unique_ptr<Callback> callback_; + std::map<td::uint32, std::pair<td::FutureActor<td::uint32>, ActorQuery>> pending_; + td::vector<td::ActorId<Worker>> workers_; int threads_n_; }; -class MainQueryActor final : public Actor { - class QueryActorCallback : public QueryActor::Callback { +class MainQueryActor final : public td::Actor { + class QueryActorCallback final : public QueryActor::Callback { public: - void on_result(Query &&query) override { + void on_result(ActorQuery &&query) final { if (query.ready()) { send_closure(parent_id_, &MainQueryActor::on_result, std::move(query)); } else { send_closure(next_solver_, &QueryActor::query, std::move(query)); } } - void on_closed() override { + void on_closed() final { send_closure(parent_id_, &MainQueryActor::on_closed); } - QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver) + QueryActorCallback(td::ActorId<MainQueryActor> parent_id, td::ActorId<QueryActor> next_solver) : parent_id_(parent_id), next_solver_(next_solver) { } private: - ActorId<MainQueryActor> parent_id_; - ActorId<QueryActor> next_solver_; + td::ActorId<MainQueryActor> parent_id_; + td::ActorId<QueryActor> next_solver_; }; const int ACTORS_CNT = 10; @@ -206,39 +205,39 @@ class MainQueryActor final : public Actor { explicit MainQueryActor(int threads_n) : threads_n_(threads_n) { } - void start_up() override { + void start_up() final { actors_.resize(ACTORS_CNT); for (auto &actor : actors_) { - auto actor_ptr = make_unique<QueryActor>(threads_n_); - actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0) + auto actor_ptr = td::make_unique<QueryActor>(threads_n_); + actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0) .release(); } workers_.resize(WORKERS_CNT); for (auto &worker : workers_) { - auto actor_ptr = make_unique<Worker>(threads_n_); - worker = - register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release(); + auto actor_ptr = td::make_unique<Worker>(threads_n_); + worker = register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0) + .release(); } for (int i = 0; i < ACTORS_CNT; i++) { ref_cnt_++; send_closure(actors_[i], &QueryActor::set_callback, - make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT])); + td::make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT])); send_closure(actors_[i], &QueryActor::set_workers, workers_); } yield(); } - void on_result(Query &&query) { + void on_result(ActorQuery &&query) { CHECK(query.ready()); CHECK(query.result == expected_[query.query_id]); in_cnt_++; wakeup(); } - Query create_query() { - Query q; + ActorQuery create_query() { + ActorQuery q; q.query_id = (query_id_ += 2); q.result = q.query_id; q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000}; @@ -249,14 +248,14 @@ class MainQueryActor final : public Actor { void on_closed() { ref_cnt_--; if (ref_cnt_ == 0) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } } - void wakeup() override { - int cnt = 100000; + void wakeup() final { + int cnt = 10000; while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) { - if (Random::fast(0, 1)) { + if (td::Random::fast_bool()) { send_closure(rand_elem(actors_), &QueryActor::query, create_query()); } else { send_closure_later(rand_elem(actors_), &QueryActor::query, create_query()); @@ -273,9 +272,9 @@ class MainQueryActor final : public Actor { } private: - std::map<uint32, uint32> expected_; - std::vector<ActorId<QueryActor>> actors_; - std::vector<ActorId<Worker>> workers_; + std::map<td::uint32, td::uint32> expected_; + td::vector<td::ActorId<QueryActor>> actors_; + td::vector<td::ActorId<Worker>> workers_; int out_cnt_ = 0; int in_cnt_ = 0; int query_id_ = 1; @@ -283,101 +282,104 @@ class MainQueryActor final : public Actor { int threads_n_; }; -class SimpleActor final : public Actor { +class SimpleActor final : public td::Actor { public: - explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) { + explicit SimpleActor(td::int32 threads_n) : threads_n_(threads_n) { } - void start_up() override { - auto actor_ptr = make_unique<Worker>(threads_n_); + void start_up() final { + auto actor_ptr = td::make_unique<Worker>(threads_n_); worker_ = - register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release(); + register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0).release(); yield(); } - void wakeup() override { - if (q_ == 100000) { - Scheduler::instance()->finish(); + void wakeup() final { + if (q_ == 10000) { + td::Scheduler::instance()->finish(); stop(); return; } q_++; - p_ = Random::fast(0, 1) ? 1 : 10000; - auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_); + p_ = td::Random::fast_bool() ? 1 : 10000; + auto future = td::Random::fast(0, 3) == 0 + ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_) + : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_); if (future.is_ready()) { auto result = future.move_as_ok(); CHECK(result == fast_pow_mod_uint32(q_, p_)); yield(); } else { - future.set_event(EventCreator::raw(actor_id(), nullptr)); + future.set_event(td::EventCreator::raw(actor_id(), nullptr)); future_ = std::move(future); } - // if (threads_n_ > 1 && Random::fast(0, 2) == 0) { - // migrate(Random::fast(1, threads_n)); + // if (threads_n_ > 1 && td::Random::fast(0, 2) == 0) { + // migrate(td::Random::fast(1, threads_n)); //} } - void raw_event(const Event::Raw &event) override { + void raw_event(const td::Event::Raw &event) final { auto result = future_.move_as_ok(); CHECK(result == fast_pow_mod_uint32(q_, p_)); yield(); } - void on_start_migrate(int32 sched_id) override { + void on_start_migrate(td::int32 sched_id) final { start_migrate(future_, sched_id); } - void on_finish_migrate() override { + void on_finish_migrate() final { finish_migrate(future_); } private: - int32 threads_n_; - ActorId<Worker> worker_; - FutureActor<uint32> future_; - uint32 q_ = 1; - uint32 p_; + td::int32 threads_n_; + td::ActorId<Worker> worker_; + td::FutureActor<td::uint32> future_; + td::uint32 q_ = 1; + td::uint32 p_ = 0; }; -} // namespace -class SendToDead : public Actor { +class SendToDead final : public td::Actor { public: - class Parent : public Actor { + class Parent final : public td::Actor { public: - explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) { + explicit Parent(td::ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) { } - void start_up() override { - set_timeout_in(Random::fast_uint32() % 3 * 0.001); + void start_up() final { + set_timeout_in(td::Random::fast_uint32() % 3 * 0.001); if (ttl_ != 0) { - child_ = create_actor_on_scheduler<Parent>( - "Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(), ttl_ - 1); + child_ = td::create_actor_on_scheduler<Parent>( + "Child", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), actor_shared(this), + ttl_ - 1); } } - void timeout_expired() override { + void timeout_expired() final { stop(); } private: - ActorOwn<Parent> child_; - ActorShared<> parent_; + td::ActorOwn<Parent> child_; + td::ActorShared<> parent_; int ttl_; }; - void start_up() override { + void start_up() final { for (int i = 0; i < 2000; i++) { - create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(), - create_reference(), 4) + td::create_actor_on_scheduler<Parent>( + "Parent", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), create_reference(), 4) .release(); } } - ActorShared<> create_reference() { + td::ActorShared<> create_reference() { ref_cnt_++; - return actor_shared(); + return actor_shared(this); } - void hangup_shared() override { + + void hangup_shared() final { ref_cnt_--; if (ref_cnt_ == 0) { ttl_--; if (ttl_ <= 0) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } else { start_up(); @@ -385,18 +387,17 @@ class SendToDead : public Actor { } } - uint32 ttl_{50}; - uint32 ref_cnt_{0}; + td::uint32 ttl_{50}; + td::uint32 ref_cnt_{0}; }; TEST(Actors, send_to_dead) { //TODO: fix CHECK(storage_count_.load() == 0) return; - ConcurrentScheduler sched; int threads_n = 5; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); - sched.create_actor_unsafe<SendToDead>(0, "manager").release(); + sched.create_actor_unsafe<SendToDead>(0, "SendToDead").release(); sched.start(); while (sched.run_main(10)) { // empty @@ -405,11 +406,8 @@ TEST(Actors, send_to_dead) { } TEST(Actors, main_simple) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - - ConcurrentScheduler sched; int threads_n = 3; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); sched.create_actor_unsafe<SimpleActor>(threads_n > 1 ? 1 : 0, "simple", threads_n).release(); sched.start(); @@ -420,13 +418,10 @@ TEST(Actors, main_simple) { } TEST(Actors, main) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - - ConcurrentScheduler sched; int threads_n = 9; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); - sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "manager", threads_n).release(); + sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "MainQuery", threads_n).release(); sched.start(); while (sched.run_main(10)) { // empty @@ -434,27 +429,76 @@ TEST(Actors, main) { sched.finish(); } -class DoAfterStop : public Actor { +class DoAfterStop final : public td::Actor { public: - void loop() override { - ptr = std::make_unique<int>(10); + void loop() final { + ptr = td::make_unique<int>(10); stop(); CHECK(*ptr == 10); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } private: - std::unique_ptr<int> ptr; + td::unique_ptr<int> ptr; }; TEST(Actors, do_after_stop) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + int threads_n = 0; + td::ConcurrentScheduler sched(threads_n, 0); - ConcurrentScheduler sched; + sched.create_actor_unsafe<DoAfterStop>(0, "DoAfterStop").release(); + sched.start(); + while (sched.run_main(10)) { + // empty + } + sched.finish(); +} + +class XContext final : public td::ActorContext { + public: + td::int32 get_id() const final { + return 123456789; + } + + void validate() { + CHECK(x == 1234); + } + ~XContext() final { + x = 0; + } + int x = 1234; +}; + +class WithXContext final : public td::Actor { + public: + void start_up() final { + auto old_context = set_context(std::make_shared<XContext>()); + } + void f(td::unique_ptr<td::Guard> guard) { + } + void close() { + stop(); + } +}; + +static void check_context() { + auto ptr = static_cast<XContext *>(td::Scheduler::context()); + CHECK(ptr != nullptr); + ptr->validate(); +} + +TEST(Actors, context_during_destruction) { int threads_n = 0; - sched.init(threads_n); + td::ConcurrentScheduler sched(threads_n, 0); - sched.create_actor_unsafe<DoAfterStop>(0, "manager").release(); + { + auto guard = sched.get_main_guard(); + auto with_context = td::create_actor<WithXContext>("WithXContext").release(); + send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); })); + send_closure_later(with_context, &WithXContext::close); + send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); })); + send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { td::Scheduler::instance()->finish(); })); + } sched.start(); while (sched.run_main(10)) { // empty diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp index c0a6c32b61..78d32d5437 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp @@ -1,57 +1,66 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" #include "td/actor/MultiPromise.h" #include "td/actor/PromiseFuture.h" #include "td/actor/SleepActor.h" -#include "td/actor/Timeout.h" +#include "td/utils/common.h" #include "td/utils/logging.h" +#include "td/utils/MpscPollableQueue.h" #include "td/utils/Observer.h" #include "td/utils/port/FileFd.h" +#include "td/utils/port/thread.h" +#include "td/utils/Promise.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" #include "td/utils/StringBuilder.h" +#include "td/utils/tests.h" +#include "td/utils/Time.h" +#include <memory> #include <tuple> -REGISTER_TESTS(actors_simple) - -namespace { -using namespace td; - static const size_t BUF_SIZE = 1024 * 1024; static char buf[BUF_SIZE]; static char buf2[BUF_SIZE]; -static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1)); -static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1)); +static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); +static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1)); + +static td::vector<std::shared_ptr<td::MpscPollableQueue<td::EventFull>>> create_queues() { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + return {}; +#else + auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>(); + res->init(); + return {res}; +#endif +} TEST(Actors, SendLater) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); sb.clear(); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); auto guard = scheduler.get_guard(); - class Worker : public Actor { + class Worker final : public td::Actor { public: void f() { sb << "A"; } }; - auto id = create_actor<Worker>("Worker"); - scheduler.run_no_guard(0); - send_closure(id, &Worker::f); - send_closure_later(id, &Worker::f); - send_closure(id, &Worker::f); + auto id = td::create_actor<Worker>("Worker"); + scheduler.run_no_guard(td::Timestamp::in(1)); + td::send_closure(id, &Worker::f); + td::send_closure_later(id, &Worker::f); + td::send_closure(id, &Worker::f); ASSERT_STREQ("A", sb.as_cslice().c_str()); - scheduler.run_no_guard(0); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("AAA", sb.as_cslice().c_str()); } @@ -63,21 +72,21 @@ class X { X(const X &) { sb << "[cnstr_copy]"; } - X(X &&) { + X(X &&) noexcept { sb << "[cnstr_move]"; } X &operator=(const X &) { sb << "[set_copy]"; return *this; } - X &operator=(X &&) { + X &operator=(X &&) noexcept { sb << "[set_move]"; return *this; } ~X() = default; }; -class XReceiver final : public Actor { +class XReceiver final : public td::Actor { public: void by_const_ref(const X &) { sb << "[by_const_ref]"; @@ -91,13 +100,12 @@ class XReceiver final : public Actor { }; TEST(Actors, simple_pass_event_arguments) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); auto guard = scheduler.get_guard(); - auto id = create_actor<XReceiver>("XR").release(); - scheduler.run_no_guard(0); + auto id = td::create_actor<XReceiver>("XR").release(); + scheduler.run_no_guard(td::Timestamp::in(1)); X x; @@ -112,47 +120,47 @@ TEST(Actors, simple_pass_event_arguments) { // Tmp-->ConstRef sb.clear(); - send_closure(id, &XReceiver::by_const_ref, X()); + td::send_closure(id, &XReceiver::by_const_ref, X()); ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str()); // Tmp-->ConstRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_const_ref, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_const_ref, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); // LOG(ERROR) << sb.as_cslice(); ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str()); // Tmp-->LvalueRef sb.clear(); - send_closure(id, &XReceiver::by_lvalue_ref, X()); + td::send_closure(id, &XReceiver::by_lvalue_ref, X()); ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str()); // Tmp-->LvalueRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_lvalue_ref, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_lvalue_ref, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str()); // Tmp-->Value sb.clear(); - send_closure(id, &XReceiver::by_value, X()); + td::send_closure(id, &XReceiver::by_value, X()); ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str()); // Tmp-->Value (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_value, X()); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_value, X()); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str()); // Var-->ConstRef sb.clear(); - send_closure(id, &XReceiver::by_const_ref, x); + td::send_closure(id, &XReceiver::by_const_ref, x); ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str()); // Var-->ConstRef (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_const_ref, x); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_const_ref, x); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str()); // Var-->LvalueRef @@ -161,24 +169,24 @@ TEST(Actors, simple_pass_event_arguments) { // Var-->Value sb.clear(); - send_closure(id, &XReceiver::by_value, x); + td::send_closure(id, &XReceiver::by_value, x); ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str()); // Var-->Value (Delayed) sb.clear(); - send_closure_later(id, &XReceiver::by_value, x); - scheduler.run_no_guard(0); + td::send_closure_later(id, &XReceiver::by_value, x); + scheduler.run_no_guard(td::Timestamp::in(1)); ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str()); } -class PrintChar final : public Actor { +class PrintChar final : public td::Actor { public: PrintChar(char c, int cnt) : char_(c), cnt_(cnt) { } - void start_up() override { + void start_up() final { yield(); } - void wakeup() override { + void wakeup() final { if (cnt_ == 0) { stop(); } else { @@ -192,25 +200,23 @@ class PrintChar final : public Actor { char char_; int cnt_; }; -} // namespace // // Yield must add actor to the end of queue // TEST(Actors, simple_hand_yield) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); sb.clear(); int cnt = 1000; { auto guard = scheduler.get_guard(); - create_actor<PrintChar>("PrintA", 'A', cnt).release(); - create_actor<PrintChar>("PrintB", 'B', cnt).release(); - create_actor<PrintChar>("PrintC", 'C', cnt).release(); + td::create_actor<PrintChar>("PrintA", 'A', cnt).release(); + td::create_actor<PrintChar>("PrintB", 'B', cnt).release(); + td::create_actor<PrintChar>("PrintC", 'C', cnt).release(); } - scheduler.run(0); - std::string expected; + scheduler.run(td::Timestamp::in(1)); + td::string expected; for (int i = 0; i < cnt; i++) { expected += "ABC"; } @@ -219,7 +225,7 @@ TEST(Actors, simple_hand_yield) { class Ball { public: - friend void start_migrate(Ball &ball, int32 sched_id) { + friend void start_migrate(Ball &ball, td::int32 sched_id) { sb << "start"; } friend void finish_migrate(Ball &ball) { @@ -227,31 +233,30 @@ class Ball { } }; -class Pong final : public Actor { +class Pong final : public td::Actor { public: void pong(Ball ball) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; -class Ping final : public Actor { +class Ping final : public td::Actor { public: - explicit Ping(ActorId<Pong> pong) : pong_(pong) { + explicit Ping(td::ActorId<Pong> pong) : pong_(pong) { } - void start_up() override { - send_closure(pong_, &Pong::pong, Ball()); + void start_up() final { + td::send_closure(pong_, &Pong::pong, Ball()); } private: - ActorId<Pong> pong_; + td::ActorId<Pong> pong_; }; TEST(Actors, simple_migrate) { sb.clear(); sb2.clear(); - ConcurrentScheduler scheduler; - scheduler.init(2); + td::ConcurrentScheduler scheduler(2, 0); auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release(); scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release(); scheduler.start(); @@ -267,26 +272,25 @@ TEST(Actors, simple_migrate) { #endif } -class OpenClose final : public Actor { +class OpenClose final : public td::Actor { public: explicit OpenClose(int cnt) : cnt_(cnt) { } - void start_up() override { + void start_up() final { yield(); } - void wakeup() override { - ObserverBase *observer = reinterpret_cast<ObserverBase *>(123); + void wakeup() final { + auto observer = reinterpret_cast<td::ObserverBase *>(123); if (cnt_ > 0) { - auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create); - CHECK(r_file_fd.is_ok()) << r_file_fd.error(); + auto r_file_fd = td::FileFd::open("server", td::FileFd::Read | td::FileFd::Create); + LOG_CHECK(r_file_fd.is_ok()) << r_file_fd.error(); auto file_fd = r_file_fd.move_as_ok(); - // LOG(ERROR) << file_fd.get_native_fd(); - file_fd.get_fd().set_observer(observer); + { auto pollable_fd = file_fd.get_poll_info().extract_pollable_fd(observer); } file_fd.close(); cnt_--; yield(); } else { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } } @@ -295,14 +299,8 @@ class OpenClose final : public Actor { }; TEST(Actors, open_close) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(2); - int cnt = 1000000; -#if TD_WINDOWS || TD_ANDROID - // TODO(perf) optimize - cnt = 100; -#endif + td::ConcurrentScheduler scheduler(2, 0); + int cnt = 10000; // TODO(perf) optimize scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release(); scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release(); scheduler.start(); @@ -311,62 +309,59 @@ TEST(Actors, open_close) { scheduler.finish(); } -namespace { -class MsgActor : public Actor { +class MsgActor : public td::Actor { public: virtual void msg() = 0; }; -class Slave : public Actor { +class Slave final : public td::Actor { public: - ActorId<MsgActor> msg; - explicit Slave(ActorId<MsgActor> msg) : msg(msg) { + td::ActorId<MsgActor> msg; + explicit Slave(td::ActorId<MsgActor> msg) : msg(msg) { } - void hangup() override { - send_closure(msg, &MsgActor::msg); + void hangup() final { + td::send_closure(msg, &MsgActor::msg); } }; -class MasterActor : public MsgActor { +class MasterActor final : public MsgActor { public: - void loop() override { + void loop() final { alive_ = true; - slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this))); + slave = td::create_actor<Slave>("Slave", static_cast<td::ActorId<MsgActor>>(actor_id(this))); stop(); } - ActorOwn<Slave> slave; + td::ActorOwn<Slave> slave; MasterActor() = default; MasterActor(const MasterActor &) = delete; MasterActor &operator=(const MasterActor &) = delete; MasterActor(MasterActor &&) = delete; MasterActor &operator=(MasterActor &&) = delete; - ~MasterActor() override { + ~MasterActor() final { alive_ = 987654321; } - void msg() override { + void msg() final { CHECK(alive_ == 123456789); } - uint64 alive_ = 123456789; + td::uint64 alive_ = 123456789; }; -} // namespace TEST(Actors, call_after_destruct) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - Scheduler scheduler; - scheduler.init(); + td::Scheduler scheduler; + scheduler.init(0, create_queues(), nullptr); { auto guard = scheduler.get_guard(); - create_actor<MasterActor>("Master").release(); + td::create_actor<MasterActor>("Master").release(); } - scheduler.run(0); + scheduler.run(td::Timestamp::in(1)); } -class LinkTokenSlave : public Actor { +class LinkTokenSlave final : public td::Actor { public: - explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) { + explicit LinkTokenSlave(td::ActorShared<> parent) : parent_(std::move(parent)) { } - void add(uint64 link_token) { + void add(td::uint64 link_token) { CHECK(link_token == get_link_token()); } void close() { @@ -374,62 +369,61 @@ class LinkTokenSlave : public Actor { } private: - ActorShared<> parent_; + td::ActorShared<> parent_; }; -class LinkTokenMasterActor : public Actor { +class LinkTokenMasterActor final : public td::Actor { public: explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) { } - void start_up() override { - child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release(); + void start_up() final { + child_ = td::create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release(); yield(); } - void loop() override { + void loop() final { for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) { + auto token = static_cast<td::uint64>(cnt_) + 1; switch (i % 4) { case 0: { - send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + td::send_closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token); break; } case 1: { - send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + td::send_closure_later(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token); break; } case 2: { - EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token) .try_emit(); break; } case 3: { - EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token) .try_emit_later(); break; } } } if (cnt_ == 0) { - send_closure(child_, &LinkTokenSlave::close); + td::send_closure(child_, &LinkTokenSlave::close); } else { yield(); } } - void hangup_shared() override { + void hangup_shared() final { CHECK(get_link_token() == 123); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } private: int cnt_; - ActorId<LinkTokenSlave> child_; + td::ActorId<LinkTokenSlave> child_; }; TEST(Actors, link_token) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); auto cnt = 100000; scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release(); scheduler.start(); @@ -440,25 +434,25 @@ TEST(Actors, link_token) { TEST(Actors, promise) { int value = -1; - Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; }); - p1.set_error(Status::Error("Test error")); + td::Promise<int> p1 = td::PromiseCreator::lambda([&](int x) { value = x; }); + p1.set_error(td::Status::Error("Test error")); ASSERT_EQ(0, value); - Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; }); - p2.set_error(Status::Error("Test error")); + td::Promise<td::int32> p2 = td::PromiseCreator::lambda([&](td::Result<td::int32> x) { value = 1; }); + p2.set_error(td::Status::Error("Test error")); ASSERT_EQ(1, value); } -class LaterSlave : public Actor { +class LaterSlave final : public td::Actor { public: - explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) { + explicit LaterSlave(td::ActorShared<> parent) : parent_(std::move(parent)) { } private: - ActorShared<> parent_; + td::ActorShared<> parent_; - void hangup() override { + void hangup() final { sb << "A"; - send_closure(actor_id(this), &LaterSlave::finish); + td::send_closure(actor_id(this), &LaterSlave::finish); } void finish() { sb << "B"; @@ -466,31 +460,29 @@ class LaterSlave : public Actor { } }; -class LaterMasterActor : public Actor { +class LaterMasterActor final : public td::Actor { int cnt_ = 3; - std::vector<ActorOwn<LaterSlave>> children_; - void start_up() override { + td::vector<td::ActorOwn<LaterSlave>> children_; + void start_up() final { for (int i = 0; i < cnt_; i++) { - children_.push_back(create_actor<LaterSlave>("B", actor_shared())); + children_.push_back(td::create_actor<LaterSlave>("B", actor_shared(this))); } yield(); } - void loop() override { + void loop() final { children_.clear(); } - void hangup_shared() override { + void hangup_shared() final { if (!--cnt_) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } } }; TEST(Actors, later) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -499,39 +491,36 @@ TEST(Actors, later) { ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB"); } -class MultiPromise2 : public Actor { +class MultiPromise2 final : public td::Actor { public: - void start_up() override { - auto promise = PromiseCreator::lambda([](Result<Unit> result) { + void start_up() final { + auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) { result.ensure(); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); }); - MultiPromiseActorSafe multi_promise; + td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor2"}; multi_promise.add_promise(std::move(promise)); for (int i = 0; i < 10; i++) { - create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release(); + td::create_actor<td::SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release(); } } }; -class MultiPromise1 : public Actor { +class MultiPromise1 final : public td::Actor { public: - void start_up() override { - auto promise = PromiseCreator::lambda([](Result<Unit> result) { + void start_up() final { + auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) { CHECK(result.is_error()); - create_actor<MultiPromise2>("B").release(); + td::create_actor<MultiPromise2>("B").release(); }); - MultiPromiseActorSafe multi_promise; + td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor1"}; multi_promise.add_promise(std::move(promise)); } }; TEST(Actors, MultiPromise) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -539,23 +528,20 @@ TEST(Actors, MultiPromise) { scheduler.finish(); } -class FastPromise : public Actor { +class FastPromise final : public td::Actor { public: - void start_up() override { - PromiseFuture<int> pf; + void start_up() final { + td::PromiseFuture<int> pf; auto promise = pf.move_promise(); auto future = pf.move_future(); promise.set_value(123); CHECK(future.move_as_ok() == 123); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; TEST(Actors, FastPromise) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<FastPromise>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -563,21 +549,18 @@ TEST(Actors, FastPromise) { scheduler.finish(); } -class StopInTeardown : public Actor { - void loop() override { +class StopInTeardown final : public td::Actor { + void loop() final { stop(); } - void tear_down() override { + void tear_down() final { stop(); - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } }; TEST(Actors, stop_in_teardown) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - sb.clear(); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { @@ -585,38 +568,113 @@ TEST(Actors, stop_in_teardown) { scheduler.finish(); } -class AlwaysWaitForMailbox : public Actor { +class AlwaysWaitForMailbox final : public td::Actor { public: - void start_up() override { + void start_up() final { always_wait_for_mailbox(); - create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) { - send_closure(actor_id, &AlwaysWaitForMailbox::g); - send_closure(actor_id, &AlwaysWaitForMailbox::g); - CHECK(!ptr->was_f_); - })) + td::create_actor<td::SleepActor>("Sleep", 0.1, + td::PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](td::Unit) { + td::send_closure(actor_id, &AlwaysWaitForMailbox::g); + td::send_closure(actor_id, &AlwaysWaitForMailbox::g); + CHECK(!ptr->was_f_); + })) .release(); } void f() { was_f_ = true; - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); } void g() { - send_closure(actor_id(this), &AlwaysWaitForMailbox::f); + td::send_closure(actor_id(this), &AlwaysWaitForMailbox::f); } private: - Timeout timeout_; bool was_f_{false}; }; TEST(Actors, always_wait_for_mailbox) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); - ConcurrentScheduler scheduler; - scheduler.init(0); + td::ConcurrentScheduler scheduler(0, 0); scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release(); scheduler.start(); while (scheduler.run_main(10)) { } scheduler.finish(); } + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED +TEST(Actors, send_from_other_threads) { + td::ConcurrentScheduler scheduler(1, 0); + int thread_n = 10; + class Listener final : public td::Actor { + public: + explicit Listener(int cnt) : cnt_(cnt) { + } + void dec() { + if (--cnt_ == 0) { + td::Scheduler::instance()->finish(); + } + } + + private: + int cnt_; + }; + + auto A = scheduler.create_actor_unsafe<Listener>(1, "A", thread_n).release(); + scheduler.start(); + td::vector<td::thread> threads(thread_n); + for (auto &thread : threads) { + thread = td::thread([&A, &scheduler] { + auto guard = scheduler.get_send_guard(); + td::send_closure(A, &Listener::dec); + }); + } + while (scheduler.run_main(10)) { + } + for (auto &thread : threads) { + thread.join(); + } + scheduler.finish(); +} +#endif + +class DelayedCall final : public td::Actor { + public: + void on_called(int *step) { + CHECK(*step == 0); + *step = 1; + } +}; + +class MultiPromiseSendClosureLaterTest final : public td::Actor { + public: + void start_up() final { + delayed_call_ = td::create_actor<DelayedCall>("DelayedCall").release(); + mpa_.add_promise(td::PromiseCreator::lambda([this](td::Unit) { + CHECK(step_ == 1); + step_++; + td::Scheduler::instance()->finish(); + })); + auto lock = mpa_.get_promise(); + td::send_closure_later(delayed_call_, &DelayedCall::on_called, &step_); + lock.set_value(td::Unit()); + } + + void tear_down() final { + CHECK(step_ == 2); + } + + private: + int step_ = 0; + td::MultiPromiseActor mpa_{"MultiPromiseActor"}; + td::ActorId<DelayedCall> delayed_call_; +}; + +TEST(Actors, MultiPromiseSendClosureLater) { + td::ConcurrentScheduler scheduler(0, 0); + scheduler.create_actor_unsafe<MultiPromiseSendClosureLaterTest>(0, "MultiPromiseSendClosureLaterTest").release(); + scheduler.start(); + while (scheduler.run_main(1)) { + } + scheduler.finish(); +} diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp index b97a258a44..bac42e3fd5 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp @@ -1,22 +1,17 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -#include "td/utils/tests.h" - #include "td/actor/actor.h" +#include "td/actor/ConcurrentScheduler.h" -#include "td/utils/logging.h" - -REGISTER_TESTS(actors_workers); - -namespace { - -using namespace td; +#include "td/utils/common.h" +#include "td/utils/SliceBuilder.h" +#include "td/utils/tests.h" -class PowerWorker final : public Actor { +class PowerWorker final : public td::Actor { public: class Callback { public: @@ -29,12 +24,12 @@ class PowerWorker final : public Actor { virtual void on_ready(int query, int res) = 0; virtual void on_closed() = 0; }; - void set_callback(unique_ptr<Callback> callback) { + void set_callback(td::unique_ptr<Callback> callback) { callback_ = std::move(callback); } - void task(uint32 x, uint32 p) { - uint32 res = 1; - for (uint32 i = 0; i < p; i++) { + void task(td::uint32 x, td::uint32 p) { + td::uint32 res = 1; + for (td::uint32 i = 0; i < p; i++) { res *= x; } callback_->on_ready(x, res); @@ -45,39 +40,41 @@ class PowerWorker final : public Actor { } private: - std::unique_ptr<Callback> callback_; + td::unique_ptr<Callback> callback_; }; -class Manager final : public Actor { +class Manager final : public td::Actor { public: - Manager(int queries_n, int query_size, std::vector<ActorId<PowerWorker>> workers) - : workers_(std::move(workers)), left_query_(queries_n), query_size_(query_size) { + Manager(int queries_n, int query_size, td::vector<td::ActorId<PowerWorker>> workers) + : workers_(std::move(workers)) + , ref_cnt_(static_cast<int>(workers_.size())) + , left_query_(queries_n) + , query_size_(query_size) { } - class Callback : public PowerWorker::Callback { + class Callback final : public PowerWorker::Callback { public: - Callback(ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) { + Callback(td::ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) { } - void on_ready(int query, int result) override { - send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result); + void on_ready(int query, int result) final { + td::send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result); } - void on_closed() override { - send_closure_later(actor_id_, &Manager::on_closed, worker_id_); + void on_closed() final { + td::send_closure_later(actor_id_, &Manager::on_closed, worker_id_); } private: - ActorId<Manager> actor_id_; + td::ActorId<Manager> actor_id_; int worker_id_; }; - void start_up() override { - ref_cnt_ = static_cast<int>(workers_.size()); + void start_up() final { int i = 0; for (auto &worker : workers_) { ref_cnt_++; - send_closure_later(worker, &PowerWorker::set_callback, make_unique<Callback>(actor_id(this), i)); + td::send_closure_later(worker, &PowerWorker::set_callback, td::make_unique<Callback>(actor_id(this), i)); i++; - send_closure_later(worker, &PowerWorker::task, 3, query_size_); + td::send_closure_later(worker, &PowerWorker::task, 3, query_size_); left_query_--; } } @@ -85,10 +82,10 @@ class Manager final : public Actor { void on_ready(int worker_id, int query, int res) { ref_cnt_--; if (left_query_ == 0) { - send_closure(workers_[worker_id], &PowerWorker::close); + td::send_closure(workers_[worker_id], &PowerWorker::close); } else { ref_cnt_++; - send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_); + td::send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_); left_query_--; } } @@ -96,30 +93,27 @@ class Manager final : public Actor { void on_closed(int worker_id) { ref_cnt_--; if (ref_cnt_ == 0) { - Scheduler::instance()->finish(); + td::Scheduler::instance()->finish(); stop(); } } private: - std::vector<ActorId<PowerWorker>> workers_; - int left_query_; + td::vector<td::ActorId<PowerWorker>> workers_; int ref_cnt_; + int left_query_; int query_size_; }; -void test_workers(int threads_n, int workers_n, int queries_n, int query_size) { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); - - ConcurrentScheduler sched; - sched.init(threads_n); +static void test_workers(int threads_n, int workers_n, int queries_n, int query_size) { + td::ConcurrentScheduler sched(threads_n, 0); - std::vector<ActorId<PowerWorker>> workers; + td::vector<td::ActorId<PowerWorker>> workers; for (int i = 0; i < workers_n; i++) { int thread_id = threads_n ? i % (threads_n - 1) + 2 : 0; - workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, "worker" + to_string(i)).release()); + workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, PSLICE() << "worker" << i).release()); } - sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "manager", queries_n, query_size, std::move(workers)).release(); + sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "Manager", queries_n, query_size, std::move(workers)).release(); sched.start(); while (sched.run_main(10)) { @@ -129,7 +123,6 @@ void test_workers(int threads_n, int workers_n, int queries_n, int query_size) { // sched.test_one_thread_run(); } -} // namespace TEST(Actors, workers_big_query_one_thread) { test_workers(0, 10, 1000, 300000); @@ -144,13 +137,13 @@ TEST(Actors, workers_big_query_nine_threads) { } TEST(Actors, workers_small_query_one_thread) { - test_workers(0, 10, 1000000, 1); + test_workers(0, 10, 100000, 1); } TEST(Actors, workers_small_query_two_threads) { - test_workers(2, 10, 1000000, 1); + test_workers(2, 10, 100000, 1); } TEST(Actors, workers_small_query_nine_threads) { - test_workers(9, 10, 1000000, 1); + test_workers(9, 10, 10000, 1); } |