summaryrefslogtreecommitdiff
path: root/protocols/Telegram/tdlib/td/tdactor
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor')
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt9
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h20
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h5
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h6
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h8
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h6
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h50
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp63
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h95
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp11
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp6
11 files changed, 172 insertions, 107 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt
index a156384ce9..2a7c68f5d3 100644
--- a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt
+++ b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt
@@ -6,7 +6,6 @@ if (NOT DEFINED CMAKE_INSTALL_LIBDIR)
set(CMAKE_INSTALL_LIBDIR "lib")
endif()
-#SOURCE SETS
set(TDACTOR_SOURCE
td/actor/ConcurrentScheduler.cpp
td/actor/impl/Scheduler.cpp
@@ -21,9 +20,9 @@ set(TDACTOR_SOURCE
td/actor/impl/ActorId.h
td/actor/impl/ActorInfo-decl.h
td/actor/impl/ActorInfo.h
+ td/actor/impl/Event.h
td/actor/impl/EventFull-decl.h
td/actor/impl/EventFull.h
- td/actor/impl/Event.h
td/actor/impl/Scheduler-decl.h
td/actor/impl/Scheduler.h
td/actor/MultiPromise.h
@@ -43,10 +42,6 @@ set(TDACTOR_TEST_SOURCE
PARENT_SCOPE
)
-#RULES
-
-#LIBRARIES
-
add_library(tdactor STATIC ${TDACTOR_SOURCE})
target_include_directories(tdactor PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>)
target_link_libraries(tdactor PUBLIC tdutils)
@@ -56,7 +51,7 @@ if (NOT CMAKE_CROSSCOMPILING)
target_link_libraries(example PRIVATE tdactor)
endif()
-install(TARGETS tdactor EXPORT TdTargets
+install(TARGETS tdactor EXPORT TdStaticTargets
LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
)
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h
index ff695f8d5d..ce3b7bb4d9 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -291,12 +291,20 @@ class PromiseFuture {
FutureActor<T> future_;
};
-template <ActorSendType send_type, class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT,
- class... ArgsT>
-FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...),
- ArgsT &&...args) {
+template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
+FutureActor<T> send_promise_immediately(ActorId<ActorAT> actor_id,
+ ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&...args) {
PromiseFuture<T> pf;
- Scheduler::instance()->send_closure<send_type>(
+ Scheduler::instance()->send_closure_immediately(
+ std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...));
+ return pf.move_future();
+}
+
+template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
+FutureActor<T> send_promise_later(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...),
+ ArgsT &&...args) {
+ PromiseFuture<T> pf;
+ Scheduler::instance()->send_closure_later(
std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...));
return pf.move_future();
}
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
index 4aca52d8ab..0fd1ded0b5 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,6 +10,7 @@
#include "td/actor/impl/ActorInfo-decl.h"
#include "td/actor/impl/Event.h"
+#include "td/utils/common.h"
#include "td/utils/ObjectPool.h"
#include "td/utils/Observer.h"
#include "td/utils/Slice.h"
@@ -94,7 +95,7 @@ class Actor : public ObserverBase {
auto self_closure(SelfT *self, FuncT &&func, ArgsT &&...args);
template <class LambdaT>
- auto self_lambda(LambdaT &&lambda);
+ auto self_lambda(LambdaT &&func);
// proxy to info_
ActorId<> actor_id();
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h
index c3025646ab..5ec72e347a 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -156,8 +156,8 @@ auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&...args) {
}
template <class LambdaT>
-auto Actor::self_lambda(LambdaT &&lambda) {
- return EventCreator::lambda(actor_id(), std::forward<LambdaT>(lambda));
+auto Actor::self_lambda(LambdaT &&func) {
+ return EventCreator::from_lambda(actor_id(), std::forward<LambdaT>(func));
}
inline Slice Actor::get_name() const {
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h
index 6f8976884c..ceec14f532 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -89,7 +89,7 @@ class LambdaEvent final : public CustomEvent {
f_();
}
template <class FromLambdaT, std::enable_if_t<!std::is_same<std::decay_t<FromLambdaT>, LambdaEvent>::value, int> = 0>
- explicit LambdaEvent(FromLambdaT &&lambda) : f_(std::forward<FromLambdaT>(lambda)) {
+ explicit LambdaEvent(FromLambdaT &&func) : f_(std::forward<FromLambdaT>(func)) {
}
private:
@@ -149,8 +149,8 @@ class Event {
}
template <class FromLambdaT>
- static Event lambda(FromLambdaT &&lambda) {
- return custom(new LambdaEvent<std::decay_t<FromLambdaT>>(std::forward<FromLambdaT>(lambda)));
+ static Event from_lambda(FromLambdaT &&func) {
+ return custom(new LambdaEvent<std::decay_t<FromLambdaT>>(std::forward<FromLambdaT>(func)));
}
Event() : Event(Type::NoType) {
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
index 95c32343e8..b47443c4d3 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -65,8 +65,8 @@ class EventCreator {
}
template <class LambdaT>
- static EventFull lambda(ActorRef actor_ref, LambdaT &&lambda) {
- return EventFull(actor_ref, Event::lambda(std::forward<LambdaT>(lambda)));
+ static EventFull from_lambda(ActorRef actor_ref, LambdaT &&func) {
+ return EventFull(actor_ref, Event::from_lambda(std::forward<LambdaT>(func)));
}
static EventFull yield(ActorRef actor_ref) {
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
index 4422795c91..3aa9f8c3a8 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -39,8 +39,6 @@ extern int VERBOSITY_NAME(actor);
class ActorInfo;
-enum class ActorSendType { Immediate, Later };
-
class Scheduler;
class SchedulerGuard {
public:
@@ -103,17 +101,27 @@ class Scheduler {
template <class T>
void destroy_on_scheduler(int32 sched_id, T &value);
+ template <class T>
+ void destroy_on_scheduler_unique_ptr(int32 sched_id, T &value);
+
template <class... ArgsT>
void destroy_on_scheduler(int32 sched_id, ArgsT &...values);
- template <ActorSendType send_type, class EventT>
- void send_lambda(ActorRef actor_ref, EventT &&lambda);
+ template <class EventT>
+ void send_lambda_immediately(ActorRef actor_ref, EventT &&func);
+
+ template <class EventT>
+ void send_lambda_later(ActorRef actor_ref, EventT &&func);
- template <ActorSendType send_type, class EventT>
- void send_closure(ActorRef actor_ref, EventT &&closure);
+ template <class EventT>
+ void send_closure_immediately(ActorRef actor_ref, EventT &&closure);
- template <ActorSendType send_type>
- void send(ActorRef actor_ref, Event &&event);
+ template <class EventT>
+ void send_closure_later(ActorRef actor_ref, EventT &&closure);
+
+ void send_immediately(ActorRef actor_ref, Event &&event);
+
+ void send_later(ActorRef actor_ref, Event &&event);
void before_tail_send(const ActorId<> &actor_id);
@@ -194,11 +202,15 @@ class Scheduler {
void add_to_mailbox(ActorInfo *actor_info, Event &&event);
void clear_mailbox(ActorInfo *actor_info);
+ void flush_mailbox(ActorInfo *actor_info);
+
+ void get_actor_sched_id_to_send_immediately(const ActorInfo *actor_info, int32 &actor_sched_id,
+ bool &on_current_sched, bool &can_send_immediately);
+
template <class RunFuncT, class EventFuncT>
- void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func);
+ void send_immediately_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func);
- template <ActorSendType send_type, class RunFuncT, class EventFuncT>
- void send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func);
+ void send_later_impl(const ActorId<> &actor_id, Event &&event);
Timestamp run_timeout();
void run_mailbox();
@@ -270,8 +282,8 @@ void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) {
using FunctionClassT = member_function_class_t<FunctionT>;
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
- Scheduler::instance()->send_closure<ActorSendType::Immediate>(
- std::forward<ActorIdT>(actor_id), create_immediate_closure(function, std::forward<ArgsT>(args)...));
+ Scheduler::instance()->send_closure_immediately(std::forward<ActorIdT>(actor_id),
+ create_immediate_closure(function, std::forward<ArgsT>(args)...));
}
template <class ActorIdT, class FunctionT, class... ArgsT>
@@ -280,23 +292,23 @@ void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args
using FunctionClassT = member_function_class_t<FunctionT>;
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
- Scheduler::instance()->send<ActorSendType::Later>(std::forward<ActorIdT>(actor_id),
- Event::delayed_closure(function, std::forward<ArgsT>(args)...));
+ Scheduler::instance()->send_later(std::forward<ActorIdT>(actor_id),
+ Event::delayed_closure(function, std::forward<ArgsT>(args)...));
}
template <class... ArgsT>
void send_lambda(ActorRef actor_ref, ArgsT &&...args) {
- Scheduler::instance()->send_lambda<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...);
+ Scheduler::instance()->send_lambda_immediately(actor_ref, std::forward<ArgsT>(args)...);
}
template <class... ArgsT>
void send_event(ActorRef actor_ref, ArgsT &&...args) {
- Scheduler::instance()->send<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...);
+ Scheduler::instance()->send_immediately(actor_ref, std::forward<ArgsT>(args)...);
}
template <class... ArgsT>
void send_event_later(ActorRef actor_ref, ArgsT &&...args) {
- Scheduler::instance()->send<ActorSendType::Later>(actor_ref, std::forward<ArgsT>(args)...);
+ Scheduler::instance()->send_later(actor_ref, std::forward<ArgsT>(args)...);
}
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
index 64a1e14233..f107602fd7 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -12,6 +12,7 @@
#include "td/actor/impl/Event.h"
#include "td/actor/impl/EventFull.h"
+#include "td/utils/algorithm.h"
#include "td/utils/common.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/format.h"
@@ -23,10 +24,10 @@
#include "td/utils/port/thread_local.h"
#include "td/utils/Promise.h"
#include "td/utils/ScopeGuard.h"
+#include "td/utils/SliceBuilder.h"
#include "td/utils/Time.h"
#include <functional>
-#include <iterator>
#include <memory>
#include <utility>
@@ -80,6 +81,7 @@ void Scheduler::ServiceActor::start_up() {
void Scheduler::ServiceActor::loop() {
auto &queue = inbound_;
int ready_n = queue->reader_wait_nonblock();
+ VLOG(actor) << "Have " << ready_n << " pending events";
if (ready_n == 0) {
return;
}
@@ -228,7 +230,7 @@ void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<Eve
sched_id_ = id;
sched_n_ = static_cast<int32>(outbound_queues_.size());
service_actor_.set_queue(inbound_queue_);
- register_actor("ServiceActor", &service_actor_).release();
+ register_actor(PSLICE() << "ServiceActor" << id, &service_actor_).release();
}
void Scheduler::clear() {
@@ -299,9 +301,36 @@ void Scheduler::do_event(ActorInfo *actor_info, Event &&event) {
// can't clear event here. It may be already destroyed during destroy_actor
}
+void Scheduler::get_actor_sched_id_to_send_immediately(const ActorInfo *actor_info, int32 &actor_sched_id,
+ bool &on_current_sched, bool &can_send_immediately) {
+ bool is_migrating;
+ std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
+ on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
+ CHECK(has_guard_ || !on_current_sched);
+ can_send_immediately = on_current_sched && !actor_info->is_running() && actor_info->mailbox_.empty();
+}
+
+void Scheduler::send_later_impl(const ActorId<> &actor_id, Event &&event) {
+ ActorInfo *actor_info = actor_id.get_actor_info();
+ if (unlikely(actor_info == nullptr || close_flag_)) {
+ return;
+ }
+
+ int32 actor_sched_id;
+ bool is_migrating;
+ std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
+ bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
+ CHECK(has_guard_ || !on_current_sched);
+
+ if (on_current_sched) {
+ add_to_mailbox(actor_info, std::move(event));
+ } else {
+ send_to_scheduler(actor_sched_id, actor_id, std::move(event));
+ }
+}
+
void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
- VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
- << tag("actor_count", actor_count_);
+ VLOG(actor) << "Register migrated actor " << *actor_info << ", " << tag("actor_count", actor_count_);
actor_count_++;
LOG_CHECK(actor_info->is_migrating()) << *actor_info << ' ' << actor_count_ << ' ' << sched_id_ << ' '
<< actor_info->migrate_dest() << ' ' << actor_info->is_running() << ' '
@@ -314,8 +343,7 @@ void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
}
auto it = pending_events_.find(actor_info);
if (it != pending_events_.end()) {
- actor_info->mailbox_.insert(actor_info->mailbox_.end(), std::make_move_iterator(it->second.begin()),
- std::make_move_iterator(it->second.end()));
+ append(actor_info->mailbox_, std::move(it->second));
pending_events_.erase(it);
}
if (actor_info->mailbox_.empty()) {
@@ -390,6 +418,7 @@ void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
void Scheduler::do_stop_actor(Actor *actor) {
return do_stop_actor(actor->get_info());
}
+
void Scheduler::do_stop_actor(ActorInfo *actor_info) {
CHECK(!actor_info->is_migrating());
LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
@@ -411,6 +440,7 @@ void Scheduler::do_stop_actor(ActorInfo *actor_info) {
void Scheduler::migrate_actor(Actor *actor, int32 dest_sched_id) {
migrate_actor(actor->get_info(), dest_sched_id);
}
+
void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
CHECK(event_context_ptr_->actor_info == actor_info);
if (sched_id_ == dest_sched_id) {
@@ -423,6 +453,7 @@ void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
void Scheduler::do_migrate_actor(Actor *actor, int32 dest_sched_id) {
do_migrate_actor(actor->get_info(), dest_sched_id);
}
+
void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
dest_sched_id = 0;
@@ -439,7 +470,7 @@ void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) {
}
void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
- VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info)
+ VLOG(actor) << "Start migrate actor " << *actor_info << " to scheduler " << dest_sched_id << ", "
<< tag("actor_count", actor_count_);
actor_count_--;
CHECK(actor_count_ >= 0);
@@ -490,6 +521,18 @@ void Scheduler::run_poll(Timestamp timeout) {
#endif
}
+void Scheduler::flush_mailbox(ActorInfo *actor_info) {
+ auto &mailbox = actor_info->mailbox_;
+ size_t mailbox_size = mailbox.size();
+ CHECK(mailbox_size != 0);
+ EventGuard guard(this, actor_info);
+ size_t i = 0;
+ for (; i < mailbox_size && guard.can_run(); i++) {
+ do_event(actor_info, std::move(mailbox[i]));
+ }
+ mailbox.erase(mailbox.begin(), mailbox.begin() + i);
+}
+
void Scheduler::run_mailbox() {
VLOG(actor) << "Run mailbox : begin";
ListNode actors_list = std::move(ready_actors_list_);
@@ -497,7 +540,7 @@ void Scheduler::run_mailbox() {
ListNode *node = actors_list.get();
CHECK(node);
auto actor_info = ActorInfo::from_list_node(node);
- flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr));
+ flush_mailbox(actor_info);
}
VLOG(actor) << "Run mailbox : finish " << actor_count_;
@@ -525,7 +568,7 @@ Timestamp Scheduler::run_timeout() {
while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
HeapNode *node = timeout_queue_.pop();
ActorInfo *actor_info = ActorInfo::from_heap_node(node);
- send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout());
+ send_immediately(actor_info->actor_id(), Event::timeout());
}
return get_timeout();
}
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h
index 8c2b6d6e2a..075adbaace 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -54,6 +54,7 @@ class EventGuard {
inline SchedulerGuard Scheduler::get_guard() {
return SchedulerGuard(this);
}
+
inline SchedulerGuard Scheduler::get_const_guard() {
return SchedulerGuard(this, false);
}
@@ -61,6 +62,7 @@ inline SchedulerGuard Scheduler::get_const_guard() {
inline int32 Scheduler::sched_id() const {
return sched_id_;
}
+
inline int32 Scheduler::sched_count() const {
return sched_n_;
}
@@ -106,12 +108,12 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A
ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr);
if (sched_id != sched_id_) {
- send<ActorSendType::Later>(actor_id, Event::start());
+ send_later(actor_id, Event::start());
do_migrate_actor(actor_info, sched_id);
} else {
pending_actors_list_.put(weak_info->get_list_node());
if (ActorTraits<ActorT>::need_start_up) {
- send<ActorSendType::Later>(actor_id, Event::start());
+ send_later(actor_id, Event::start());
}
}
@@ -138,26 +140,6 @@ inline void Scheduler::destroy_actor(ActorInfo *actor_info) {
CHECK(actor_count_ >= 0);
}
-template <class RunFuncT, class EventFuncT>
-void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func) {
- auto &mailbox = actor_info->mailbox_;
- size_t mailbox_size = mailbox.size();
- CHECK(mailbox_size != 0);
- EventGuard guard(this, actor_info);
- size_t i = 0;
- for (; i < mailbox_size && guard.can_run(); i++) {
- do_event(actor_info, std::move(mailbox[i]));
- }
- if (run_func) {
- if (guard.can_run()) {
- (*run_func)(actor_info);
- } else {
- mailbox.insert(mailbox.begin() + i, (*event_func)());
- }
- }
- mailbox.erase(mailbox.begin(), mailbox.begin() + i);
-}
-
inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<Actor> &actor_id, Event &&event) {
if (sched_id == sched_id_) {
ActorInfo *actor_info = actor_id.get_actor_info();
@@ -176,6 +158,15 @@ void Scheduler::destroy_on_scheduler(int32 sched_id, T &value) {
}
}
+template <class T>
+void Scheduler::destroy_on_scheduler_unique_ptr(int32 sched_id, T &value) {
+ if (value != nullptr) {
+ destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([value = std::move(value)](Unit) {
+ // destroy value
+ }));
+ }
+}
+
template <class... ArgsT>
void Scheduler::destroy_on_scheduler(int32 sched_id, ArgsT &...values) {
destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([values = std::make_tuple(std::move(values)...)](Unit) {
@@ -187,22 +178,20 @@ inline void Scheduler::before_tail_send(const ActorId<> &actor_id) {
// TODO
}
-template <ActorSendType send_type, class RunFuncT, class EventFuncT>
-void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func) {
+template <class RunFuncT, class EventFuncT>
+void Scheduler::send_immediately_impl(const ActorId<> &actor_id, const RunFuncT &run_func,
+ const EventFuncT &event_func) {
ActorInfo *actor_info = actor_id.get_actor_info();
if (unlikely(actor_info == nullptr || close_flag_)) {
return;
}
- CHECK(actor_info != nullptr);
int32 actor_sched_id;
- bool is_migrating;
- std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
- bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
- CHECK(has_guard_ || !on_current_sched);
+ bool on_current_sched;
+ bool can_send_immediately;
+ get_actor_sched_id_to_send_immediately(actor_info, actor_sched_id, on_current_sched, can_send_immediately);
- if (likely(send_type == ActorSendType::Immediate && on_current_sched && !actor_info->is_running() &&
- actor_info->mailbox_.empty())) { // run immediately
+ if (likely(can_send_immediately)) { // run immediately
EventGuard guard(this, actor_info);
run_func(actor_info);
} else {
@@ -214,24 +203,31 @@ void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, c
}
}
-template <ActorSendType send_type, class EventT>
-void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda) {
- return send_impl<send_type>(
+template <class EventT>
+void Scheduler::send_lambda_immediately(ActorRef actor_ref, EventT &&func) {
+ return send_immediately_impl(
actor_ref.get(),
[&](ActorInfo *actor_info) {
event_context_ptr_->link_token = actor_ref.token();
- lambda();
+ func();
},
[&] {
- auto event = Event::lambda(std::forward<EventT>(lambda));
+ auto event = Event::from_lambda(std::forward<EventT>(func));
event.set_link_token(actor_ref.token());
return event;
});
}
-template <ActorSendType send_type, class EventT>
-void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) {
- return send_impl<send_type>(
+template <class EventT>
+void Scheduler::send_lambda_later(ActorRef actor_ref, EventT &&func) {
+ auto event = Event::from_lambda(std::forward<EventT>(func));
+ event.set_link_token(actor_ref.token());
+ return send_later_impl(actor_ref.get(), std::move(event));
+}
+
+template <class EventT>
+void Scheduler::send_closure_immediately(ActorRef actor_ref, EventT &&closure) {
+ return send_immediately_impl(
actor_ref.get(),
[&](ActorInfo *actor_info) {
event_context_ptr_->link_token = actor_ref.token();
@@ -244,14 +240,25 @@ void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) {
});
}
-template <ActorSendType send_type>
-void Scheduler::send(ActorRef actor_ref, Event &&event) {
+template <class EventT>
+void Scheduler::send_closure_later(ActorRef actor_ref, EventT &&closure) {
+ auto event = Event::immediate_closure(std::forward<EventT>(closure));
event.set_link_token(actor_ref.token());
- return send_impl<send_type>(
+ return send_later_impl(actor_ref.get(), std::move(event));
+}
+
+inline void Scheduler::send_immediately(ActorRef actor_ref, Event &&event) {
+ event.set_link_token(actor_ref.token());
+ return send_immediately_impl(
actor_ref.get(), [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); },
[&] { return std::move(event); });
}
+inline void Scheduler::send_later(ActorRef actor_ref, Event &&event) {
+ event.set_link_token(actor_ref.token());
+ return send_later_impl(actor_ref.get(), std::move(event));
+}
+
inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) {
instance()->poll_.subscribe(std::move(fd), flags);
}
@@ -268,7 +275,7 @@ inline void Scheduler::yield_actor(Actor *actor) {
yield_actor(actor->get_info());
}
inline void Scheduler::yield_actor(ActorInfo *actor_info) {
- send<ActorSendType::Later>(actor_info->actor_id(), Event::yield());
+ send_later(actor_info->actor_id(), Event::yield());
}
inline void Scheduler::stop_actor(Actor *actor) {
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
index 9042d7e0ce..09b9db6e76 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -126,8 +126,8 @@ class QueryActor final : public td::Actor {
callback_->on_result(std::move(query));
} else {
auto future = td::Random::fast(0, 3) == 0
- ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
- : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
+ ? td::send_promise_immediately(rand_elem(workers_), &Worker::query, x, p)
+ : td::send_promise_later(rand_elem(workers_), &Worker::query, x, p);
if (future.is_ready()) {
query.result = future.move_as_ok();
callback_->on_result(std::move(query));
@@ -301,9 +301,8 @@ class SimpleActor final : public td::Actor {
}
q_++;
p_ = td::Random::fast_bool() ? 1 : 10000;
- auto future = td::Random::fast(0, 3) == 0
- ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
- : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_);
+ auto future = td::Random::fast(0, 3) == 0 ? td::send_promise_immediately(worker_, &Worker::query, q_, p_)
+ : td::send_promise_later(worker_, &Worker::query, q_, p_);
if (future.is_ready()) {
auto result = future.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
index e01b356e52..89a220d50e 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -166,7 +166,7 @@ TEST(Actors, simple_pass_event_arguments) {
// Var-->LvalueRef
// Var-->LvalueRef (Delayed)
- // CE or stange behaviour
+ // CE or strange behaviour
// Var-->Value
sb.clear();
@@ -292,7 +292,6 @@ class OpenClose final : public td::Actor {
cnt_--;
yield();
} else {
- td::unlink(file_name).ignore();
td::Scheduler::instance()->finish();
}
}
@@ -310,6 +309,7 @@ TEST(Actors, open_close) {
while (scheduler.run_main(10)) {
}
scheduler.finish();
+ td::unlink("server").ignore();
}
class MsgActor : public td::Actor {