diff options
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor')
11 files changed, 172 insertions, 107 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt index a156384ce9..2a7c68f5d3 100644 --- a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt +++ b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt @@ -6,7 +6,6 @@ if (NOT DEFINED CMAKE_INSTALL_LIBDIR) set(CMAKE_INSTALL_LIBDIR "lib") endif() -#SOURCE SETS set(TDACTOR_SOURCE td/actor/ConcurrentScheduler.cpp td/actor/impl/Scheduler.cpp @@ -21,9 +20,9 @@ set(TDACTOR_SOURCE td/actor/impl/ActorId.h td/actor/impl/ActorInfo-decl.h td/actor/impl/ActorInfo.h + td/actor/impl/Event.h td/actor/impl/EventFull-decl.h td/actor/impl/EventFull.h - td/actor/impl/Event.h td/actor/impl/Scheduler-decl.h td/actor/impl/Scheduler.h td/actor/MultiPromise.h @@ -43,10 +42,6 @@ set(TDACTOR_TEST_SOURCE PARENT_SCOPE ) -#RULES - -#LIBRARIES - add_library(tdactor STATIC ${TDACTOR_SOURCE}) target_include_directories(tdactor PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>) target_link_libraries(tdactor PUBLIC tdutils) @@ -56,7 +51,7 @@ if (NOT CMAKE_CROSSCOMPILING) target_link_libraries(example PRIVATE tdactor) endif() -install(TARGETS tdactor EXPORT TdTargets +install(TARGETS tdactor EXPORT TdStaticTargets LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}" ) diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h index ff695f8d5d..ce3b7bb4d9 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -291,12 +291,20 @@ class PromiseFuture { FutureActor<T> future_; }; -template <ActorSendType send_type, class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, - class... ArgsT> -FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), - ArgsT &&...args) { +template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT> +FutureActor<T> send_promise_immediately(ActorId<ActorAT> actor_id, + ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&...args) { PromiseFuture<T> pf; - Scheduler::instance()->send_closure<send_type>( + Scheduler::instance()->send_closure_immediately( + std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...)); + return pf.move_future(); +} + +template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT> +FutureActor<T> send_promise_later(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), + ArgsT &&...args) { + PromiseFuture<T> pf; + Scheduler::instance()->send_closure_later( std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...)); return pf.move_future(); } diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h index 4aca52d8ab..0fd1ded0b5 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,6 +10,7 @@ #include "td/actor/impl/ActorInfo-decl.h" #include "td/actor/impl/Event.h" +#include "td/utils/common.h" #include "td/utils/ObjectPool.h" #include "td/utils/Observer.h" #include "td/utils/Slice.h" @@ -94,7 +95,7 @@ class Actor : public ObserverBase { auto self_closure(SelfT *self, FuncT &&func, ArgsT &&...args); template <class LambdaT> - auto self_lambda(LambdaT &&lambda); + auto self_lambda(LambdaT &&func); // proxy to info_ ActorId<> actor_id(); diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h index c3025646ab..5ec72e347a 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -156,8 +156,8 @@ auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&...args) { } template <class LambdaT> -auto Actor::self_lambda(LambdaT &&lambda) { - return EventCreator::lambda(actor_id(), std::forward<LambdaT>(lambda)); +auto Actor::self_lambda(LambdaT &&func) { + return EventCreator::from_lambda(actor_id(), std::forward<LambdaT>(func)); } inline Slice Actor::get_name() const { diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h index 6f8976884c..ceec14f532 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -89,7 +89,7 @@ class LambdaEvent final : public CustomEvent { f_(); } template <class FromLambdaT, std::enable_if_t<!std::is_same<std::decay_t<FromLambdaT>, LambdaEvent>::value, int> = 0> - explicit LambdaEvent(FromLambdaT &&lambda) : f_(std::forward<FromLambdaT>(lambda)) { + explicit LambdaEvent(FromLambdaT &&func) : f_(std::forward<FromLambdaT>(func)) { } private: @@ -149,8 +149,8 @@ class Event { } template <class FromLambdaT> - static Event lambda(FromLambdaT &&lambda) { - return custom(new LambdaEvent<std::decay_t<FromLambdaT>>(std::forward<FromLambdaT>(lambda))); + static Event from_lambda(FromLambdaT &&func) { + return custom(new LambdaEvent<std::decay_t<FromLambdaT>>(std::forward<FromLambdaT>(func))); } Event() : Event(Type::NoType) { diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h index 95c32343e8..b47443c4d3 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -65,8 +65,8 @@ class EventCreator { } template <class LambdaT> - static EventFull lambda(ActorRef actor_ref, LambdaT &&lambda) { - return EventFull(actor_ref, Event::lambda(std::forward<LambdaT>(lambda))); + static EventFull from_lambda(ActorRef actor_ref, LambdaT &&func) { + return EventFull(actor_ref, Event::from_lambda(std::forward<LambdaT>(func))); } static EventFull yield(ActorRef actor_ref) { diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h index 4422795c91..3aa9f8c3a8 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -39,8 +39,6 @@ extern int VERBOSITY_NAME(actor); class ActorInfo; -enum class ActorSendType { Immediate, Later }; - class Scheduler; class SchedulerGuard { public: @@ -103,17 +101,27 @@ class Scheduler { template <class T> void destroy_on_scheduler(int32 sched_id, T &value); + template <class T> + void destroy_on_scheduler_unique_ptr(int32 sched_id, T &value); + template <class... ArgsT> void destroy_on_scheduler(int32 sched_id, ArgsT &...values); - template <ActorSendType send_type, class EventT> - void send_lambda(ActorRef actor_ref, EventT &&lambda); + template <class EventT> + void send_lambda_immediately(ActorRef actor_ref, EventT &&func); + + template <class EventT> + void send_lambda_later(ActorRef actor_ref, EventT &&func); - template <ActorSendType send_type, class EventT> - void send_closure(ActorRef actor_ref, EventT &&closure); + template <class EventT> + void send_closure_immediately(ActorRef actor_ref, EventT &&closure); - template <ActorSendType send_type> - void send(ActorRef actor_ref, Event &&event); + template <class EventT> + void send_closure_later(ActorRef actor_ref, EventT &&closure); + + void send_immediately(ActorRef actor_ref, Event &&event); + + void send_later(ActorRef actor_ref, Event &&event); void before_tail_send(const ActorId<> &actor_id); @@ -194,11 +202,15 @@ class Scheduler { void add_to_mailbox(ActorInfo *actor_info, Event &&event); void clear_mailbox(ActorInfo *actor_info); + void flush_mailbox(ActorInfo *actor_info); + + void get_actor_sched_id_to_send_immediately(const ActorInfo *actor_info, int32 &actor_sched_id, + bool &on_current_sched, bool &can_send_immediately); + template <class RunFuncT, class EventFuncT> - void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func); + void send_immediately_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func); - template <ActorSendType send_type, class RunFuncT, class EventFuncT> - void send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func); + void send_later_impl(const ActorId<> &actor_id, Event &&event); Timestamp run_timeout(); void run_mailbox(); @@ -270,8 +282,8 @@ void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) { using FunctionClassT = member_function_class_t<FunctionT>; static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); - Scheduler::instance()->send_closure<ActorSendType::Immediate>( - std::forward<ActorIdT>(actor_id), create_immediate_closure(function, std::forward<ArgsT>(args)...)); + Scheduler::instance()->send_closure_immediately(std::forward<ActorIdT>(actor_id), + create_immediate_closure(function, std::forward<ArgsT>(args)...)); } template <class ActorIdT, class FunctionT, class... ArgsT> @@ -280,23 +292,23 @@ void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args using FunctionClassT = member_function_class_t<FunctionT>; static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); - Scheduler::instance()->send<ActorSendType::Later>(std::forward<ActorIdT>(actor_id), - Event::delayed_closure(function, std::forward<ArgsT>(args)...)); + Scheduler::instance()->send_later(std::forward<ActorIdT>(actor_id), + Event::delayed_closure(function, std::forward<ArgsT>(args)...)); } template <class... ArgsT> void send_lambda(ActorRef actor_ref, ArgsT &&...args) { - Scheduler::instance()->send_lambda<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...); + Scheduler::instance()->send_lambda_immediately(actor_ref, std::forward<ArgsT>(args)...); } template <class... ArgsT> void send_event(ActorRef actor_ref, ArgsT &&...args) { - Scheduler::instance()->send<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...); + Scheduler::instance()->send_immediately(actor_ref, std::forward<ArgsT>(args)...); } template <class... ArgsT> void send_event_later(ActorRef actor_ref, ArgsT &&...args) { - Scheduler::instance()->send<ActorSendType::Later>(actor_ref, std::forward<ArgsT>(args)...); + Scheduler::instance()->send_later(actor_ref, std::forward<ArgsT>(args)...); } } // namespace td diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp index 64a1e14233..f107602fd7 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -12,6 +12,7 @@ #include "td/actor/impl/Event.h" #include "td/actor/impl/EventFull.h" +#include "td/utils/algorithm.h" #include "td/utils/common.h" #include "td/utils/ExitGuard.h" #include "td/utils/format.h" @@ -23,10 +24,10 @@ #include "td/utils/port/thread_local.h" #include "td/utils/Promise.h" #include "td/utils/ScopeGuard.h" +#include "td/utils/SliceBuilder.h" #include "td/utils/Time.h" #include <functional> -#include <iterator> #include <memory> #include <utility> @@ -80,6 +81,7 @@ void Scheduler::ServiceActor::start_up() { void Scheduler::ServiceActor::loop() { auto &queue = inbound_; int ready_n = queue->reader_wait_nonblock(); + VLOG(actor) << "Have " << ready_n << " pending events"; if (ready_n == 0) { return; } @@ -228,7 +230,7 @@ void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<Eve sched_id_ = id; sched_n_ = static_cast<int32>(outbound_queues_.size()); service_actor_.set_queue(inbound_queue_); - register_actor("ServiceActor", &service_actor_).release(); + register_actor(PSLICE() << "ServiceActor" << id, &service_actor_).release(); } void Scheduler::clear() { @@ -299,9 +301,36 @@ void Scheduler::do_event(ActorInfo *actor_info, Event &&event) { // can't clear event here. It may be already destroyed during destroy_actor } +void Scheduler::get_actor_sched_id_to_send_immediately(const ActorInfo *actor_info, int32 &actor_sched_id, + bool &on_current_sched, bool &can_send_immediately) { + bool is_migrating; + std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic(); + on_current_sched = !is_migrating && sched_id_ == actor_sched_id; + CHECK(has_guard_ || !on_current_sched); + can_send_immediately = on_current_sched && !actor_info->is_running() && actor_info->mailbox_.empty(); +} + +void Scheduler::send_later_impl(const ActorId<> &actor_id, Event &&event) { + ActorInfo *actor_info = actor_id.get_actor_info(); + if (unlikely(actor_info == nullptr || close_flag_)) { + return; + } + + int32 actor_sched_id; + bool is_migrating; + std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic(); + bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id; + CHECK(has_guard_ || !on_current_sched); + + if (on_current_sched) { + add_to_mailbox(actor_info, std::move(event)); + } else { + send_to_scheduler(actor_sched_id, actor_id, std::move(event)); + } +} + void Scheduler::register_migrated_actor(ActorInfo *actor_info) { - VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info) - << tag("actor_count", actor_count_); + VLOG(actor) << "Register migrated actor " << *actor_info << ", " << tag("actor_count", actor_count_); actor_count_++; LOG_CHECK(actor_info->is_migrating()) << *actor_info << ' ' << actor_count_ << ' ' << sched_id_ << ' ' << actor_info->migrate_dest() << ' ' << actor_info->is_running() << ' ' @@ -314,8 +343,7 @@ void Scheduler::register_migrated_actor(ActorInfo *actor_info) { } auto it = pending_events_.find(actor_info); if (it != pending_events_.end()) { - actor_info->mailbox_.insert(actor_info->mailbox_.end(), std::make_move_iterator(it->second.begin()), - std::make_move_iterator(it->second.end())); + append(actor_info->mailbox_, std::move(it->second)); pending_events_.erase(it); } if (actor_info->mailbox_.empty()) { @@ -390,6 +418,7 @@ void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) { void Scheduler::do_stop_actor(Actor *actor) { return do_stop_actor(actor->get_info()); } + void Scheduler::do_stop_actor(ActorInfo *actor_info) { CHECK(!actor_info->is_migrating()); LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_; @@ -411,6 +440,7 @@ void Scheduler::do_stop_actor(ActorInfo *actor_info) { void Scheduler::migrate_actor(Actor *actor, int32 dest_sched_id) { migrate_actor(actor->get_info(), dest_sched_id); } + void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { CHECK(event_context_ptr_->actor_info == actor_info); if (sched_id_ == dest_sched_id) { @@ -423,6 +453,7 @@ void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { void Scheduler::do_migrate_actor(Actor *actor, int32 dest_sched_id) { do_migrate_actor(actor->get_info(), dest_sched_id); } + void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { #if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED dest_sched_id = 0; @@ -439,7 +470,7 @@ void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) { } void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { - VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info) + VLOG(actor) << "Start migrate actor " << *actor_info << " to scheduler " << dest_sched_id << ", " << tag("actor_count", actor_count_); actor_count_--; CHECK(actor_count_ >= 0); @@ -490,6 +521,18 @@ void Scheduler::run_poll(Timestamp timeout) { #endif } +void Scheduler::flush_mailbox(ActorInfo *actor_info) { + auto &mailbox = actor_info->mailbox_; + size_t mailbox_size = mailbox.size(); + CHECK(mailbox_size != 0); + EventGuard guard(this, actor_info); + size_t i = 0; + for (; i < mailbox_size && guard.can_run(); i++) { + do_event(actor_info, std::move(mailbox[i])); + } + mailbox.erase(mailbox.begin(), mailbox.begin() + i); +} + void Scheduler::run_mailbox() { VLOG(actor) << "Run mailbox : begin"; ListNode actors_list = std::move(ready_actors_list_); @@ -497,7 +540,7 @@ void Scheduler::run_mailbox() { ListNode *node = actors_list.get(); CHECK(node); auto actor_info = ActorInfo::from_list_node(node); - flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr)); + flush_mailbox(actor_info); } VLOG(actor) << "Run mailbox : finish " << actor_count_; @@ -525,7 +568,7 @@ Timestamp Scheduler::run_timeout() { while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { HeapNode *node = timeout_queue_.pop(); ActorInfo *actor_info = ActorInfo::from_heap_node(node); - send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout()); + send_immediately(actor_info->actor_id(), Event::timeout()); } return get_timeout(); } diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h index 8c2b6d6e2a..075adbaace 100644 --- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h +++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -54,6 +54,7 @@ class EventGuard { inline SchedulerGuard Scheduler::get_guard() { return SchedulerGuard(this); } + inline SchedulerGuard Scheduler::get_const_guard() { return SchedulerGuard(this, false); } @@ -61,6 +62,7 @@ inline SchedulerGuard Scheduler::get_const_guard() { inline int32 Scheduler::sched_id() const { return sched_id_; } + inline int32 Scheduler::sched_count() const { return sched_n_; } @@ -106,12 +108,12 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr); if (sched_id != sched_id_) { - send<ActorSendType::Later>(actor_id, Event::start()); + send_later(actor_id, Event::start()); do_migrate_actor(actor_info, sched_id); } else { pending_actors_list_.put(weak_info->get_list_node()); if (ActorTraits<ActorT>::need_start_up) { - send<ActorSendType::Later>(actor_id, Event::start()); + send_later(actor_id, Event::start()); } } @@ -138,26 +140,6 @@ inline void Scheduler::destroy_actor(ActorInfo *actor_info) { CHECK(actor_count_ >= 0); } -template <class RunFuncT, class EventFuncT> -void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func) { - auto &mailbox = actor_info->mailbox_; - size_t mailbox_size = mailbox.size(); - CHECK(mailbox_size != 0); - EventGuard guard(this, actor_info); - size_t i = 0; - for (; i < mailbox_size && guard.can_run(); i++) { - do_event(actor_info, std::move(mailbox[i])); - } - if (run_func) { - if (guard.can_run()) { - (*run_func)(actor_info); - } else { - mailbox.insert(mailbox.begin() + i, (*event_func)()); - } - } - mailbox.erase(mailbox.begin(), mailbox.begin() + i); -} - inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<Actor> &actor_id, Event &&event) { if (sched_id == sched_id_) { ActorInfo *actor_info = actor_id.get_actor_info(); @@ -176,6 +158,15 @@ void Scheduler::destroy_on_scheduler(int32 sched_id, T &value) { } } +template <class T> +void Scheduler::destroy_on_scheduler_unique_ptr(int32 sched_id, T &value) { + if (value != nullptr) { + destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([value = std::move(value)](Unit) { + // destroy value + })); + } +} + template <class... ArgsT> void Scheduler::destroy_on_scheduler(int32 sched_id, ArgsT &...values) { destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([values = std::make_tuple(std::move(values)...)](Unit) { @@ -187,22 +178,20 @@ inline void Scheduler::before_tail_send(const ActorId<> &actor_id) { // TODO } -template <ActorSendType send_type, class RunFuncT, class EventFuncT> -void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func) { +template <class RunFuncT, class EventFuncT> +void Scheduler::send_immediately_impl(const ActorId<> &actor_id, const RunFuncT &run_func, + const EventFuncT &event_func) { ActorInfo *actor_info = actor_id.get_actor_info(); if (unlikely(actor_info == nullptr || close_flag_)) { return; } - CHECK(actor_info != nullptr); int32 actor_sched_id; - bool is_migrating; - std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic(); - bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id; - CHECK(has_guard_ || !on_current_sched); + bool on_current_sched; + bool can_send_immediately; + get_actor_sched_id_to_send_immediately(actor_info, actor_sched_id, on_current_sched, can_send_immediately); - if (likely(send_type == ActorSendType::Immediate && on_current_sched && !actor_info->is_running() && - actor_info->mailbox_.empty())) { // run immediately + if (likely(can_send_immediately)) { // run immediately EventGuard guard(this, actor_info); run_func(actor_info); } else { @@ -214,24 +203,31 @@ void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, c } } -template <ActorSendType send_type, class EventT> -void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda) { - return send_impl<send_type>( +template <class EventT> +void Scheduler::send_lambda_immediately(ActorRef actor_ref, EventT &&func) { + return send_immediately_impl( actor_ref.get(), [&](ActorInfo *actor_info) { event_context_ptr_->link_token = actor_ref.token(); - lambda(); + func(); }, [&] { - auto event = Event::lambda(std::forward<EventT>(lambda)); + auto event = Event::from_lambda(std::forward<EventT>(func)); event.set_link_token(actor_ref.token()); return event; }); } -template <ActorSendType send_type, class EventT> -void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) { - return send_impl<send_type>( +template <class EventT> +void Scheduler::send_lambda_later(ActorRef actor_ref, EventT &&func) { + auto event = Event::from_lambda(std::forward<EventT>(func)); + event.set_link_token(actor_ref.token()); + return send_later_impl(actor_ref.get(), std::move(event)); +} + +template <class EventT> +void Scheduler::send_closure_immediately(ActorRef actor_ref, EventT &&closure) { + return send_immediately_impl( actor_ref.get(), [&](ActorInfo *actor_info) { event_context_ptr_->link_token = actor_ref.token(); @@ -244,14 +240,25 @@ void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) { }); } -template <ActorSendType send_type> -void Scheduler::send(ActorRef actor_ref, Event &&event) { +template <class EventT> +void Scheduler::send_closure_later(ActorRef actor_ref, EventT &&closure) { + auto event = Event::immediate_closure(std::forward<EventT>(closure)); event.set_link_token(actor_ref.token()); - return send_impl<send_type>( + return send_later_impl(actor_ref.get(), std::move(event)); +} + +inline void Scheduler::send_immediately(ActorRef actor_ref, Event &&event) { + event.set_link_token(actor_ref.token()); + return send_immediately_impl( actor_ref.get(), [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); }, [&] { return std::move(event); }); } +inline void Scheduler::send_later(ActorRef actor_ref, Event &&event) { + event.set_link_token(actor_ref.token()); + return send_later_impl(actor_ref.get(), std::move(event)); +} + inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) { instance()->poll_.subscribe(std::move(fd), flags); } @@ -268,7 +275,7 @@ inline void Scheduler::yield_actor(Actor *actor) { yield_actor(actor->get_info()); } inline void Scheduler::yield_actor(ActorInfo *actor_info) { - send<ActorSendType::Later>(actor_info->actor_id(), Event::yield()); + send_later(actor_info->actor_id(), Event::yield()); } inline void Scheduler::stop_actor(Actor *actor) { diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp index 9042d7e0ce..09b9db6e76 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -126,8 +126,8 @@ class QueryActor final : public td::Actor { callback_->on_result(std::move(query)); } else { auto future = td::Random::fast(0, 3) == 0 - ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p) - : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p); + ? td::send_promise_immediately(rand_elem(workers_), &Worker::query, x, p) + : td::send_promise_later(rand_elem(workers_), &Worker::query, x, p); if (future.is_ready()) { query.result = future.move_as_ok(); callback_->on_result(std::move(query)); @@ -301,9 +301,8 @@ class SimpleActor final : public td::Actor { } q_++; p_ = td::Random::fast_bool() ? 1 : 10000; - auto future = td::Random::fast(0, 3) == 0 - ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_) - : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_); + auto future = td::Random::fast(0, 3) == 0 ? td::send_promise_immediately(worker_, &Worker::query, q_, p_) + : td::send_promise_later(worker_, &Worker::query, q_, p_); if (future.is_ready()) { auto result = future.move_as_ok(); CHECK(result == fast_pow_mod_uint32(q_, p_)); diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp index e01b356e52..89a220d50e 100644 --- a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp +++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -166,7 +166,7 @@ TEST(Actors, simple_pass_event_arguments) { // Var-->LvalueRef // Var-->LvalueRef (Delayed) - // CE or stange behaviour + // CE or strange behaviour // Var-->Value sb.clear(); @@ -292,7 +292,6 @@ class OpenClose final : public td::Actor { cnt_--; yield(); } else { - td::unlink(file_name).ignore(); td::Scheduler::instance()->finish(); } } @@ -310,6 +309,7 @@ TEST(Actors, open_close) { while (scheduler.run_main(10)) { } scheduler.finish(); + td::unlink("server").ignore(); } class MsgActor : public td::Actor { |