summaryrefslogtreecommitdiff
path: root/protocols/Telegram/tdlib/td/tdactor
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor')
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt39
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/example/example.cpp28
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.cpp205
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.h (renamed from protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h)71
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/Condition.h47
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp25
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h72
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.cpp (renamed from protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.cpp)42
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.h81
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h360
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h15
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h18
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h18
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h83
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h3
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h20
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h46
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h72
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h69
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h35
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h65
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp102
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h70
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h6
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h4
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h149
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp250
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h213
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h117
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h84
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorState.h166
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp11
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.h1508
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h32
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp93
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp535
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp322
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp428
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp87
39 files changed, 1651 insertions, 3940 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt
index c0c83025e5..a156384ce9 100644
--- a/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt
+++ b/protocols/Telegram/tdlib/td/tdactor/CMakeLists.txt
@@ -1,14 +1,20 @@
-cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
+if ((CMAKE_MAJOR_VERSION LESS 3) OR (CMAKE_VERSION VERSION_LESS "3.0.2"))
+ message(FATAL_ERROR "CMake >= 3.0.2 is required")
+endif()
+
+if (NOT DEFINED CMAKE_INSTALL_LIBDIR)
+ set(CMAKE_INSTALL_LIBDIR "lib")
+endif()
#SOURCE SETS
set(TDACTOR_SOURCE
- td/actor/impl/ConcurrentScheduler.cpp
+ td/actor/ConcurrentScheduler.cpp
td/actor/impl/Scheduler.cpp
td/actor/MultiPromise.cpp
- td/actor/Timeout.cpp
-
- td/actor/impl2/Scheduler.cpp
+ td/actor/MultiTimeout.cpp
+ td/actor/actor.h
+ td/actor/ConcurrentScheduler.h
td/actor/impl/Actor-decl.h
td/actor/impl/Actor.h
td/actor/impl/ActorId-decl.h
@@ -17,28 +23,19 @@ set(TDACTOR_SOURCE
td/actor/impl/ActorInfo.h
td/actor/impl/EventFull-decl.h
td/actor/impl/EventFull.h
- td/actor/impl/ConcurrentScheduler.h
td/actor/impl/Event.h
td/actor/impl/Scheduler-decl.h
td/actor/impl/Scheduler.h
- td/actor/Condition.h
td/actor/MultiPromise.h
+ td/actor/MultiTimeout.h
td/actor/PromiseFuture.h
td/actor/SchedulerLocalStorage.h
td/actor/SignalSlot.h
td/actor/SleepActor.h
td/actor/Timeout.h
- td/actor/actor.h
-
- td/actor/impl2/ActorLocker.h
- td/actor/impl2/ActorSignals.h
- td/actor/impl2/ActorState.h
- td/actor/impl2/Scheduler.h
- td/actor/impl2/SchedulerId.h
)
set(TDACTOR_TEST_SOURCE
- ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_impl2.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_main.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_simple.cpp
${CMAKE_CURRENT_SOURCE_DIR}/test/actors_workers.cpp
@@ -54,12 +51,12 @@ add_library(tdactor STATIC ${TDACTOR_SOURCE})
target_include_directories(tdactor PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>)
target_link_libraries(tdactor PUBLIC tdutils)
-add_executable(example example/example.cpp)
-target_link_libraries(example PRIVATE tdactor)
+if (NOT CMAKE_CROSSCOMPILING)
+ add_executable(example example/example.cpp)
+ target_link_libraries(example PRIVATE tdactor)
+endif()
install(TARGETS tdactor EXPORT TdTargets
- LIBRARY DESTINATION lib
- ARCHIVE DESTINATION lib
- RUNTIME DESTINATION bin
- INCLUDES DESTINATION include
+ LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
+ ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
)
diff --git a/protocols/Telegram/tdlib/td/tdactor/example/example.cpp b/protocols/Telegram/tdlib/td/tdactor/example/example.cpp
index 4c2415c5e2..8f182d8350 100644
--- a/protocols/Telegram/tdlib/td/tdactor/example/example.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/example/example.cpp
@@ -1,31 +1,33 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/logging.h"
+#include "td/utils/Time.h"
-class Worker : public td::Actor {
+class Worker final : public td::Actor {
public:
void ping(int x) {
- LOG(ERROR) << "got ping " << x;
+ LOG(ERROR) << "Got ping " << x;
}
};
-class MainActor : public td::Actor {
+class MainActor final : public td::Actor {
public:
- void start_up() override {
- LOG(ERROR) << "start up";
+ void start_up() final {
+ LOG(ERROR) << "Start up";
set_timeout_in(10);
worker_ = td::create_actor_on_scheduler<Worker>("Worker", 1);
send_closure(worker_, &Worker::ping, 123);
}
- void timeout_expired() override {
- LOG(ERROR) << "timeout expired";
+ void timeout_expired() final {
+ LOG(ERROR) << "Timeout expired";
td::Scheduler::instance()->finish();
}
@@ -33,17 +35,15 @@ class MainActor : public td::Actor {
td::ActorOwn<Worker> worker_;
};
-int main(void) {
- td::ConcurrentScheduler scheduler;
- scheduler.init(4 /*threads_count*/);
+int main() {
+ td::ConcurrentScheduler scheduler(4 /*thread_count*/, 0);
scheduler.start();
{
- auto guard = scheduler.get_current_guard();
+ auto guard = scheduler.get_main_guard();
td::create_actor_on_scheduler<MainActor>("Main actor", 0).release();
}
while (!scheduler.is_finished()) {
- scheduler.run_main(10);
+ scheduler.run_main(td::Timestamp::in(10));
}
scheduler.finish();
- return 0;
}
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.cpp
new file mode 100644
index 0000000000..9a3cf21338
--- /dev/null
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.cpp
@@ -0,0 +1,205 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/ConcurrentScheduler.h"
+
+#include "td/utils/ExitGuard.h"
+#include "td/utils/MpscPollableQueue.h"
+#include "td/utils/port/thread_local.h"
+#include "td/utils/ScopeGuard.h"
+
+#include <memory>
+
+namespace td {
+
+ConcurrentScheduler::ConcurrentScheduler(int32 additional_thread_count, uint64 thread_affinity_mask) {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ additional_thread_count = 0;
+#endif
+ additional_thread_count++;
+ std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(additional_thread_count);
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ for (int32 i = 0; i < additional_thread_count; i++) {
+ auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
+ queue->init();
+ outbound[i] = queue;
+ }
+ thread_affinity_mask_ = thread_affinity_mask;
+#endif
+
+ // +1 for extra scheduler for IOCP and send_closure from unrelated threads
+ // It will know about other schedulers
+ // Other schedulers will have no idea about its existence
+ extra_scheduler_ = 1;
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ extra_scheduler_ = 0;
+#endif
+
+ schedulers_.resize(additional_thread_count + extra_scheduler_);
+ for (int32 i = 0; i < additional_thread_count + extra_scheduler_; i++) {
+ auto &sched = schedulers_[i];
+ sched = make_unique<Scheduler>();
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ if (i >= additional_thread_count) {
+ auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
+ queue->init();
+ outbound.push_back(std::move(queue));
+ }
+#endif
+
+ sched->init(i, outbound, static_cast<Scheduler::Callback *>(this));
+ }
+
+#if TD_PORT_WINDOWS
+ iocp_ = make_unique<detail::Iocp>();
+ iocp_->init();
+#endif
+
+ state_ = State::Start;
+}
+
+void ConcurrentScheduler::test_one_thread_run() {
+ do {
+ for (auto &sched : schedulers_) {
+ sched->run(Timestamp::now_cached());
+ }
+ } while (!is_finished_.load(std::memory_order_relaxed));
+}
+
+#if !TD_THREAD_UNSUPPORTED
+thread::id ConcurrentScheduler::get_scheduler_thread_id(int32 sched_id) {
+ auto thread_pos = static_cast<size_t>(sched_id - 1);
+ CHECK(thread_pos < threads_.size());
+ return threads_[thread_pos].get_id();
+}
+#endif
+
+void ConcurrentScheduler::start() {
+ CHECK(state_ == State::Start);
+ is_finished_.store(false, std::memory_order_relaxed);
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ for (size_t i = 1; i + extra_scheduler_ < schedulers_.size(); i++) {
+ auto &sched = schedulers_[i];
+ threads_.push_back(td::thread([&, thread_affinity_mask = thread_affinity_mask_] {
+#if TD_PORT_WINDOWS
+ detail::Iocp::Guard iocp_guard(iocp_.get());
+#endif
+#if TD_HAVE_THREAD_AFFINITY
+ if (thread_affinity_mask != 0) {
+ thread::set_affinity_mask(this_thread::get_id(), thread_affinity_mask).ignore();
+ }
+#else
+ (void)thread_affinity_mask;
+#endif
+ while (!is_finished()) {
+ sched->run(Timestamp::in(10));
+ }
+ }));
+ }
+#if TD_PORT_WINDOWS
+ iocp_thread_ = td::thread([this] {
+ auto guard = this->get_send_guard();
+ this->iocp_->loop();
+ });
+#endif
+#endif
+
+ state_ = State::Run;
+}
+
+static TD_THREAD_LOCAL double emscripten_timeout;
+
+bool ConcurrentScheduler::run_main(Timestamp timeout) {
+ CHECK(state_ == State::Run);
+ // run main scheduler in same thread
+ auto &main_sched = schedulers_[0];
+ if (!is_finished()) {
+#if TD_PORT_WINDOWS
+ detail::Iocp::Guard iocp_guard(iocp_.get());
+#endif
+ main_sched->run(timeout);
+ }
+
+ // hack for emscripten
+ emscripten_timeout = get_main_timeout().at();
+
+ return !is_finished();
+}
+
+Timestamp ConcurrentScheduler::get_main_timeout() {
+ CHECK(state_ == State::Run);
+ return schedulers_[0]->get_timeout();
+}
+
+double ConcurrentScheduler::emscripten_get_main_timeout() {
+ return Timestamp::at(emscripten_timeout).in();
+}
+void ConcurrentScheduler::emscripten_clear_main_timeout() {
+ emscripten_timeout = 0;
+}
+
+void ConcurrentScheduler::finish() {
+ CHECK(state_ == State::Run);
+ if (!is_finished()) {
+ on_finish();
+ }
+#if TD_PORT_WINDOWS
+ SCOPE_EXIT {
+ iocp_->clear();
+ };
+ detail::Iocp::Guard iocp_guard(iocp_.get());
+#endif
+
+ if (ExitGuard::is_exited()) {
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ // prevent closing of schedulers from already killed by OS threads
+ for (auto &thread : threads_) {
+ thread.detach();
+ }
+#endif
+
+#if TD_PORT_WINDOWS
+ iocp_->interrupt_loop();
+ iocp_thread_.detach();
+#endif
+ return;
+ }
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ for (auto &thread : threads_) {
+ thread.join();
+ }
+ threads_.clear();
+#endif
+
+#if TD_PORT_WINDOWS
+ iocp_->interrupt_loop();
+ iocp_thread_.join();
+#endif
+
+ schedulers_.clear();
+ for (auto &f : at_finish_) {
+ f();
+ }
+ at_finish_.clear();
+
+ state_ = State::Start;
+}
+
+void ConcurrentScheduler::on_finish() {
+ is_finished_.store(true, std::memory_order_relaxed);
+ for (auto &it : schedulers_) {
+ it->wakeup();
+ }
+}
+
+void ConcurrentScheduler::register_at_finish(std::function<void()> f) {
+ std::lock_guard<std::mutex> lock(at_finish_mutex_);
+ at_finish_.push_back(std::move(f));
+}
+
+} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.h
index 1e9793eab4..3f574de5ef 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/ConcurrentScheduler.h
@@ -1,17 +1,21 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
-#include "td/actor/impl/Scheduler-decl.h"
+#include "td/actor/actor.h"
#include "td/utils/common.h"
-#include "td/utils/logging.h"
#include "td/utils/port/thread.h"
#include "td/utils/Slice.h"
+#include "td/utils/Time.h"
+
+#if TD_PORT_WINDOWS
+#include "td/utils/port/detail/Iocp.h"
+#endif
#include <atomic>
#include <functional>
@@ -20,34 +24,55 @@
namespace td {
-class ConcurrentScheduler : private Scheduler::Callback {
+class ConcurrentScheduler final : private Scheduler::Callback {
public:
- void init(int32 threads_n);
+ explicit ConcurrentScheduler(int32 additional_thread_count, uint64 thread_affinity_mask = 0);
void finish_async() {
schedulers_[0]->finish();
}
+
void wakeup() {
schedulers_[0]->wakeup();
}
- SchedulerGuard get_current_guard() {
+
+ SchedulerGuard get_main_guard() {
return schedulers_[0]->get_guard();
}
+ SchedulerGuard get_send_guard() {
+ return schedulers_.back()->get_const_guard();
+ }
+
void test_one_thread_run();
- bool is_finished() {
+ bool is_finished() const {
return is_finished_.load(std::memory_order_relaxed);
}
+#if TD_THREAD_UNSUPPORTED
+ int get_scheduler_thread_id(int32 sched_id) {
+ return 1;
+ }
+#else
+ thread::id get_scheduler_thread_id(int32 sched_id);
+#endif
+
void start();
- bool run_main(double timeout);
+ bool run_main(double timeout) {
+ return run_main(Timestamp::in(timeout));
+ }
+ bool run_main(Timestamp timeout);
+
+ Timestamp get_main_timeout();
+ static double emscripten_get_main_timeout();
+ static void emscripten_clear_main_timeout();
void finish();
template <class ActorT, class... Args>
- ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&... args) {
+ ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&...args) {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
sched_id = 0;
#endif
@@ -68,26 +93,24 @@ class ConcurrentScheduler : private Scheduler::Callback {
private:
enum class State { Start, Run };
- State state_;
- std::vector<unique_ptr<Scheduler>> schedulers_;
- std::atomic<bool> is_finished_;
+ State state_ = State::Start;
std::mutex at_finish_mutex_;
- std::vector<std::function<void()>> at_finish_;
+ vector<std::function<void()>> at_finish_; // can be used during destruction by Scheduler destructors
+ vector<unique_ptr<Scheduler>> schedulers_;
+ std::atomic<bool> is_finished_{false};
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- std::vector<thread> threads_;
+ vector<td::thread> threads_;
+ uint64 thread_affinity_mask_ = 0;
#endif
+#if TD_PORT_WINDOWS
+ unique_ptr<detail::Iocp> iocp_;
+ td::thread iocp_thread_;
+#endif
+ int32 extra_scheduler_ = 0;
- void on_finish() override {
- is_finished_.store(true, std::memory_order_relaxed);
- for (auto &it : schedulers_) {
- it->wakeup();
- }
- }
+ void on_finish() final;
- void register_at_finish(std::function<void()> f) override {
- std::lock_guard<std::mutex> lock(at_finish_mutex_);
- at_finish_.push_back(std::move(f));
- }
+ void register_at_finish(std::function<void()> f) final;
};
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/Condition.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/Condition.h
deleted file mode 100644
index c3799df487..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/Condition.h
+++ /dev/null
@@ -1,47 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/actor/actor.h"
-
-#include "td/utils/logging.h"
-
-namespace td {
-class Condition {
- class Helper : public Actor {
- public:
- void wait(Promise<> promise) {
- pending_promises_.push_back(std::move(promise));
- }
-
- private:
- std::vector<Promise<>> pending_promises_;
- void tear_down() override {
- for (auto &promise : pending_promises_) {
- promise.set_value(Unit());
- }
- }
- };
-
- public:
- Condition() {
- own_actor_ = create_actor<Helper>("helper");
- actor_ = own_actor_.get();
- }
- void wait(Promise<> promise) {
- send_closure(actor_, &Helper::wait, std::move(promise));
- }
- void set_true() {
- CHECK(!own_actor_.empty());
- own_actor_.reset();
- }
-
- private:
- ActorId<Helper> actor_;
- ActorOwn<Helper> own_actor_;
-};
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp
index 0d98f5cfb4..f78e0f0161 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.cpp
@@ -1,19 +1,23 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/MultiPromise.h"
+#include "td/utils/logging.h"
+
namespace td {
+
void MultiPromiseActor::add_promise(Promise<Unit> &&promise) {
promises_.emplace_back(std::move(promise));
+ LOG(DEBUG) << "Add promise #" << promises_.size() << " to " << name_;
}
Promise<Unit> MultiPromiseActor::get_promise() {
if (empty()) {
- register_actor("MultiPromise", this).release();
+ register_actor(name_, this).release();
}
CHECK(!promises_.empty());
@@ -23,11 +27,13 @@ Promise<Unit> MultiPromiseActor::get_promise() {
future.set_event(EventCreator::raw(actor_id(), nullptr));
futures_.emplace_back(std::move(future));
- return PromiseCreator::from_promise_actor(std::move(promise));
+ LOG(DEBUG) << "Get promise #" << futures_.size() << " for " << name_;
+ return create_promise_from_promise_actor(std::move(promise));
}
void MultiPromiseActor::raw_event(const Event::Raw &event) {
received_results_++;
+ LOG(DEBUG) << "Receive result #" << received_results_ << " out of " << futures_.size() << " for " << name_;
if (received_results_ == futures_.size()) {
if (!ignore_errors_) {
for (auto &future : futures_) {
@@ -46,13 +52,21 @@ void MultiPromiseActor::set_ignore_errors(bool ignore_errors) {
}
void MultiPromiseActor::set_result(Result<Unit> &&result) {
- // MultiPromiseActor should be cleared before he begins to send out result
+ result_ = std::move(result);
+ stop();
+}
+
+void MultiPromiseActor::tear_down() {
+ LOG(DEBUG) << "Set result for " << promises_.size() << " promises in " << name_;
+
+ // MultiPromiseActor should be cleared before it begins to send out result
auto promises_copy = std::move(promises_);
promises_.clear();
auto futures_copy = std::move(futures_);
futures_.clear();
received_results_ = 0;
- stop();
+ auto result = std::move(result_);
+ result_ = Unit();
if (!promises_copy.empty()) {
for (size_t i = 0; i + 1 < promises_copy.size(); i++) {
@@ -87,4 +101,5 @@ MultiPromiseActorSafe::~MultiPromiseActorSafe() {
register_existing_actor(std::move(multi_promise_)).release();
}
}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h
index aa28947464..73b24d5d1c 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiPromise.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,7 +10,7 @@
#include "td/actor/PromiseFuture.h"
#include "td/utils/common.h"
-#include "td/utils/logging.h"
+#include "td/utils/Promise.h"
#include "td/utils/Status.h"
namespace td {
@@ -20,7 +20,6 @@ class MultiPromiseInterface {
virtual void add_promise(Promise<> &&promise) = 0;
virtual Promise<> get_promise() = 0;
- // deprecated?
virtual size_t promise_count() const = 0;
virtual void set_ignore_errors(bool ignore_errors) = 0;
@@ -32,85 +31,90 @@ class MultiPromiseInterface {
virtual ~MultiPromiseInterface() = default;
};
-class MultiPromise : public MultiPromiseInterface {
+class MultiPromise final : public MultiPromiseInterface {
public:
- void add_promise(Promise<> &&promise) override {
+ void add_promise(Promise<> &&promise) final {
impl_->add_promise(std::move(promise));
}
- Promise<> get_promise() override {
+ Promise<> get_promise() final {
return impl_->get_promise();
}
- // deprecated?
- size_t promise_count() const override {
+ size_t promise_count() const final {
return impl_->promise_count();
}
- void set_ignore_errors(bool ignore_errors) override {
+ void set_ignore_errors(bool ignore_errors) final {
impl_->set_ignore_errors(ignore_errors);
}
MultiPromise() = default;
- explicit MultiPromise(std::unique_ptr<MultiPromiseInterface> impl) : impl_(std::move(impl)) {
+ explicit MultiPromise(unique_ptr<MultiPromiseInterface> impl) : impl_(std::move(impl)) {
}
private:
- std::unique_ptr<MultiPromiseInterface> impl_;
+ unique_ptr<MultiPromiseInterface> impl_;
};
class MultiPromiseActor final
: public Actor
, public MultiPromiseInterface {
public:
- MultiPromiseActor() = default;
+ explicit MultiPromiseActor(string name) : name_(std::move(name)) {
+ }
- void add_promise(Promise<Unit> &&promise) override;
+ void add_promise(Promise<Unit> &&promise) final;
- Promise<Unit> get_promise() override;
+ Promise<Unit> get_promise() final;
- void set_ignore_errors(bool ignore_errors) override;
+ void set_ignore_errors(bool ignore_errors) final;
- size_t promise_count() const override;
+ size_t promise_count() const final;
private:
void set_result(Result<Unit> &&result);
+ string name_;
vector<Promise<Unit>> promises_; // promises waiting for result
vector<FutureActor<Unit>> futures_; // futures waiting for result of the queries
size_t received_results_ = 0;
bool ignore_errors_ = false;
+ Result<Unit> result_;
+
+ void raw_event(const Event::Raw &event) final;
- void raw_event(const Event::Raw &event) override;
+ void tear_down() final;
- void on_start_migrate(int32) override {
+ void on_start_migrate(int32) final {
UNREACHABLE();
}
- void on_finish_migrate() override {
+ void on_finish_migrate() final {
UNREACHABLE();
}
};
-class MultiPromiseActorSafe : public MultiPromiseInterface {
+template <>
+class ActorTraits<MultiPromiseActor> {
+ public:
+ static constexpr bool need_context = false;
+ static constexpr bool need_start_up = true;
+};
+
+class MultiPromiseActorSafe final : public MultiPromiseInterface {
public:
- void add_promise(Promise<Unit> &&promise) override;
- Promise<Unit> get_promise() override;
- void set_ignore_errors(bool ignore_errors) override;
- size_t promise_count() const override;
- MultiPromiseActorSafe() = default;
+ void add_promise(Promise<Unit> &&promise) final;
+ Promise<Unit> get_promise() final;
+ void set_ignore_errors(bool ignore_errors) final;
+ size_t promise_count() const final;
+ explicit MultiPromiseActorSafe(string name) : multi_promise_(td::make_unique<MultiPromiseActor>(std::move(name))) {
+ }
MultiPromiseActorSafe(const MultiPromiseActorSafe &other) = delete;
MultiPromiseActorSafe &operator=(const MultiPromiseActorSafe &other) = delete;
MultiPromiseActorSafe(MultiPromiseActorSafe &&other) = delete;
MultiPromiseActorSafe &operator=(MultiPromiseActorSafe &&other) = delete;
- ~MultiPromiseActorSafe() override;
+ ~MultiPromiseActorSafe() final;
private:
- std::unique_ptr<MultiPromiseActor> multi_promise_ = std::make_unique<MultiPromiseActor>();
-};
-
-class MultiPromiseCreator {
- public:
- static MultiPromise create() {
- return MultiPromise(std::make_unique<MultiPromiseActor>());
- }
+ unique_ptr<MultiPromiseActor> multi_promise_;
};
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.cpp
index fa2e5ffff3..8c3dcb942f 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.cpp
@@ -1,21 +1,21 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/actor/Timeout.h"
+#include "td/actor/MultiTimeout.h"
-#include "td/utils/Time.h"
+#include "td/utils/logging.h"
namespace td {
bool MultiTimeout::has_timeout(int64 key) const {
- return items_.find(Item(key)) != items_.end();
+ return items_.count(Item(key)) > 0;
}
void MultiTimeout::set_timeout_at(int64 key, double timeout) {
- LOG(DEBUG) << "Set timeout for " << key << " in " << timeout - Time::now();
+ LOG(DEBUG) << "Set " << get_name() << " for " << key << " in " << timeout - Time::now();
auto item = items_.emplace(key);
auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first));
if (heap_node->in_heap()) {
@@ -35,7 +35,7 @@ void MultiTimeout::set_timeout_at(int64 key, double timeout) {
}
void MultiTimeout::add_timeout_at(int64 key, double timeout) {
- LOG(DEBUG) << "Add timeout for " << key << " in " << timeout - Time::now();
+ LOG(DEBUG) << "Add " << get_name() << " for " << key << " in " << timeout - Time::now();
auto item = items_.emplace(key);
auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first));
if (heap_node->in_heap()) {
@@ -50,7 +50,7 @@ void MultiTimeout::add_timeout_at(int64 key, double timeout) {
}
void MultiTimeout::cancel_timeout(int64 key) {
- LOG(DEBUG) << "Cancel timeout for " << key;
+ LOG(DEBUG) << "Cancel " << get_name() << " for " << key;
auto item = items_.find(Item(key));
if (item != items_.end()) {
auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item));
@@ -67,30 +67,44 @@ void MultiTimeout::cancel_timeout(int64 key) {
void MultiTimeout::update_timeout() {
if (items_.empty()) {
- LOG(DEBUG) << "Cancel timeout";
+ LOG(DEBUG) << "Cancel timeout of " << get_name();
CHECK(timeout_queue_.empty());
CHECK(Actor::has_timeout());
Actor::cancel_timeout();
} else {
- LOG(DEBUG) << "Set timeout in " << timeout_queue_.top_key() - Time::now_cached();
+ LOG(DEBUG) << "Set timeout of " << get_name() << " in " << timeout_queue_.top_key() - Time::now_cached();
Actor::set_timeout_at(timeout_queue_.top_key());
}
}
-void MultiTimeout::timeout_expired() {
- double now = Time::now_cached();
+vector<int64> MultiTimeout::get_expired_keys(double now) {
+ vector<int64> expired_keys;
while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
int64 key = static_cast<Item *>(timeout_queue_.pop())->key;
items_.erase(Item(key));
- expired_.push_back(key);
+ expired_keys.push_back(key);
}
+ return expired_keys;
+}
+
+void MultiTimeout::timeout_expired() {
+ vector<int64> expired_keys = get_expired_keys(Time::now_cached());
if (!items_.empty()) {
update_timeout();
}
- for (auto key : expired_) {
+ for (auto key : expired_keys) {
+ callback_(data_, key);
+ }
+}
+
+void MultiTimeout::run_all() {
+ vector<int64> expired_keys = get_expired_keys(Time::now_cached() + 1e10);
+ if (!expired_keys.empty()) {
+ update_timeout();
+ }
+ for (auto key : expired_keys) {
callback_(data_, key);
}
- expired_.clear();
}
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.h
new file mode 100644
index 0000000000..64803d346d
--- /dev/null
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/MultiTimeout.h
@@ -0,0 +1,81 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+
+#include "td/utils/common.h"
+#include "td/utils/Heap.h"
+#include "td/utils/Slice.h"
+#include "td/utils/Time.h"
+
+#include <set>
+
+namespace td {
+
+// TODO optimize
+class MultiTimeout final : public Actor {
+ struct Item final : public HeapNode {
+ int64 key;
+
+ explicit Item(int64 key) : key(key) {
+ }
+
+ bool operator<(const Item &other) const {
+ return key < other.key;
+ }
+ };
+
+ public:
+ using Data = void *;
+ using Callback = void (*)(Data, int64);
+ explicit MultiTimeout(Slice name) {
+ register_actor(name, this).release();
+ }
+
+ void set_callback(Callback callback) {
+ callback_ = callback;
+ }
+ void set_callback_data(Data data) {
+ data_ = data;
+ }
+
+ bool has_timeout(int64 key) const;
+
+ void set_timeout_in(int64 key, double timeout) {
+ set_timeout_at(key, Time::now() + timeout);
+ }
+
+ void add_timeout_in(int64 key, double timeout) {
+ add_timeout_at(key, Time::now() + timeout);
+ }
+
+ void set_timeout_at(int64 key, double timeout);
+
+ void add_timeout_at(int64 key, double timeout); // memcache semantics, doesn't replace old timeout
+
+ void cancel_timeout(int64 key);
+
+ void run_all();
+
+ private:
+ friend class Scheduler;
+
+ Callback callback_;
+ Data data_;
+
+ KHeap<double> timeout_queue_;
+ std::set<Item> items_;
+
+ void update_timeout();
+
+ void timeout_expired() final;
+
+ vector<int64> get_expired_keys(double now);
+};
+
+} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h
index 63156c3838..5ddc6e9848 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/PromiseFuture.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,174 +10,23 @@
#include "td/utils/Closure.h"
#include "td/utils/common.h"
-#include "td/utils/invoke.h" // for tuple_for_each
-#include "td/utils/logging.h"
+#include "td/utils/invoke.h"
+#include "td/utils/Promise.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Status.h"
#include <tuple>
-#include <type_traits>
#include <utility>
namespace td {
-template <class T = Unit>
-class PromiseInterface {
- public:
- PromiseInterface() = default;
- PromiseInterface(const PromiseInterface &) = delete;
- PromiseInterface &operator=(const PromiseInterface &) = delete;
- PromiseInterface(PromiseInterface &&) = default;
- PromiseInterface &operator=(PromiseInterface &&) = default;
- virtual ~PromiseInterface() = default;
- virtual void set_value(T &&value) {
- set_result(std::move(value));
- }
- virtual void set_error(Status &&error) {
- set_result(std::move(error));
- }
- virtual void set_result(Result<T> &&result) {
- if (result.is_ok()) {
- set_value(result.move_as_ok());
- } else {
- set_error(result.move_as_error());
- }
- }
- virtual void start_migrate(int32 sched_id) {
- }
- virtual void finish_migrate() {
- }
-};
-
-template <class T = Unit>
-class FutureInterface {
- public:
- FutureInterface() = default;
- FutureInterface(const FutureInterface &) = delete;
- FutureInterface &operator=(const FutureInterface &) = delete;
- FutureInterface(FutureInterface &&) = default;
- FutureInterface &operator=(FutureInterface &&) = default;
- virtual ~FutureInterface() = default;
- virtual bool is_ready() = 0;
- virtual bool is_ok() = 0;
- virtual bool is_error() = 0;
- virtual const T &ok() = 0;
- virtual T move_as_ok() = 0;
- virtual const Status &error() = 0;
- virtual Status move_as_error() TD_WARN_UNUSED_RESULT = 0;
- virtual const Result<T> &result() = 0;
- virtual Result<T> move_as_result() TD_WARN_UNUSED_RESULT = 0;
-};
-
-template <class T>
-class SafePromise;
-
-template <class T = Unit>
-class Promise {
- public:
- void set_value(T &&value) {
- if (!promise_) {
- return;
- }
- promise_->set_value(std::move(value));
- promise_.reset();
- }
- void set_error(Status &&error) {
- if (!promise_) {
- return;
- }
- promise_->set_error(std::move(error));
- promise_.reset();
- }
- void set_result(Result<T> &&result) {
- if (!promise_) {
- return;
- }
- promise_->set_result(std::move(result));
- promise_.reset();
- }
- void reset() {
- promise_.reset();
- }
- void start_migrate(int32 sched_id) {
- if (!promise_) {
- return;
- }
- promise_->start_migrate(sched_id);
- }
- void finish_migrate() {
- if (!promise_) {
- return;
- }
- promise_->finish_migrate();
- }
- std::unique_ptr<PromiseInterface<T>> release() {
- return std::move(promise_);
- }
-
- Promise() = default;
- explicit Promise(std::unique_ptr<PromiseInterface<T>> promise) : promise_(std::move(promise)) {
- }
- Promise(SafePromise<T> &&other);
- Promise &operator=(SafePromise<T> &&other);
-
- explicit operator bool() {
- return static_cast<bool>(promise_);
- }
-
- private:
- std::unique_ptr<PromiseInterface<T>> promise_;
-};
-
-template <class T>
-void start_migrate(Promise<T> &promise, int32 sched_id) {
- // promise.start_migrate(sched_id);
-}
-template <class T>
-void finish_migrate(Promise<T> &promise) {
- // promise.finish_migrate();
-}
-
-template <class T = Unit>
-class SafePromise {
- public:
- SafePromise(Promise<T> promise, Result<T> result) : promise_(std::move(promise)), result_(std::move(result)) {
- }
- SafePromise(const SafePromise &other) = delete;
- SafePromise &operator=(const SafePromise &other) = delete;
- SafePromise(SafePromise &&other) = default;
- SafePromise &operator=(SafePromise &&other) = default;
- ~SafePromise() {
- if (promise_) {
- promise_.set_result(std::move(result_));
- }
- }
- Promise<T> release() {
- return std::move(promise_);
- }
-
- private:
- Promise<T> promise_;
- Result<T> result_;
-};
-
-template <class T>
-Promise<T>::Promise(SafePromise<T> &&other) : Promise(other.release()) {
-}
-template <class T>
-Promise<T> &Promise<T>::operator=(SafePromise<T> &&other) {
- *this = other.release();
- return *this;
-}
-
namespace detail {
-
-class EventPromise : public PromiseInterface<Unit> {
+class EventPromise final : public PromiseInterface<Unit> {
public:
- void set_value(Unit &&) override {
+ void set_value(Unit &&) final {
ok_.try_emit();
fail_.clear();
}
- void set_error(Status &&) override {
+ void set_error(Status &&) final {
do_set_error();
}
@@ -185,7 +34,7 @@ class EventPromise : public PromiseInterface<Unit> {
EventPromise &operator=(const EventPromise &other) = delete;
EventPromise(EventPromise &&other) = delete;
EventPromise &operator=(EventPromise &&other) = delete;
- ~EventPromise() override {
+ ~EventPromise() final {
do_set_error();
}
@@ -209,109 +58,23 @@ class EventPromise : public PromiseInterface<Unit> {
}
};
-template <typename T>
-struct GetArg : public GetArg<decltype(&T::operator())> {};
-
-template <class C, class R, class Arg>
-class GetArg<R (C::*)(Arg)> {
- public:
- using type = Arg;
-};
-template <class C, class R, class Arg>
-class GetArg<R (C::*)(Arg) const> {
+class SendClosure {
public:
- using type = Arg;
-};
-
-template <class T>
-using get_arg_t = std::decay_t<typename GetArg<T>::type>;
-
-template <class T>
-struct DropResult {
- using type = T;
-};
-
-template <class T>
-struct DropResult<Result<T>> {
- using type = T;
-};
-
-template <class T>
-using drop_result_t = typename DropResult<T>::type;
-
-template <class ValueT, class FunctionOkT, class FunctionFailT>
-class LambdaPromise : public PromiseInterface<ValueT> {
- enum OnFail { None, Ok, Fail };
-
- public:
- void set_value(ValueT &&value) override {
- ok_(std::move(value));
- on_fail_ = None;
- }
- void set_error(Status &&error) override {
- do_error(std::move(error));
- }
- LambdaPromise(const LambdaPromise &other) = delete;
- LambdaPromise &operator=(const LambdaPromise &other) = delete;
- LambdaPromise(LambdaPromise &&other) = delete;
- LambdaPromise &operator=(LambdaPromise &&other) = delete;
- ~LambdaPromise() override {
- do_error(Status::Error("Lost promise"));
- }
-
- template <class FromOkT, class FromFailT>
- LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail)
- : ok_(std::forward<FromOkT>(ok)), fail_(std::forward<FromFailT>(fail)), on_fail_(use_ok_as_fail ? Ok : Fail) {
- }
-
- private:
- FunctionOkT ok_;
- FunctionFailT fail_;
- OnFail on_fail_ = None;
-
- template <class FuncT, class ArgT = detail::get_arg_t<FuncT>>
- std::enable_if_t<std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) {
- func(std::move(status));
- }
-
- template <class FuncT, class ArgT = detail::get_arg_t<FuncT>>
- std::enable_if_t<!std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) {
- func(Auto());
- }
-
- void do_error(Status &&error) {
- switch (on_fail_) {
- case None:
- break;
- case Ok:
- do_error_impl(ok_, std::move(error));
- break;
- case Fail:
- fail_(std::move(error));
- break;
- }
- on_fail_ = None;
+ template <class... ArgsT>
+ void operator()(ArgsT &&...args) const {
+ send_closure(std::forward<ArgsT>(args)...);
}
};
+} // namespace detail
-template <class... ArgsT>
-class JoinPromise : public PromiseInterface<Unit> {
- public:
- explicit JoinPromise(ArgsT &&... arg) : promises_(std::forward<ArgsT>(arg)...) {
- }
- void set_value(Unit &&) override {
- tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); });
- }
- void set_error(Status &&error) override {
- tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); });
- }
+inline Promise<Unit> create_event_promise(EventFull &&ok) {
+ return Promise<Unit>(td::make_unique<detail::EventPromise>(std::move(ok)));
+}
- private:
- std::tuple<std::decay_t<ArgsT>...> promises_;
-};
-} // namespace detail
+inline Promise<Unit> create_event_promise(EventFull ok, EventFull fail) {
+ return Promise<Unit>(td::make_unique<detail::EventPromise>(std::move(ok), std::move(fail)));
+}
-/*** FutureActor and PromiseActor ***/
template <class T>
class FutureActor;
@@ -321,7 +84,8 @@ class PromiseActor;
template <class T>
class ActorTraits<FutureActor<T>> {
public:
- static constexpr bool is_lite = true;
+ static constexpr bool need_context = false;
+ static constexpr bool need_start_up = false;
};
template <class T>
@@ -335,12 +99,12 @@ class PromiseActor final : public PromiseInterface<T> {
PromiseActor &operator=(const PromiseActor &other) = delete;
PromiseActor(PromiseActor &&) = default;
PromiseActor &operator=(PromiseActor &&) = default;
- ~PromiseActor() override {
+ ~PromiseActor() final {
close();
}
- void set_value(T &&value) override;
- void set_error(Status &&error) override;
+ void set_value(T &&value) final;
+ void set_error(Status &&error) final;
void close() {
future_id_.reset();
@@ -373,7 +137,7 @@ class PromiseActor final : public PromiseInterface<T> {
private:
ActorOwn<FutureActor<T>> future_id_;
EventFull event_;
- State state_;
+ State state_ = State::Hangup;
void init() {
state_ = State::Waiting;
@@ -384,9 +148,12 @@ class PromiseActor final : public PromiseInterface<T> {
template <class T>
class FutureActor final : public Actor {
friend class PromiseActor<T>;
- enum State { Waiting, Ready };
public:
+ enum State { Waiting, Ready };
+
+ static constexpr int HANGUP_ERROR_CODE = 426487;
+
FutureActor() = default;
FutureActor(const FutureActor &other) = delete;
@@ -395,7 +162,7 @@ class FutureActor final : public Actor {
FutureActor(FutureActor &&other) = default;
FutureActor &operator=(FutureActor &&other) = default;
- ~FutureActor() override = default;
+ ~FutureActor() final = default;
bool is_ok() const {
return is_ready() && result_.is_ok();
@@ -435,13 +202,17 @@ class FutureActor final : public Actor {
}
}
+ State get_state() const {
+ return state_;
+ }
+
template <class S>
friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future);
private:
EventFull event_;
- Result<T> result_;
- State state_;
+ Result<T> result_ = Status::Error(500, "Empty FutureActor");
+ State state_ = State::Waiting;
void set_value(T &&value) {
set_result(std::move(value));
@@ -459,11 +230,11 @@ class FutureActor final : public Actor {
event_.try_emit_later();
}
- void hangup() override {
- set_error(Status::Hangup());
+ void hangup() final {
+ set_error(Status::Error<HANGUP_ERROR_CODE>());
}
- void start_up() override {
+ void start_up() final {
// empty
}
@@ -520,51 +291,26 @@ class PromiseFuture {
FutureActor<T> future_;
};
-template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
-FutureActor<T> send_promise(ActorId<ActorAT> actor_id, Send::Flags flags,
- ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&... args) {
+template <ActorSendType send_type, class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT,
+ class... ArgsT>
+FutureActor<T> send_promise(ActorId<ActorAT> actor_id, ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...),
+ ArgsT &&...args) {
PromiseFuture<T> pf;
- ::td::Scheduler::instance()->send_closure(
- std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...), flags);
+ Scheduler::instance()->send_closure<send_type>(
+ std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...));
return pf.move_future();
}
-class PromiseCreator {
- public:
- struct Ignore {
- void operator()(Status &&error) {
- error.ignore();
- }
+template <class... ArgsT>
+auto promise_send_closure(ArgsT &&...args) {
+ return [t = std::make_tuple(std::forward<ArgsT>(args)...)](auto &&res) mutable {
+ call_tuple(detail::SendClosure(), std::tuple_cat(std::move(t), std::make_tuple(std::forward<decltype(res)>(res))));
};
+}
- template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>>
- static Promise<ArgT> lambda(OkT &&ok) {
- return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, Ignore>>(std::forward<OkT>(ok),
- Ignore(), true));
- }
-
- template <class OkT, class FailT, class ArgT = detail::get_arg_t<OkT>>
- static Promise<ArgT> lambda(OkT &&ok, FailT &&fail) {
- return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, std::decay_t<FailT>>>(
- std::forward<OkT>(ok), std::forward<FailT>(fail), false));
- }
-
- static Promise<> event(EventFull &&ok) {
- return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok)));
- }
-
- static Promise<> event(EventFull ok, EventFull fail) {
- return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok), std::move(fail)));
- }
-
- template <class... ArgsT>
- static Promise<> join(ArgsT &&... args) {
- return Promise<>(std::make_unique<detail::JoinPromise<ArgsT...>>(std::forward<ArgsT>(args)...));
- }
+template <class T>
+Promise<T> create_promise_from_promise_actor(PromiseActor<T> &&from) {
+ return Promise<T>(td::make_unique<PromiseActor<T>>(std::move(from)));
+}
- template <class T>
- static Promise<T> from_promise_actor(PromiseActor<T> &&from) {
- return Promise<T>(std::make_unique<PromiseActor<T>>(std::move(from)));
- }
-};
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h
index f505836a16..b89a283f74 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,12 +8,13 @@
#include "td/actor/actor.h"
-#include "td/utils/logging.h"
+#include "td/utils/common.h"
#include "td/utils/optional.h"
#include <functional>
namespace td {
+
template <class T>
class SchedulerLocalStorage {
public:
@@ -45,6 +46,16 @@ class LazySchedulerLocalStorage {
LazySchedulerLocalStorage() = default;
explicit LazySchedulerLocalStorage(std::function<T()> create_func) : create_func_(std::move(create_func)) {
}
+ void set_create_func(std::function<T()> create_func) {
+ CHECK(!create_func_);
+ create_func_ = create_func;
+ }
+
+ void set(T &&t) {
+ auto &optional_value_ = sls_optional_value_.get();
+ CHECK(!optional_value_);
+ optional_value_ = std::move(t);
+ }
T &get() {
auto &optional_value_ = sls_optional_value_.get();
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h
index 73b48f58ed..e1fd36323a 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/SignalSlot.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -9,17 +9,20 @@
#include "td/actor/actor.h"
namespace td {
+
class Slot;
+
class Signal {
public:
void emit();
- explicit Signal(ActorId<Slot> slot_id) : slot_id_(slot_id) {
+ explicit Signal(ActorId<Slot> slot_id) : slot_id_(std::move(slot_id)) {
}
private:
ActorId<Slot> slot_id_;
};
+
class Slot final : public Actor {
public:
Slot() = default;
@@ -27,7 +30,7 @@ class Slot final : public Actor {
Slot &operator=(const Slot &other) = delete;
Slot(Slot &&) = default;
Slot &operator=(Slot &&) = default;
- ~Slot() override {
+ ~Slot() final {
close();
}
void set_event(EventFull &&event) {
@@ -69,18 +72,18 @@ class Slot final : public Actor {
}
ActorShared<> get_signal_new() {
register_if_empty();
- return actor_shared();
+ return actor_shared(this);
}
private:
bool was_signal_ = false;
EventFull event_;
- void timeout_expired() override {
+ void timeout_expired() final {
signal();
}
- void start_up() override {
+ void start_up() final {
empty();
}
@@ -97,10 +100,11 @@ class Slot final : public Actor {
event_.try_emit_later();
}
}
- void hangup_shared() override {
+ void hangup_shared() final {
signal();
}
};
+
inline void Signal::emit() {
send_closure(slot_id_, &Slot::signal);
}
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h
index 9b9981ec38..8682ab0df5 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/SleepActor.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,11 +8,12 @@
#include "td/actor/actor.h"
-#include "td/actor/PromiseFuture.h"
+#include "td/utils/common.h"
+#include "td/utils/Promise.h"
namespace td {
-class SleepActor : public Actor {
+class SleepActor final : public Actor {
public:
SleepActor(double timeout, Promise<> promise) : timeout_(timeout), promise_(std::move(promise)) {
}
@@ -21,13 +22,20 @@ class SleepActor : public Actor {
double timeout_;
Promise<> promise_;
- void start_up() override {
+ void start_up() final {
set_timeout_in(timeout_);
}
- void timeout_expired() override {
+ void timeout_expired() final {
promise_.set_value(Unit());
stop();
}
};
+template <>
+class ActorTraits<SleepActor> {
+ public:
+ static constexpr bool need_context = false;
+ static constexpr bool need_start_up = true;
+};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h
index a3a9ba1913..cde72657b8 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/Timeout.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,13 +8,10 @@
#include "td/actor/actor.h"
-#include "td/utils/Heap.h"
-#include "td/utils/logging.h"
-#include "td/utils/Time.h"
-
-#include <set>
+#include "td/utils/common.h"
namespace td {
+
class Timeout final : public Actor {
public:
using Data = void *;
@@ -33,9 +30,15 @@ class Timeout final : public Actor {
bool has_timeout() const {
return Actor::has_timeout();
}
+ double get_timeout() const {
+ return Actor::get_timeout();
+ }
void set_timeout_in(double timeout) {
Actor::set_timeout_in(timeout);
}
+ void set_timeout_at(double timeout) {
+ Actor::set_timeout_at(timeout);
+ }
void cancel_timeout() {
if (has_timeout()) {
Actor::cancel_timeout();
@@ -47,14 +50,10 @@ class Timeout final : public Actor {
private:
friend class Scheduler;
- Callback callback_;
- Data data_;
+ Callback callback_{};
+ Data data_{};
- void set_timeout_at(double timeout) {
- Actor::set_timeout_at(timeout);
- }
-
- void timeout_expired() override {
+ void timeout_expired() final {
CHECK(!has_timeout());
CHECK(callback_ != Callback());
Callback callback = callback_;
@@ -66,62 +65,4 @@ class Timeout final : public Actor {
}
};
-// TODO optimize
-class MultiTimeout final : public Actor {
- struct Item : public HeapNode {
- int64 key;
-
- explicit Item(int64 key) : key(key) {
- }
-
- bool operator<(const Item &other) const {
- return key < other.key;
- }
- };
-
- public:
- using Data = void *;
- using Callback = void (*)(Data, int64);
- MultiTimeout() {
- register_actor("MultiTimeout", this).release();
- }
-
- void set_callback(Callback callback) {
- callback_ = callback;
- }
- void set_callback_data(Data data) {
- data_ = data;
- }
-
- bool has_timeout(int64 key) const;
-
- void set_timeout_in(int64 key, double timeout) {
- set_timeout_at(key, Time::now() + timeout);
- }
-
- void add_timeout_in(int64 key, double timeout) {
- add_timeout_at(key, Time::now() + timeout);
- }
-
- void set_timeout_at(int64 key, double timeout);
-
- void add_timeout_at(int64 key, double timeout); // memcache semantics, doesn't replace old timeout
-
- void cancel_timeout(int64 key);
-
- private:
- friend class Scheduler;
-
- Callback callback_;
- Data data_;
-
- KHeap<double> timeout_queue_;
- std::set<Item> items_;
- std::vector<int64> expired_;
-
- void update_timeout();
-
- void timeout_expired() override;
-};
-
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h
index dadfadc055..0aed51710c 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/actor.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -9,6 +9,5 @@
#include "td/actor/impl/Actor.h"
#include "td/actor/impl/ActorId.h"
#include "td/actor/impl/ActorInfo.h"
-#include "td/actor/impl/ConcurrentScheduler.h"
#include "td/actor/impl/EventFull.h"
#include "td/actor/impl/Scheduler.h"
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
index 4342214800..b0e75bd21c 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -24,8 +24,8 @@ class Actor : public ObserverBase {
Actor() = default;
Actor(const Actor &) = delete;
Actor &operator=(const Actor &) = delete;
- Actor(Actor &&other);
- Actor &operator=(Actor &&other);
+ Actor(Actor &&other) noexcept;
+ Actor &operator=(Actor &&other) noexcept;
~Actor() override {
if (!empty()) {
do_stop();
@@ -67,6 +67,7 @@ class Actor : public ObserverBase {
void stop();
void do_stop();
bool has_timeout() const;
+ double get_timeout() const;
void set_timeout_in(double timeout_in);
void set_timeout_at(double timeout_at);
void cancel_timeout();
@@ -74,8 +75,9 @@ class Actor : public ObserverBase {
void do_migrate(int32 sched_id);
uint64 get_link_token();
- void set_context(std::shared_ptr<ActorContext> context);
- void set_tag(CSlice tag);
+ std::weak_ptr<ActorContext> get_context_weak_ptr() const;
+ std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context);
+ string set_tag(string tag);
void always_wait_for_mailbox();
@@ -88,10 +90,10 @@ class Actor : public ObserverBase {
bool empty() const;
template <class FuncT, class... ArgsT>
- auto self_closure(FuncT &&func, ArgsT &&... args);
+ auto self_closure(FuncT &&func, ArgsT &&...args);
template <class SelfT, class FuncT, class... ArgsT>
- auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args);
+ auto self_closure(SelfT *self, FuncT &&func, ArgsT &&...args);
template <class LambdaT>
auto self_lambda(LambdaT &&lambda);
@@ -101,7 +103,6 @@ class Actor : public ObserverBase {
template <class SelfT>
ActorId<SelfT> actor_id(SelfT *self);
- ActorShared<> actor_shared();
template <class SelfT>
ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1));
@@ -114,7 +115,8 @@ class Actor : public ObserverBase {
template <class ActorT>
class ActorTraits {
public:
- static constexpr bool is_lite = false;
+ static constexpr bool need_context = true;
+ static constexpr bool need_start_up = true;
};
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h
index 3fe5e20abf..d190a2158e 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Actor.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,7 +10,7 @@
#include "td/actor/impl/EventFull-decl.h"
#include "td/actor/impl/Scheduler-decl.h"
-#include "td/utils/logging.h"
+#include "td/utils/common.h"
#include "td/utils/ObjectPool.h"
#include "td/utils/Slice.h"
@@ -19,14 +19,15 @@
#include <utility>
namespace td {
-inline Actor::Actor(Actor &&other) {
+
+inline Actor::Actor(Actor &&other) noexcept {
CHECK(info_.empty());
info_ = std::move(other.info_);
if (!empty()) {
info_->on_actor_moved(this);
}
}
-inline Actor &Actor::operator=(Actor &&other) {
+inline Actor &Actor::operator=(Actor &&other) noexcept {
CHECK(info_.empty());
info_ = std::move(other.info_);
if (!empty()) {
@@ -51,7 +52,10 @@ inline void Actor::do_stop() {
CHECK(empty());
}
inline bool Actor::has_timeout() const {
- return Scheduler::instance()->has_actor_timeout(this);
+ return get_info()->get_heap_node()->in_heap();
+}
+inline double Actor::get_timeout() const {
+ return Scheduler::instance()->get_actor_timeout(this);
}
inline void Actor::set_timeout_in(double timeout_in) {
Scheduler::instance()->set_actor_timeout_in(this, timeout_in);
@@ -75,32 +79,48 @@ std::enable_if_t<std::is_base_of<Actor, ActorType>::value> start_migrate(ActorTy
Scheduler::instance()->start_migrate_actor(&obj, sched_id);
}
}
+
template <class ActorType>
std::enable_if_t<std::is_base_of<Actor, ActorType>::value> finish_migrate(ActorType &obj) {
if (!obj.empty()) {
Scheduler::instance()->finish_migrate_actor(&obj);
}
}
+
inline uint64 Actor::get_link_token() {
return Scheduler::instance()->get_link_token(this);
}
-inline void Actor::set_context(std::shared_ptr<ActorContext> context) {
- info_->set_context(std::move(context));
+
+inline std::weak_ptr<ActorContext> Actor::get_context_weak_ptr() const {
+ return info_->get_context_weak_ptr();
+}
+
+inline std::shared_ptr<ActorContext> Actor::set_context(std::shared_ptr<ActorContext> context) {
+ return info_->set_context(std::move(context));
}
-inline void Actor::set_tag(CSlice tag) {
- info_->get_context()->tag_ = tag.c_str();
+
+inline string Actor::set_tag(string tag) {
+ auto *ctx = info_->get_context();
+ string old_tag;
+ if (ctx->tag_) {
+ old_tag = ctx->tag_;
+ }
+ ctx->set_tag(std::move(tag));
Scheduler::on_context_updated();
+ return old_tag;
}
inline void Actor::init(ObjectPool<ActorInfo>::OwnerPtr &&info) {
info_ = std::move(info);
}
+
inline ActorInfo *Actor::get_info() {
return &*info_;
}
inline const ActorInfo *Actor::get_info() const {
return &*info_;
}
+
inline ObjectPool<ActorInfo>::OwnerPtr Actor::clear() {
return std::move(info_);
}
@@ -118,22 +138,20 @@ ActorId<SelfT> Actor::actor_id(SelfT *self) {
return ActorId<SelfT>(info_.get_weak());
}
-inline ActorShared<> Actor::actor_shared() {
- return actor_shared(this);
-}
template <class SelfT>
ActorShared<SelfT> Actor::actor_shared(SelfT *self, uint64 id) {
CHECK(static_cast<Actor *>(self) == this);
+ CHECK(id != 0);
return ActorShared<SelfT>(actor_id(self), id);
}
template <class FuncT, class... ArgsT>
-auto Actor::self_closure(FuncT &&func, ArgsT &&... args) {
+auto Actor::self_closure(FuncT &&func, ArgsT &&...args) {
return self_closure(this, std::forward<FuncT>(func), std::forward<ArgsT>(args)...);
}
template <class SelfT, class FuncT, class... ArgsT>
-auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&... args) {
+auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&...args) {
return EventCreator::closure(actor_id(self), std::forward<FuncT>(func), std::forward<ArgsT>(args)...);
}
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h
index 5e82ed6a05..2b60c72472 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -12,8 +12,10 @@
#include <type_traits>
namespace td {
-class ActorInfo;
+
class Actor;
+class ActorInfo;
+
template <class ActorType = Actor>
class ActorId {
public:
@@ -21,12 +23,12 @@ class ActorId {
explicit ActorId(ObjectPool<ActorInfo>::WeakPtr ptr) : ptr_(ptr) {
}
ActorId() = default;
- ActorId(const ActorId &) = default;
- ActorId &operator=(const ActorId &) = default;
- ActorId(ActorId &&other) : ptr_(other.ptr_) {
- other.ptr_.clear();
+ ActorId(const ActorId &other) = default;
+ ActorId &operator=(const ActorId &other) = default;
+ ActorId(ActorId &&other) noexcept : ptr_(other.ptr_) {
+ other.clear();
}
- ActorId &operator=(ActorId &&other) {
+ ActorId &operator=(ActorId &&other) noexcept {
if (&other == this) {
return *this;
}
@@ -48,10 +50,8 @@ class ActorId {
}
ActorInfo *get_actor_info() const;
- ActorType *get_actor_unsafe() const;
- // returns pointer to actor if it is on current thread. nullptr otherwise
- ActorType *try_get_actor() const;
+ ActorType *get_actor_unsafe() const;
Slice get_name() const;
@@ -60,33 +60,27 @@ class ActorId {
return ActorId<ToActorType>(ptr_);
}
- template <class AsActorType>
- ActorId<AsActorType> as() const {
- return ActorId<AsActorType>(ptr_);
- }
-
private:
ObjectPool<ActorInfo>::WeakPtr ptr_;
};
-// threat ActorId as pointer and ActorOwn as
-// unique_ptr<ActorId>
+// treat ActorId as pointer and ActorOwn as unique_ptr<ActorId>
template <class ActorType = Actor>
class ActorOwn {
public:
using ActorT = ActorType;
ActorOwn() = default;
- explicit ActorOwn(ActorId<ActorType>);
+ explicit ActorOwn(ActorId<ActorType> id);
template <class OtherActorType>
explicit ActorOwn(ActorId<OtherActorType> id);
template <class OtherActorType>
- explicit ActorOwn(ActorOwn<OtherActorType> &&);
+ explicit ActorOwn(ActorOwn<OtherActorType> &&other);
template <class OtherActorType>
- ActorOwn &operator=(ActorOwn<OtherActorType> &&);
- ActorOwn(ActorOwn &&);
- ActorOwn &operator=(ActorOwn &&);
- ActorOwn(const ActorOwn &) = delete;
- ActorOwn &operator=(const ActorOwn &) = delete;
+ ActorOwn &operator=(ActorOwn<OtherActorType> &&other);
+ ActorOwn(ActorOwn &&other) noexcept;
+ ActorOwn &operator=(ActorOwn &&other) noexcept;
+ ActorOwn(const ActorOwn &other) = delete;
+ ActorOwn &operator=(const ActorOwn &other) = delete;
~ActorOwn();
bool empty() const;
@@ -96,11 +90,7 @@ class ActorOwn {
ActorId<ActorType> get() const;
ActorId<ActorType> release();
void reset(ActorId<ActorType> other = ActorId<ActorType>());
- void hangup() const;
- const ActorId<ActorType> *operator->() const;
-
- using ActorIdConstRef = const ActorId<ActorType> &;
- // operator ActorIdConstRef();
+ ActorType *get_actor_unsafe() const;
private:
ActorId<ActorType> id_;
@@ -112,17 +102,17 @@ class ActorShared {
using ActorT = ActorType;
ActorShared() = default;
template <class OtherActorType>
- ActorShared(ActorId<OtherActorType>, uint64 token);
+ ActorShared(ActorId<OtherActorType> id, uint64 token);
template <class OtherActorType>
- ActorShared(ActorShared<OtherActorType> &&);
+ ActorShared(ActorShared<OtherActorType> &&other);
template <class OtherActorType>
- ActorShared(ActorOwn<OtherActorType> &&);
+ ActorShared(ActorOwn<OtherActorType> &&other);
template <class OtherActorType>
- ActorShared &operator=(ActorShared<OtherActorType> &&);
- ActorShared(ActorShared &&);
- ActorShared &operator=(ActorShared &&);
- ActorShared(const ActorShared &) = delete;
- ActorShared &operator=(const ActorShared &) = delete;
+ ActorShared &operator=(ActorShared<OtherActorType> &&other);
+ ActorShared(ActorShared &&other) noexcept;
+ ActorShared &operator=(ActorShared &&other) noexcept;
+ ActorShared(const ActorShared &other) = delete;
+ ActorShared &operator=(const ActorShared &other) = delete;
~ActorShared();
uint64 token() const;
@@ -133,13 +123,10 @@ class ActorShared {
ActorId<ActorType> get() const;
ActorId<ActorType> release();
void reset(ActorId<ActorType> other = ActorId<ActorType>());
- template <class OtherActorType>
- void reset(ActorId<OtherActorType> other);
- const ActorId<ActorType> *operator->() const;
private:
ActorId<ActorType> id_;
- uint64 token_;
+ uint64 token_ = 0;
};
class ActorRef {
@@ -148,6 +135,8 @@ class ActorRef {
template <class T>
ActorRef(const ActorId<T> &actor_id);
template <class T>
+ ActorRef(ActorId<T> &&actor_id);
+ template <class T>
ActorRef(const ActorShared<T> &actor_id);
template <class T>
ActorRef(ActorShared<T> &&actor_id);
@@ -166,4 +155,5 @@ class ActorRef {
ActorId<> actor_id_;
uint64 token_ = 0;
};
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h
index 34d7970633..76ef6d9a8b 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorId.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -13,7 +13,6 @@
#include "td/utils/Slice.h"
namespace td {
-/*** ActorId ***/
// If actor is on our scheduler(thread) result will be valid
// If actor is on another scheduler we will see it in migrate_dest_flags
@@ -31,31 +30,24 @@ ActorType *ActorId<ActorType>::get_actor_unsafe() const {
}
template <class ActorType>
-ActorType *ActorId<ActorType>::try_get_actor() const {
- auto info = get_actor_info();
- if (info && !info->is_migrating() && Scheduler::instance()->sched_id() == info->migrate_dest()) {
- return static_cast<ActorType *>(info->get_actor_unsafe());
- }
- return nullptr;
-}
-
-template <class ActorType>
Slice ActorId<ActorType>::get_name() const {
return ptr_->get_name();
}
-// ActorOwn
template <class ActorType>
ActorOwn<ActorType>::ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) {
}
+
template <class ActorType>
template <class OtherActorType>
ActorOwn<ActorType>::ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) {
}
+
template <class ActorType>
template <class OtherActorType>
ActorOwn<ActorType>::ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) {
}
+
template <class ActorType>
template <class OtherActorType>
ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn<OtherActorType> &&other) {
@@ -64,10 +56,11 @@ ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn<OtherActorType> &&o
}
template <class ActorType>
-ActorOwn<ActorType>::ActorOwn(ActorOwn &&other) : id_(other.release()) {
+ActorOwn<ActorType>::ActorOwn(ActorOwn &&other) noexcept : id_(other.release()) {
}
+
template <class ActorType>
-ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn &&other) {
+ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn &&other) noexcept {
reset(other.release());
return *this;
}
@@ -81,6 +74,7 @@ template <class ActorType>
bool ActorOwn<ActorType>::empty() const {
return id_.empty();
}
+
template <class ActorType>
ActorId<ActorType> ActorOwn<ActorType>::get() const {
return id_;
@@ -90,37 +84,36 @@ template <class ActorType>
ActorId<ActorType> ActorOwn<ActorType>::release() {
return std::move(id_);
}
+
template <class ActorType>
void ActorOwn<ActorType>::reset(ActorId<ActorType> other) {
static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type");
- hangup();
- id_ = std::move(other);
-}
-
-template <class ActorType>
-void ActorOwn<ActorType>::hangup() const {
if (!id_.empty()) {
send_event(id_, Event::hangup());
}
+ id_ = std::move(other);
}
+
template <class ActorType>
-const ActorId<ActorType> *ActorOwn<ActorType>::operator->() const {
- return &id_;
+ActorType *ActorOwn<ActorType>::get_actor_unsafe() const {
+ return id_.get_actor_unsafe();
}
-// ActorShared
template <class ActorType>
template <class OtherActorType>
ActorShared<ActorType>::ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) {
}
+
template <class ActorType>
template <class OtherActorType>
ActorShared<ActorType>::ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) {
}
+
template <class ActorType>
template <class OtherActorType>
ActorShared<ActorType>::ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(0) {
}
+
template <class ActorType>
template <class OtherActorType>
ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared<OtherActorType> &&other) {
@@ -130,10 +123,11 @@ ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared<OtherActor
}
template <class ActorType>
-ActorShared<ActorType>::ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token_) {
+ActorShared<ActorType>::ActorShared(ActorShared &&other) noexcept : id_(other.release()), token_(other.token_) {
}
+
template <class ActorType>
-ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared &&other) {
+ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared &&other) noexcept {
reset(other.release());
token_ = other.token_;
return *this;
@@ -148,10 +142,12 @@ template <class ActorType>
uint64 ActorShared<ActorType>::token() const {
return token_;
}
+
template <class ActorType>
bool ActorShared<ActorType>::empty() const {
return id_.empty();
}
+
template <class ActorType>
ActorId<ActorType> ActorShared<ActorType>::get() const {
return id_;
@@ -161,38 +157,37 @@ template <class ActorType>
ActorId<ActorType> ActorShared<ActorType>::release() {
return std::move(id_);
}
-template <class ActorType>
-void ActorShared<ActorType>::reset(ActorId<ActorType> other) {
- reset<ActorType>(std::move(other));
-}
template <class ActorType>
-template <class OtherActorType>
-void ActorShared<ActorType>::reset(ActorId<OtherActorType> other) {
+void ActorShared<ActorType>::reset(ActorId<ActorType> other) {
static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type");
if (!id_.empty()) {
send_event(*this, Event::hangup());
}
- id_ = static_cast<ActorId<ActorType>>(other);
-}
-template <class ActorType>
-const ActorId<ActorType> *ActorShared<ActorType>::operator->() const {
- return &id_;
+ id_ = std::move(other);
}
-/*** ActorRef ***/
template <class T>
ActorRef::ActorRef(const ActorId<T> &actor_id) : actor_id_(actor_id) {
}
+
+template <class T>
+ActorRef::ActorRef(ActorId<T> &&actor_id) : actor_id_(actor_id) {
+ actor_id.clear();
+}
+
template <class T>
ActorRef::ActorRef(const ActorShared<T> &actor_id) : actor_id_(actor_id.get()), token_(actor_id.token()) {
}
+
template <class T>
ActorRef::ActorRef(ActorShared<T> &&actor_id) : actor_id_(actor_id.release()), token_(actor_id.token()) {
}
+
template <class T>
ActorRef::ActorRef(const ActorOwn<T> &actor_id) : actor_id_(actor_id.get()) {
}
+
template <class T>
ActorRef::ActorRef(ActorOwn<T> &&actor_id) : actor_id_(actor_id.release()) {
}
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h
index de9fba794e..3b9d3c2f2a 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -32,13 +32,24 @@ class ActorContext {
ActorContext(ActorContext &&) = delete;
ActorContext &operator=(ActorContext &&) = delete;
virtual ~ActorContext() = default;
+
+ virtual int32 get_id() const {
+ return 0;
+ }
+
+ void set_tag(string tag) {
+ tag_storage_ = std::move(tag);
+ tag_ = tag_storage_.c_str();
+ }
+
const char *tag_ = nullptr;
+ string tag_storage_; // sometimes tag_ == tag_storage_.c_str()
std::weak_ptr<ActorContext> this_ptr_;
};
-class ActorInfo
+class ActorInfo final
: private ListNode
- , HeapNode {
+ , private HeapNode {
public:
enum class Deleter : uint8 { Destroy, None };
@@ -52,11 +63,11 @@ class ActorInfo
ActorInfo &operator=(const ActorInfo &) = delete;
void init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr, Deleter deleter,
- bool is_lite);
+ bool need_context, bool need_start_up);
void on_actor_moved(Actor *actor_new_ptr);
template <class ActorT>
- ActorOwn<ActorT> transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor);
+ ActorOwn<ActorT> transfer_ownership_to_scheduler(unique_ptr<ActorT> actor);
void clear();
void destroy_actor();
@@ -74,7 +85,8 @@ class ActorInfo
Actor *get_actor_unsafe();
const Actor *get_actor_unsafe() const;
- void set_context(std::shared_ptr<ActorContext> context);
+ std::shared_ptr<ActorContext> set_context(std::shared_ptr<ActorContext> context);
+ std::weak_ptr<ActorContext> get_context_weak_ptr() const;
ActorContext *get_context();
const ActorContext *get_context() const;
CSlice get_name() const;
@@ -93,16 +105,18 @@ class ActorInfo
vector<Event> mailbox_;
- bool is_lite() const;
+ bool need_context() const;
+ bool need_start_up() const;
void set_wait_generation(uint32 wait_generation);
bool must_wait(uint32 wait_generation) const;
void always_wait_for_mailbox();
private:
- Deleter deleter_;
- bool is_lite_;
- bool is_running_;
+ Deleter deleter_ = Deleter::None;
+ bool need_context_ = true;
+ bool need_start_up_ = true;
+ bool is_running_ = false;
bool always_wait_for_mailbox_{false};
uint32 wait_generation_{0};
@@ -116,4 +130,5 @@ class ActorInfo
};
StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info);
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h
index df0b0dfd81..35ec31b168 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ActorInfo.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -11,7 +11,6 @@
#include "td/actor/impl/Scheduler-decl.h"
#include "td/utils/common.h"
-#include "td/utils/format.h"
#include "td/utils/Heap.h"
#include "td/utils/List.h"
#include "td/utils/logging.h"
@@ -24,59 +23,69 @@
#include <utility>
namespace td {
-/*** ActorInfo ***/
+
inline StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info) {
sb << info.get_name() << ":" << const_cast<void *>(static_cast<const void *>(&info)) << ":"
<< const_cast<void *>(static_cast<const void *>(info.get_context()));
return sb;
}
+
inline void ActorInfo::init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr,
- Deleter deleter, bool is_lite) {
+ Deleter deleter, bool need_context, bool need_start_up) {
CHECK(!is_running());
CHECK(!is_migrating());
sched_id_.store(sched_id, std::memory_order_relaxed);
actor_ = actor_ptr;
- if (!is_lite) {
+ if (need_context) {
context_ = Scheduler::context()->this_ptr_.lock();
+ VLOG(actor) << "Set context " << context_.get() << " for " << name;
+ }
#ifdef TD_DEBUG
- name_ = name.str();
+ name_.assign(name.data(), name.size());
#endif
- }
actor_->init(std::move(this_ptr));
deleter_ = deleter;
- is_lite_ = is_lite;
+ need_context_ = need_context;
+ need_start_up_ = need_start_up;
is_running_ = false;
wait_generation_ = 0;
}
-inline bool ActorInfo::is_lite() const {
- return is_lite_;
+
+inline bool ActorInfo::need_context() const {
+ return need_context_;
+}
+
+inline bool ActorInfo::need_start_up() const {
+ return need_start_up_;
}
+
inline void ActorInfo::set_wait_generation(uint32 wait_generation) {
wait_generation_ = wait_generation;
}
+
inline bool ActorInfo::must_wait(uint32 wait_generation) const {
return wait_generation_ == wait_generation || (always_wait_for_mailbox_ && !mailbox_.empty());
}
+
inline void ActorInfo::always_wait_for_mailbox() {
always_wait_for_mailbox_ = true;
}
+
inline void ActorInfo::on_actor_moved(Actor *actor_new_ptr) {
actor_ = actor_new_ptr;
}
inline void ActorInfo::clear() {
- // LOG_IF(WARNING, !mailbox_.empty()) << "Destroy actor with non-empty mailbox: " << get_name()
- // << format::as_array(mailbox_);
- mailbox_.clear();
+ CHECK(mailbox_.empty());
+ CHECK(!actor_);
CHECK(!is_running());
CHECK(!is_migrating());
// NB: must be in non migrating state
// store invalid scheduler id.
sched_id_.store((1 << 30) - 1, std::memory_order_relaxed);
- destroy_actor();
- // Destroy context only after destructor.
+ VLOG(actor) << "Clear context " << context_.get() << " for " << get_name();
context_.reset();
}
@@ -92,10 +101,11 @@ inline void ActorInfo::destroy_actor() {
break;
}
actor_ = nullptr;
+ mailbox_.clear();
}
template <class ActorT>
-ActorOwn<ActorT> ActorInfo::transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor) {
+ActorOwn<ActorT> ActorInfo::transfer_ownership_to_scheduler(unique_ptr<ActorT> actor) {
CHECK(!empty());
CHECK(deleter_ == Deleter::None);
ActorT *actor_ptr = actor.release();
@@ -142,14 +152,22 @@ inline const Actor *ActorInfo::get_actor_unsafe() const {
return actor_;
}
-inline void ActorInfo::set_context(std::shared_ptr<ActorContext> context) {
+inline std::shared_ptr<ActorContext> ActorInfo::set_context(std::shared_ptr<ActorContext> context) {
CHECK(is_running());
context->this_ptr_ = context;
- context->tag_ = Scheduler::context()->tag_;
- context_ = std::move(context);
+ if (Scheduler::context()->tag_) {
+ context->set_tag(Scheduler::context()->tag_);
+ }
+ std::swap(context_, context);
Scheduler::context() = context_.get();
Scheduler::on_context_updated();
+ return context;
}
+
+inline std::weak_ptr<ActorContext> ActorInfo::get_context_weak_ptr() const {
+ return context_;
+}
+
inline const ActorContext *ActorInfo::get_context() const {
return context_.get();
}
@@ -167,13 +185,15 @@ inline CSlice ActorInfo::get_name() const {
}
inline void ActorInfo::start_run() {
- VLOG(actor) << "start_run: " << *this;
- CHECK(!is_running_) << "Recursive call of actor " << tag("name", get_name());
+ VLOG(actor) << "Start run actor: " << *this;
+ LOG_CHECK(!is_running_) << "Recursive call of actor " << get_name();
is_running_ = true;
}
inline void ActorInfo::finish_run() {
is_running_ = false;
- VLOG(actor) << "stop_run: " << *this;
+ if (!empty()) {
+ VLOG(actor) << "Stop run actor: " << *this;
+ }
}
inline bool ActorInfo::is_running() const {
@@ -198,4 +218,5 @@ inline const ListNode *ActorInfo::get_list_node() const {
inline ActorInfo *ActorInfo::from_list_node(ListNode *node) {
return static_cast<ActorInfo *>(node);
}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp
deleted file mode 100644
index 47593db90b..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp
+++ /dev/null
@@ -1,102 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#include "td/actor/impl/ConcurrentScheduler.h"
-
-#include "td/actor/impl/Actor.h"
-#include "td/actor/impl/ActorId.h"
-#include "td/actor/impl/ActorInfo.h"
-#include "td/actor/impl/Scheduler.h"
-
-#include "td/utils/MpscPollableQueue.h"
-#include "td/utils/port/thread_local.h"
-
-#include <memory>
-
-namespace td {
-
-void ConcurrentScheduler::init(int32 threads_n) {
-#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
- threads_n = 0;
-#endif
- threads_n++;
- std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(threads_n);
- for (int32 i = 0; i < threads_n; i++) {
-#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
-#else
- auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
- queue->init();
- outbound[i] = queue;
-#endif
- }
-
- schedulers_.resize(threads_n);
- for (int32 i = 0; i < threads_n; i++) {
- auto &sched = schedulers_[i];
- sched = make_unique<Scheduler>();
- sched->init(i, outbound, static_cast<Scheduler::Callback *>(this));
- }
-
- state_ = State::Start;
-}
-
-void ConcurrentScheduler::test_one_thread_run() {
- do {
- for (auto &sched : schedulers_) {
- sched->run(0);
- }
- } while (!is_finished_.load(std::memory_order_relaxed));
-}
-
-void ConcurrentScheduler::start() {
- CHECK(state_ == State::Start);
- is_finished_.store(false, std::memory_order_relaxed);
- set_thread_id(0);
-#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- for (size_t i = 1; i < schedulers_.size(); i++) {
- auto &sched = schedulers_[i];
- threads_.push_back(td::thread([&, tid = i]() {
- set_thread_id(static_cast<int32>(tid));
- while (!is_finished()) {
- sched->run(10);
- }
- }));
- }
-#endif
- state_ = State::Run;
-}
-
-bool ConcurrentScheduler::run_main(double timeout) {
- CHECK(state_ == State::Run);
- // run main scheduler in same thread
- auto &main_sched = schedulers_[0];
- if (!is_finished()) {
- main_sched->run(timeout);
- }
- return !is_finished();
-}
-
-void ConcurrentScheduler::finish() {
- CHECK(state_ == State::Run);
- if (!is_finished()) {
- on_finish();
- }
-#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- for (auto &thread : threads_) {
- thread.join();
- }
- threads_.clear();
-#endif
- schedulers_.clear();
- for (auto &f : at_finish_) {
- f();
- }
- at_finish_.clear();
-
- state_ = State::Start;
-}
-
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h
index fac66dd120..2796c701e5 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Event.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -8,8 +8,6 @@
#include "td/utils/Closure.h"
#include "td/utils/common.h"
-#include "td/utils/format.h"
-#include "td/utils/logging.h"
#include "td/utils/StringBuilder.h"
#include <type_traits>
@@ -50,7 +48,6 @@ class CustomEvent {
virtual ~CustomEvent() = default;
virtual void run(Actor *actor) = 0;
- virtual CustomEvent *clone() const = 0;
virtual void start_migrate(int32 sched_id) {
}
virtual void finish_migrate() {
@@ -58,26 +55,23 @@ class CustomEvent {
};
template <class ClosureT>
-class ClosureEvent : public CustomEvent {
+class ClosureEvent final : public CustomEvent {
public:
- void run(Actor *actor) override {
+ void run(Actor *actor) final {
closure_.run(static_cast<typename ClosureT::ActorType *>(actor));
}
- CustomEvent *clone() const override {
- return new ClosureEvent<ClosureT>(closure_.clone());
- }
template <class... ArgsT>
- explicit ClosureEvent(ArgsT &&... args) : closure_(std::forward<ArgsT>(args)...) {
+ explicit ClosureEvent(ArgsT &&...args) : closure_(std::forward<ArgsT>(args)...) {
}
- void start_migrate(int32 sched_id) override {
+ void start_migrate(int32 sched_id) final {
closure_.for_each([sched_id](auto &obj) {
using ::td::start_migrate;
start_migrate(obj, sched_id);
});
}
- void finish_migrate() override {
+ void finish_migrate() final {
closure_.for_each([](auto &obj) {
using ::td::finish_migrate;
finish_migrate(obj);
@@ -89,16 +83,12 @@ class ClosureEvent : public CustomEvent {
};
template <class LambdaT>
-class LambdaEvent : public CustomEvent {
+class LambdaEvent final : public CustomEvent {
public:
- void run(Actor *actor) override {
+ void run(Actor *actor) final {
f_();
}
- CustomEvent *clone() const override {
- LOG(FATAL) << "Not supported";
- return nullptr;
- }
- template <class FromLambdaT>
+ template <class FromLambdaT, std::enable_if_t<!std::is_same<std::decay_t<FromLambdaT>, LambdaEvent>::value, int> = 0>
explicit LambdaEvent(FromLambdaT &&lambda) : f_(std::forward<FromLambdaT>(lambda)) {
}
@@ -153,7 +143,7 @@ class Event {
new ClosureEvent<typename FromImmediateClosureT::Delayed>(std::forward<FromImmediateClosureT>(closure)));
}
template <class... ArgsT>
- static Event delayed_closure(ArgsT &&... args) {
+ static Event delayed_closure(ArgsT &&...args) {
using DelayedClosureT = decltype(create_delayed_closure(std::forward<ArgsT>(args)...));
return custom(new ClosureEvent<DelayedClosureT>(std::forward<ArgsT>(args)...));
}
@@ -167,10 +157,10 @@ class Event {
}
Event(const Event &other) = delete;
Event &operator=(const Event &) = delete;
- Event(Event &&other) : type(other.type), link_token(other.link_token), data(other.data) {
+ Event(Event &&other) noexcept : type(other.type), link_token(other.link_token), data(other.data) {
other.type = Type::NoType;
}
- Event &operator=(Event &&other) {
+ Event &operator=(Event &&other) noexcept {
destroy();
type = other.type;
link_token = other.link_token;
@@ -182,17 +172,6 @@ class Event {
destroy();
}
- Event clone() const {
- Event res;
- res.type = type;
- if (type == Type::Custom) {
- res.data.custom_event = data.custom_event->clone();
- } else {
- res.data = data;
- }
- return res;
- }
-
bool empty() const {
return type == Type::NoType;
}
@@ -241,7 +220,28 @@ class Event {
}
}
};
-inline StringBuilder &operator<<(StringBuilder &sb, const Event &e) {
- return sb << tag("Event", static_cast<int32>(e.type));
+
+inline StringBuilder &operator<<(StringBuilder &string_builder, const Event &e) {
+ string_builder << "Event::";
+ switch (e.type) {
+ case Event::Type::Start:
+ return string_builder << "Start";
+ case Event::Type::Stop:
+ return string_builder << "Stop";
+ case Event::Type::Yield:
+ return string_builder << "Yield";
+ case Event::Type::Hangup:
+ return string_builder << "Hangup";
+ case Event::Type::Timeout:
+ return string_builder << "Timeout";
+ case Event::Type::Raw:
+ return string_builder << "Raw";
+ case Event::Type::Custom:
+ return string_builder << "Custom";
+ case Event::Type::NoType:
+ default:
+ return string_builder << "NoType";
+ }
}
+
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
index ef2f1c2dcb..4137765111 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -45,7 +45,7 @@ class EventFull {
data_.link_token = actor_ref.token();
}
template <class T>
- EventFull(ActorId<T> actor_id, Event &&data) : actor_id_(actor_id), data_(std::move(data)) {
+ EventFull(ActorId<T> actor_id, Event &&data) : actor_id_(std::move(actor_id)), data_(std::move(data)) {
}
ActorId<> actor_id_;
@@ -56,7 +56,7 @@ class EventFull {
class EventCreator {
public:
template <class ActorIdT, class FunctionT, class... ArgsT>
- static EventFull closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+ static EventFull closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) {
using ActorT = typename std::decay_t<ActorIdT>::ActorT;
using FunctionClassT = member_function_class_t<FunctionT>;
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h
index 1e997ee4b3..89eabef768 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/EventFull.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -9,7 +9,7 @@
#include "td/actor/impl/EventFull-decl.h"
#include "td/actor/impl/Scheduler-decl.h"
-#include "td/utils/logging.h"
+#include "td/utils/common.h"
#include <utility>
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
index 4b51c102a5..8ed9feb10a 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -11,37 +11,40 @@
#include "td/actor/impl/EventFull-decl.h"
#include "td/utils/Closure.h"
+#include "td/utils/common.h"
+#include "td/utils/FlatHashMap.h"
#include "td/utils/Heap.h"
#include "td/utils/List.h"
+#include "td/utils/logging.h"
#include "td/utils/MovableValue.h"
#include "td/utils/MpscPollableQueue.h"
#include "td/utils/ObjectPool.h"
-#include "td/utils/port/EventFd.h"
-#include "td/utils/port/Fd.h"
+#include "td/utils/port/detail/PollableFd.h"
#include "td/utils/port/Poll.h"
+#include "td/utils/port/PollFlags.h"
#include "td/utils/port/thread_local.h"
+#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
+#include "td/utils/Time.h"
#include "td/utils/type_traits.h"
#include <functional>
-#include <map>
#include <memory>
#include <type_traits>
#include <utility>
namespace td {
+
+extern int VERBOSITY_NAME(actor);
+
class ActorInfo;
-struct Send {
- using Flags = uint32;
- static const Flags immediate = 0x001;
- static const Flags later = 0x002;
- static const Flags later_weak = 0x004;
-};
+
+enum class ActorSendType { Immediate, Later, LaterWeak };
class Scheduler;
class SchedulerGuard {
public:
- explicit SchedulerGuard(Scheduler *scheduler);
+ explicit SchedulerGuard(Scheduler *scheduler, bool lock = true);
~SchedulerGuard();
SchedulerGuard(const SchedulerGuard &other) = delete;
SchedulerGuard &operator=(const SchedulerGuard &other) = delete;
@@ -50,6 +53,7 @@ class SchedulerGuard {
private:
MovableValue<bool> is_valid_ = true;
+ bool is_locked_;
Scheduler *scheduler_;
ActorContext *save_context_;
Scheduler *save_scheduler_;
@@ -82,9 +86,9 @@ class Scheduler {
int32 sched_count() const;
template <class ActorT, class... Args>
- TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
+ TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&...args);
template <class ActorT, class... Args>
- TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args);
+ TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args);
template <class ActorT>
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1);
template <class ActorT>
@@ -96,22 +100,28 @@ class Scheduler {
void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
- template <class EventT>
- void send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags = 0);
+ void run_on_scheduler(int32 sched_id, Promise<Unit> action); // TODO Action
+
+ template <class T>
+ void destroy_on_scheduler(int32 sched_id, T &value);
+
+ template <class... ArgsT>
+ void destroy_on_scheduler(int32 sched_id, ArgsT &...values);
+
+ template <ActorSendType send_type, class EventT>
+ void send_lambda(ActorRef actor_ref, EventT &&lambda);
- template <class EventT>
- void send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags = 0);
+ template <ActorSendType send_type, class EventT>
+ void send_closure(ActorRef actor_ref, EventT &&closure);
- void send(ActorRef actor_ref, Event &&event, Send::Flags flags = 0);
+ template <ActorSendType send_type>
+ void send(ActorRef actor_ref, Event &&event);
- void hack(const ActorId<> &actor_id, Event &&event) {
- actor_id.get_actor_unsafe()->raw_event(event.data);
- }
void before_tail_send(const ActorId<> &actor_id);
- void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
- void unsubscribe(const Fd &fd);
- void unsubscribe_before_close(const Fd &fd);
+ static void subscribe(PollableFd fd, PollFlags flags = PollFlags::ReadWrite());
+ static void unsubscribe(PollableFdRef fd);
+ static void unsubscribe_before_close(PollableFdRef fd);
void yield_actor(Actor *actor);
void stop_actor(Actor *actor);
@@ -122,15 +132,15 @@ class Scheduler {
void start_migrate_actor(Actor *actor, int32 dest_sched_id);
void finish_migrate_actor(Actor *actor);
- bool has_actor_timeout(const Actor *actor) const;
+ double get_actor_timeout(const Actor *actor) const;
void set_actor_timeout_in(Actor *actor, double timeout);
void set_actor_timeout_at(Actor *actor, double timeout_at);
void cancel_actor_timeout(Actor *actor);
void finish();
void yield();
- void run(double timeout);
- void run_no_guard(double timeout);
+ void run(Timestamp timeout);
+ void run_no_guard(Timestamp timeout);
void wakeup();
@@ -139,22 +149,29 @@ class Scheduler {
static void on_context_updated();
SchedulerGuard get_guard();
+ SchedulerGuard get_const_guard();
+
+ Timestamp get_timeout();
private:
static void set_scheduler(Scheduler *scheduler);
- /*** ServiceActor ***/
+
+ void destroy_on_scheduler_impl(int32 sched_id, Promise<Unit> action);
+
class ServiceActor final : public Actor {
public:
void set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues);
- void start_up() override;
private:
std::shared_ptr<MpscPollableQueue<EventFull>> inbound_;
- void loop() override;
+ bool subscribed_{false};
+
+ void start_up() final;
+ void loop() final;
+ void tear_down() final;
};
friend class ServiceActor;
- void do_custom_event(ActorInfo *actor, CustomEvent &event);
void do_event(ActorInfo *actor, Event &&event);
void enter_actor(ActorInfo *actor_info);
@@ -168,7 +185,7 @@ class Scheduler {
void do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
void start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
- bool has_actor_timeout(const ActorInfo *actor_info) const;
+ double get_actor_timeout(const ActorInfo *actor_info) const;
void set_actor_timeout_in(ActorInfo *actor_info, double timeout);
void set_actor_timeout_at(ActorInfo *actor_info, double timeout_at);
void cancel_actor_timeout(ActorInfo *actor_info);
@@ -180,15 +197,15 @@ class Scheduler {
template <class RunFuncT, class EventFuncT>
void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func);
- template <class RunFuncT, class EventFuncT>
- void send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, const EventFuncT &event_func);
+ template <ActorSendType send_type, class RunFuncT, class EventFuncT>
+ void send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func);
void inc_wait_generation();
- double run_timeout();
+ Timestamp run_timeout();
void run_mailbox();
- double run_events();
- void run_poll(double timeout);
+ Timestamp run_events(Timestamp timeout);
+ void run_poll(Timestamp timeout);
template <class ActorT>
ActorOwn<ActorT> register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id);
@@ -198,42 +215,39 @@ class Scheduler {
static TD_THREAD_LOCAL ActorContext *context_;
Callback *callback_ = nullptr;
- std::unique_ptr<ObjectPool<ActorInfo>> actor_info_pool_;
+ unique_ptr<ObjectPool<ActorInfo>> actor_info_pool_;
- int32 actor_count_;
+ int32 actor_count_ = 0;
ListNode pending_actors_list_;
ListNode ready_actors_list_;
KHeap<double> timeout_queue_;
- std::map<ActorInfo *, std::vector<Event>> pending_events_;
+ FlatHashMap<ActorInfo *, std::vector<Event>> pending_events_;
-#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- EventFd event_fd_;
-#endif
ServiceActor service_actor_;
Poll poll_;
- bool yield_flag_;
+ bool yield_flag_ = false;
bool has_guard_ = false;
bool close_flag_ = false;
- uint32 wait_generation_ = 0;
- int32 sched_id_;
- int32 sched_n_;
+ uint32 wait_generation_ = 1;
+ int32 sched_id_ = 0;
+ int32 sched_n_ = 0;
std::shared_ptr<MpscPollableQueue<EventFull>> inbound_queue_;
std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound_queues_;
std::shared_ptr<ActorContext> save_context_;
struct EventContext {
- int32 dest_sched_id;
+ int32 dest_sched_id{0};
enum Flags { Stop = 1, Migrate = 2 };
int32 flags{0};
- uint64 link_token;
+ uint64 link_token{0};
- ActorInfo *actor_info;
+ ActorInfo *actor_info{nullptr};
};
- EventContext *event_context_ptr_;
+ EventContext *event_context_ptr_{nullptr};
friend class GlobalScheduler;
friend class SchedulerGuard;
@@ -241,14 +255,10 @@ class Scheduler {
};
/*** Interface to current scheduler ***/
-void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
-void unsubscribe(const Fd &fd);
-void unsubscribe_before_close(const Fd &fd);
-
template <class ActorT, class... Args>
-TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
+TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&...args);
template <class ActorT, class... Args>
-TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args);
+TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args);
template <class ActorT>
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1);
template <class ActorT>
@@ -258,39 +268,38 @@ template <class ActorT>
TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr);
template <class ActorIdT, class FunctionT, class... ArgsT>
-void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) {
using ActorT = typename std::decay_t<ActorIdT>::ActorT;
using FunctionClassT = member_function_class_t<FunctionT>;
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
- Scheduler::instance()->send_closure(std::forward<ActorIdT>(actor_id),
- create_immediate_closure(function, std::forward<ArgsT>(args)...));
+ Scheduler::instance()->send_closure<ActorSendType::Immediate>(
+ std::forward<ActorIdT>(actor_id), create_immediate_closure(function, std::forward<ArgsT>(args)...));
}
template <class ActorIdT, class FunctionT, class... ArgsT>
-void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&...args) {
using ActorT = typename std::decay_t<ActorIdT>::ActorT;
using FunctionClassT = member_function_class_t<FunctionT>;
static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
- Scheduler::instance()->send(std::forward<ActorIdT>(actor_id),
- Event::delayed_closure(function, std::forward<ArgsT>(args)...), Send::later);
+ Scheduler::instance()->send<ActorSendType::Later>(std::forward<ActorIdT>(actor_id),
+ Event::delayed_closure(function, std::forward<ArgsT>(args)...));
}
template <class... ArgsT>
-void send_lambda(ActorRef actor_ref, ArgsT &&... args) {
- Scheduler::instance()->send_lambda(actor_ref, std::forward<ArgsT>(args)...);
+void send_lambda(ActorRef actor_ref, ArgsT &&...args) {
+ Scheduler::instance()->send_lambda<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...);
}
template <class... ArgsT>
-void send_event(ActorRef actor_ref, ArgsT &&... args) {
- Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)...);
+void send_event(ActorRef actor_ref, ArgsT &&...args) {
+ Scheduler::instance()->send<ActorSendType::Immediate>(actor_ref, std::forward<ArgsT>(args)...);
}
template <class... ArgsT>
-void send_event_later(ActorRef actor_ref, ArgsT &&... args) {
- Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., Send::later);
+void send_event_later(ActorRef actor_ref, ArgsT &&...args) {
+ Scheduler::instance()->send<ActorSendType::Later>(actor_ref, std::forward<ArgsT>(args)...);
}
-void yield_scheduler();
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
index 479e419d62..38f2fc2e6f 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -13,19 +13,27 @@
#include "td/actor/impl/EventFull.h"
#include "td/utils/common.h"
+#include "td/utils/ExitGuard.h"
#include "td/utils/format.h"
#include "td/utils/List.h"
#include "td/utils/logging.h"
+#include "td/utils/misc.h"
+#include "td/utils/MpscPollableQueue.h"
#include "td/utils/ObjectPool.h"
#include "td/utils/port/thread_local.h"
+#include "td/utils/Promise.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Time.h"
#include <functional>
+#include <iterator>
+#include <memory>
#include <utility>
namespace td {
+int VERBOSITY_NAME(actor) = VERBOSITY_NAME(DEBUG) + 10;
+
TD_THREAD_LOCAL Scheduler *Scheduler::scheduler_; // static zero-initialized
TD_THREAD_LOCAL ActorContext *Scheduler::context_; // static zero-initialized
@@ -49,6 +57,10 @@ void Scheduler::set_scheduler(Scheduler *scheduler) {
scheduler_ = scheduler;
}
+void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) {
+ inbound_ = std::move(queues);
+}
+
void Scheduler::ServiceActor::start_up() {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
CHECK(!inbound_);
@@ -56,10 +68,11 @@ void Scheduler::ServiceActor::start_up() {
if (!inbound_) {
return;
}
+#if !TD_PORT_WINDOWS
auto &fd = inbound_->reader_get_event_fd();
-
- fd.get_fd().set_observer(this);
- ::td::subscribe(fd.get_fd(), Fd::Read);
+ Scheduler::subscribe(fd.get_poll_info().extract_pollable_fd(this), PollFlags::Read());
+ subscribed_ = true;
+#endif
yield();
#endif
}
@@ -73,7 +86,11 @@ void Scheduler::ServiceActor::loop() {
while (ready_n-- > 0) {
EventFull event = queue->reader_get_unsafe();
if (event.actor_id().empty()) {
- Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr));
+ if (event.data().empty()) {
+ Scheduler::instance()->yield();
+ } else {
+ Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr));
+ }
} else {
VLOG(actor) << "Receive " << event.data();
finish_migrate(event.data());
@@ -84,10 +101,30 @@ void Scheduler::ServiceActor::loop() {
yield();
}
+void Scheduler::ServiceActor::tear_down() {
+ if (!subscribed_) {
+ return;
+ }
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ CHECK(!inbound_);
+#else
+ if (!inbound_) {
+ return;
+ }
+ auto &fd = inbound_->reader_get_event_fd();
+ Scheduler::unsubscribe(fd.get_poll_info().get_pollable_fd_ref());
+ subscribed_ = false;
+#endif
+}
+
/*** SchedlerGuard ***/
-SchedulerGuard::SchedulerGuard(Scheduler *scheduler) : scheduler_(scheduler) {
- CHECK(!scheduler_->has_guard_);
- scheduler_->has_guard_ = true;
+SchedulerGuard::SchedulerGuard(Scheduler *scheduler, bool lock) : scheduler_(scheduler) {
+ if (lock) {
+ // the next check can fail if OS killed the scheduler's thread without releasing the guard
+ CHECK(!scheduler_->has_guard_);
+ scheduler_->has_guard_ = true;
+ }
+ is_locked_ = lock;
save_scheduler_ = Scheduler::instance();
Scheduler::set_scheduler(scheduler_);
@@ -102,8 +139,10 @@ SchedulerGuard::~SchedulerGuard() {
if (is_valid_.get()) {
std::swap(save_context_, scheduler_->context());
Scheduler::set_scheduler(save_scheduler_);
- CHECK(scheduler_->has_guard_);
- scheduler_->has_guard_ = false;
+ if (is_locked_) {
+ CHECK(scheduler_->has_guard_);
+ scheduler_->has_guard_ = false;
+ }
LOG_TAG = save_tag_;
}
}
@@ -132,9 +171,11 @@ EventGuard::~EventGuard() {
}
info->finish_run();
swap_context(info);
- CHECK(info->is_lite() || save_context_ == info->get_context());
+ CHECK(!info->need_context() || save_context_ == info->get_context());
#ifdef TD_DEBUG
- CHECK(info->is_lite() || save_log_tag2_ == info->get_name().c_str());
+ LOG_CHECK(!info->need_context() || save_log_tag2_ == info->get_name().c_str())
+ << info->need_context() << " " << info->empty() << " " << info->is_migrating() << " " << save_log_tag2_ << " "
+ << info->get_name() << " " << scheduler_->close_flag_;
#endif
if (event_context_.flags & Scheduler::EventContext::Stop) {
scheduler_->do_stop_actor(info);
@@ -148,7 +189,7 @@ EventGuard::~EventGuard() {
void EventGuard::swap_context(ActorInfo *info) {
std::swap(scheduler_->event_context_ptr_, event_context_ptr_);
- if (info->is_lite()) {
+ if (!info->need_context()) {
return;
}
@@ -180,11 +221,6 @@ void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<Eve
poll_.init();
-#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- event_fd_.init();
- subscribe(event_fd_.get_fd(), Fd::Read);
-#endif
-
if (!outbound.empty()) {
inbound_queue_ = std::move(outbound[id]);
}
@@ -214,20 +250,12 @@ void Scheduler::clear() {
auto actor_info = ActorInfo::from_list_node(ready_actors_list_.get());
do_stop_actor(actor_info);
}
- LOG_IF(FATAL, !ready_actors_list_.empty()) << ActorInfo::from_list_node(ready_actors_list_.next)->get_name();
- CHECK(ready_actors_list_.empty());
poll_.clear();
-#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- if (!event_fd_.empty()) {
- event_fd_.close();
- }
-#endif
-
- if (callback_) {
+ if (callback_ && !ExitGuard::is_exited()) {
// can't move lambda with unique_ptr inside into std::function
auto ptr = actor_info_pool_.release();
- callback_->register_at_finish([=]() { delete ptr; });
+ callback_->register_at_finish([ptr] { delete ptr; });
} else {
actor_info_pool_.reset();
}
@@ -236,59 +264,48 @@ void Scheduler::clear() {
void Scheduler::do_event(ActorInfo *actor_info, Event &&event) {
event_context_ptr_->link_token = event.link_token;
auto actor = actor_info->get_actor_unsafe();
+ VLOG(actor) << *actor_info << ' ' << event;
switch (event.type) {
- case Event::Type::Start: {
- VLOG(actor) << *actor_info << " Event::Start";
+ case Event::Type::Start:
actor->start_up();
break;
- }
- case Event::Type::Stop: {
- VLOG(actor) << *actor_info << " Event::Stop";
+ case Event::Type::Stop:
actor->tear_down();
break;
- }
- case Event::Type::Yield: {
- VLOG(actor) << *actor_info << " Event::Yield";
+ case Event::Type::Yield:
actor->wakeup();
break;
- }
- case Event::Type::Hangup: {
- auto token = get_link_token(actor);
- VLOG(actor) << *actor_info << " Event::Hangup " << tag("token", format::as_hex(token));
- if (token != 0) {
+ case Event::Type::Hangup:
+ if (get_link_token(actor) != 0) {
actor->hangup_shared();
} else {
actor->hangup();
}
break;
- }
- case Event::Type::Timeout: {
- VLOG(actor) << *actor_info << " Event::Timeout";
+ case Event::Type::Timeout:
actor->timeout_expired();
break;
- }
- case Event::Type::Raw: {
- VLOG(actor) << *actor_info << " Event::Raw";
+ case Event::Type::Raw:
actor->raw_event(event.data);
break;
- }
- case Event::Type::Custom: {
- do_custom_event(actor_info, *event.data.custom_event);
+ case Event::Type::Custom:
+ event.data.custom_event->run(actor);
break;
- }
- case Event::Type::NoType: {
+ case Event::Type::NoType:
+ default:
UNREACHABLE();
break;
- }
}
- // can't clear event here. It may be already destroyed during destory_actor
+ // can't clear event here. It may be already destroyed during destroy_actor
}
void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
<< tag("actor_count", actor_count_);
actor_count_++;
- CHECK(actor_info->is_migrating());
+ LOG_CHECK(actor_info->is_migrating()) << *actor_info << ' ' << actor_count_ << ' ' << sched_id_ << ' '
+ << actor_info->migrate_dest() << ' ' << actor_info->is_running() << ' '
+ << close_flag_;
CHECK(sched_id_ == actor_info->migrate_dest());
// CHECK(!actor_info->is_running());
actor_info->finish_migrate();
@@ -297,8 +314,8 @@ void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
}
auto it = pending_events_.find(actor_info);
if (it != pending_events_.end()) {
- actor_info->mailbox_.insert(actor_info->mailbox_.end(), make_move_iterator(begin(it->second)),
- make_move_iterator(end(it->second)));
+ actor_info->mailbox_.insert(actor_info->mailbox_.end(), std::make_move_iterator(it->second.begin()),
+ std::make_move_iterator(it->second.end()));
pending_events_.erase(it);
}
if (actor_info->mailbox_.empty()) {
@@ -323,6 +340,43 @@ void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_i
}
}
+void Scheduler::run_on_scheduler(int32 sched_id, Promise<Unit> action) {
+ if (sched_id >= 0 && sched_id_ != sched_id) {
+ class Worker final : public Actor {
+ public:
+ explicit Worker(Promise<Unit> action) : action_(std::move(action)) {
+ }
+
+ private:
+ Promise<Unit> action_;
+
+ void start_up() final {
+ action_.set_value(Unit());
+ stop();
+ }
+ };
+ create_actor_on_scheduler<Worker>("RunOnSchedulerWorker", sched_id, std::move(action)).release();
+ return;
+ }
+
+ action.set_value(Unit());
+}
+
+void Scheduler::destroy_on_scheduler_impl(int32 sched_id, Promise<Unit> action) {
+ auto empty_context = std::make_shared<ActorContext>();
+ empty_context->this_ptr_ = empty_context;
+ ActorContext *current_context = context_;
+ context_ = empty_context.get();
+
+ const char *current_tag = LOG_TAG;
+ LOG_TAG = nullptr;
+
+ run_on_scheduler(sched_id, std::move(action));
+
+ context_ = current_context;
+ LOG_TAG = current_tag;
+}
+
void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
if (!actor_info->is_running()) {
auto node = actor_info->get_list_node();
@@ -338,9 +392,9 @@ void Scheduler::do_stop_actor(Actor *actor) {
}
void Scheduler::do_stop_actor(ActorInfo *actor_info) {
CHECK(!actor_info->is_migrating());
- CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
+ LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
ObjectPool<ActorInfo>::OwnerPtr owner_ptr;
- if (!actor_info->is_lite()) {
+ if (actor_info->need_start_up()) {
EventGuard guard(this, actor_info);
do_event(actor_info, Event::stop());
owner_ptr = actor_info->get_actor_unsafe()->clear();
@@ -349,6 +403,7 @@ void Scheduler::do_stop_actor(ActorInfo *actor_info) {
event_context_ptr_->flags = 0;
} else {
owner_ptr = actor_info->get_actor_unsafe()->clear();
+ actor_info->destroy_actor();
}
destroy_actor(actor_info);
}
@@ -382,6 +437,7 @@ void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) {
start_migrate_actor(actor->get_info(), dest_sched_id);
}
+
void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info)
<< tag("actor_count", actor_count_);
@@ -396,6 +452,11 @@ void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id)
cancel_actor_timeout(actor_info);
}
+double Scheduler::get_actor_timeout(const ActorInfo *actor_info) const {
+ const HeapNode *heap_node = actor_info->get_heap_node();
+ return heap_node->in_heap() ? timeout_queue_.get_key(heap_node) - Time::now() : 0.0;
+}
+
void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) {
if (timeout > 1e10) {
timeout = 1e10;
@@ -403,13 +464,13 @@ void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) {
if (timeout < 0) {
timeout = 0;
}
- double expire_at = Time::now() + timeout;
- set_actor_timeout_at(actor_info, expire_at);
+ double expires_at = Time::now() + timeout;
+ set_actor_timeout_at(actor_info, expires_at);
}
void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) {
HeapNode *heap_node = actor_info->get_heap_node();
- VLOG(actor) << "set actor " << *actor_info << " " << tag("timeout", timeout_at) << timeout_at - Time::now_cached();
+ VLOG(actor) << "Set actor " << *actor_info << " timeout in " << timeout_at - Time::now_cached();
if (heap_node->in_heap()) {
timeout_queue_.fix(timeout_at, heap_node);
} else {
@@ -417,21 +478,20 @@ void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) {
}
}
-void Scheduler::run_poll(double timeout) {
- // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]";
+void Scheduler::run_poll(Timestamp timeout) {
// we can't wait for less than 1ms
- poll_.run(static_cast<int32>(timeout * 1000 + 1));
-
-#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- if (can_read(event_fd_.get_fd())) {
- std::atomic_thread_fence(std::memory_order_acquire);
- event_fd_.acquire();
- }
+ auto timeout_ms = static_cast<int>(clamp(timeout.in(), 0.0, 1000000.0) * 1000 + 1);
+#if TD_PORT_WINDOWS
+ CHECK(inbound_queue_);
+ inbound_queue_->reader_get_event_fd().wait(timeout_ms);
+ service_actor_.notify();
+#elif TD_PORT_POSIX
+ poll_.run(timeout_ms);
#endif
}
void Scheduler::run_mailbox() {
- VLOG(actor) << "run mailbox : begin";
+ VLOG(actor) << "Run mailbox : begin";
ListNode actors_list = std::move(ready_actors_list_);
while (!actors_list.empty()) {
ListNode *node = actors_list.get();
@@ -440,7 +500,7 @@ void Scheduler::run_mailbox() {
inc_wait_generation();
flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr));
}
- VLOG(actor) << "run mailbox : finish " << actor_count_;
+ VLOG(actor) << "Run mailbox : finish " << actor_count_;
//Useful for debug, but O(ActorsCount) check
@@ -457,40 +517,54 @@ void Scheduler::run_mailbox() {
//LOG(ERROR) << *actor_info;
//cnt++;
//}
- //CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_;
+ //LOG_CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_;
}
-double Scheduler::run_timeout() {
+Timestamp Scheduler::run_timeout() {
double now = Time::now();
+ //TODO: use Timestamp().is_in_past()
while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
HeapNode *node = timeout_queue_.pop();
ActorInfo *actor_info = ActorInfo::from_heap_node(node);
inc_wait_generation();
- send(actor_info->actor_id(), Event::timeout(), Send::immediate);
- }
- if (timeout_queue_.empty()) {
- return 10000;
+ send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout());
}
- double timeout = timeout_queue_.top_key() - now;
- // LOG(DEBUG) << "Timeout [cnt:" << timeout_queue_.size() << "] in " << format::as_time(timeout);
- return timeout;
+ return get_timeout();
}
-void Scheduler::run_no_guard(double timeout) {
+Timestamp Scheduler::run_events(Timestamp timeout) {
+ Timestamp res;
+ VLOG(actor) << "Run events " << sched_id_ << " " << tag("pending", pending_events_.size())
+ << tag("actors", actor_count_);
+ do {
+ run_mailbox();
+ res = run_timeout();
+ } while (!ready_actors_list_.empty() && !timeout.is_in_past());
+ return res;
+}
+
+void Scheduler::run_no_guard(Timestamp timeout) {
CHECK(has_guard_);
SCOPE_EXIT {
yield_flag_ = false;
};
- double next_timeout = run_events();
- if (next_timeout < timeout) {
- timeout = next_timeout;
- }
+ timeout.relax(run_events(timeout));
if (yield_flag_) {
return;
}
run_poll(timeout);
- run_events();
+ run_events(timeout);
+}
+
+Timestamp Scheduler::get_timeout() {
+ if (!ready_actors_list_.empty()) {
+ return Timestamp::in(0);
+ }
+ if (timeout_queue_.empty()) {
+ return Timestamp::in(10000);
+ }
+ return Timestamp::at(timeout_queue_.top_key());
}
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h
index 7edf3f1d2d..d4d075785f 100644
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h
+++ b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl/Scheduler.h
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -9,26 +9,22 @@
#include "td/actor/impl/ActorInfo-decl.h"
#include "td/actor/impl/Scheduler-decl.h"
-#include "td/utils/format.h"
+#include "td/utils/common.h"
#include "td/utils/Heap.h"
#include "td/utils/logging.h"
-#include "td/utils/MpscPollableQueue.h"
#include "td/utils/ObjectPool.h"
-#include "td/utils/port/Fd.h"
+#include "td/utils/port/detail/PollableFd.h"
+#include "td/utils/port/PollFlags.h"
+#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
+#include "td/utils/Time.h"
#include <atomic>
-#include <memory>
#include <tuple>
#include <utility>
namespace td {
-/*** ServiceActor ***/
-inline void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) {
- inbound_ = std::move(queues);
-}
-
/*** EventGuard ***/
class EventGuard {
public:
@@ -58,6 +54,9 @@ class EventGuard {
inline SchedulerGuard Scheduler::get_guard() {
return SchedulerGuard(this);
}
+inline SchedulerGuard Scheduler::get_const_guard() {
+ return SchedulerGuard(this, false);
+}
inline void Scheduler::init() {
init(0, {}, nullptr);
@@ -71,12 +70,12 @@ inline int32 Scheduler::sched_count() const {
}
template <class ActorT, class... Args>
-ActorOwn<ActorT> Scheduler::create_actor(Slice name, Args &&... args) {
+ActorOwn<ActorT> Scheduler::create_actor(Slice name, Args &&...args) {
return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id_);
}
template <class ActorT, class... Args>
-ActorOwn<ActorT> Scheduler::create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) {
+ActorOwn<ActorT> Scheduler::create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args) {
return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id);
}
@@ -92,29 +91,31 @@ ActorOwn<ActorT> Scheduler::register_actor(Slice name, unique_ptr<ActorT> actor_
template <class ActorT>
ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id) {
+ CHECK(has_guard_);
if (sched_id == -1) {
sched_id = sched_id_;
}
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
sched_id = 0;
#endif
- CHECK(sched_id == sched_id_ || (0 <= sched_id && sched_id < static_cast<int32>(outbound_queues_.size()))) << sched_id;
+ LOG_CHECK(sched_id == sched_id_ || (0 <= sched_id && sched_id < static_cast<int32>(outbound_queues_.size())))
+ << sched_id;
auto info = actor_info_pool_->create_empty();
- VLOG(actor) << "Create actor: " << tag("name", name) << tag("ptr", *info) << tag("context", context())
- << tag("this", this) << tag("actor_count", actor_count_);
actor_count_++;
auto weak_info = info.get_weak();
auto actor_info = info.get();
- info->init(sched_id_, name, std::move(info), static_cast<Actor *>(actor_ptr), deleter, ActorTraits<ActorT>::is_lite);
+ actor_info->init(sched_id_, name, std::move(info), static_cast<Actor *>(actor_ptr), deleter,
+ ActorTraits<ActorT>::need_context, ActorTraits<ActorT>::need_start_up);
+ VLOG(actor) << "Create actor " << *actor_info << " (actor_count = " << actor_count_ << ')';
ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr);
if (sched_id != sched_id_) {
- send(actor_id, Event::start(), Send::later_weak);
+ send<ActorSendType::LaterWeak>(actor_id, Event::start());
do_migrate_actor(actor_info, sched_id);
} else {
pending_actors_list_.put(weak_info->get_list_node());
- if (!ActorTraits<ActorT>::is_lite) {
- send(actor_id, Event::start(), Send::later_weak);
+ if (ActorTraits<ActorT>::need_start_up) {
+ send<ActorSendType::LaterWeak>(actor_id, Event::start());
}
}
@@ -122,7 +123,7 @@ ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, A
}
template <class ActorT>
-ActorOwn<ActorT> Scheduler::register_existing_actor(std::unique_ptr<ActorT> actor_ptr) {
+ActorOwn<ActorT> Scheduler::register_existing_actor(unique_ptr<ActorT> actor_ptr) {
CHECK(!actor_ptr->empty());
auto actor_info = actor_ptr->get_info();
CHECK(actor_info->migrate_dest_flag_atomic().first == sched_id_);
@@ -130,10 +131,9 @@ ActorOwn<ActorT> Scheduler::register_existing_actor(std::unique_ptr<ActorT> acto
}
inline void Scheduler::destroy_actor(ActorInfo *actor_info) {
- VLOG(actor) << "Destroy actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
- << tag("actor_count", actor_count_);
+ VLOG(actor) << "Destroy actor " << *actor_info << " (actor_count = " << actor_count_ << ')';
- CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
+ LOG_CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
cancel_actor_timeout(actor_info);
actor_info->get_list_node()->remove();
// called by ObjectPool
@@ -142,11 +142,6 @@ inline void Scheduler::destroy_actor(ActorInfo *actor_info) {
CHECK(actor_count_ >= 0);
}
-inline void Scheduler::do_custom_event(ActorInfo *actor_info, CustomEvent &event) {
- VLOG(actor) << *actor_info << " Event::Custom";
- event.run(actor_info->get_actor_unsafe());
-}
-
template <class RunFuncT, class EventFuncT>
void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func) {
auto &mailbox = actor_info->mailbox_;
@@ -161,13 +156,13 @@ void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, c
if (guard.can_run()) {
(*run_func)(actor_info);
} else {
- mailbox.insert(begin(mailbox) + i, (*event_func)());
+ mailbox.insert(mailbox.begin() + i, (*event_func)());
}
}
- mailbox.erase(begin(mailbox), begin(mailbox) + i);
+ mailbox.erase(mailbox.begin(), mailbox.begin() + i);
}
-inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) {
+inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<Actor> &actor_id, Event &&event) {
if (sched_id == sched_id_) {
ActorInfo *actor_info = actor_id.get_actor_info();
pending_events_[actor_info].push_back(std::move(event));
@@ -176,21 +171,34 @@ inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<> &actor_
}
}
+template <class T>
+void Scheduler::destroy_on_scheduler(int32 sched_id, T &value) {
+ if (!value.empty()) {
+ destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([value = std::move(value)](Unit) {
+ // destroy value
+ }));
+ }
+}
+
+template <class... ArgsT>
+void Scheduler::destroy_on_scheduler(int32 sched_id, ArgsT &...values) {
+ destroy_on_scheduler_impl(sched_id, PromiseCreator::lambda([values = std::make_tuple(std::move(values)...)](Unit) {
+ // destroy values
+ }));
+}
+
inline void Scheduler::before_tail_send(const ActorId<> &actor_id) {
// TODO
}
inline void Scheduler::inc_wait_generation() {
- wait_generation_++;
+ wait_generation_ += 2;
}
-template <class RunFuncT, class EventFuncT>
-void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func,
- const EventFuncT &event_func) {
- CHECK(has_guard_);
+template <ActorSendType send_type, class RunFuncT, class EventFuncT>
+void Scheduler::send_impl(const ActorId<> &actor_id, const RunFuncT &run_func, const EventFuncT &event_func) {
ActorInfo *actor_info = actor_id.get_actor_info();
if (unlikely(actor_info == nullptr || close_flag_)) {
- // LOG(ERROR) << "Invalid actor id";
return;
}
@@ -199,8 +207,9 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru
bool is_migrating;
std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
+ CHECK(has_guard_ || !on_current_sched);
- if (likely(!(flags & Send::later) && !(flags & Send::later_weak) && on_current_sched && !actor_info->is_running() &&
+ if (likely(send_type == ActorSendType::Immediate && on_current_sched && !actor_info->is_running() &&
!actor_info->must_wait(wait_generation_))) { // run immediately
if (likely(actor_info->mailbox_.empty())) {
EventGuard guard(this, actor_info);
@@ -211,7 +220,7 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru
} else {
if (on_current_sched) {
add_to_mailbox(actor_info, event_func());
- if (flags & Send::later) {
+ if (send_type == ActorSendType::Later) {
actor_info->set_wait_generation(wait_generation_);
}
} else {
@@ -220,57 +229,61 @@ void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const Ru
}
}
-template <class EventT>
-void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags) {
- return send_impl(actor_ref.get(), flags,
- [&](ActorInfo *actor_info) {
- event_context_ptr_->link_token = actor_ref.token();
- lambda();
- },
- [&]() {
- auto event = Event::lambda(std::forward<EventT>(lambda));
- event.set_link_token(actor_ref.token());
- return std::move(event);
- });
-}
-
-template <class EventT>
-void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags) {
- return send_impl(actor_ref.get(), flags,
- [&](ActorInfo *actor_info) {
- event_context_ptr_->link_token = actor_ref.token();
- closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe()));
- },
- [&]() {
- auto event = Event::immediate_closure(std::forward<EventT>(closure));
- event.set_link_token(actor_ref.token());
- return std::move(event);
- });
-}
-
-inline void Scheduler::send(ActorRef actor_ref, Event &&event, Send::Flags flags) {
+template <ActorSendType send_type, class EventT>
+void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda) {
+ return send_impl<send_type>(
+ actor_ref.get(),
+ [&](ActorInfo *actor_info) {
+ event_context_ptr_->link_token = actor_ref.token();
+ lambda();
+ },
+ [&] {
+ auto event = Event::lambda(std::forward<EventT>(lambda));
+ event.set_link_token(actor_ref.token());
+ return event;
+ });
+}
+
+template <ActorSendType send_type, class EventT>
+void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure) {
+ return send_impl<send_type>(
+ actor_ref.get(),
+ [&](ActorInfo *actor_info) {
+ event_context_ptr_->link_token = actor_ref.token();
+ closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe()));
+ },
+ [&] {
+ auto event = Event::immediate_closure(std::forward<EventT>(closure));
+ event.set_link_token(actor_ref.token());
+ return event;
+ });
+}
+
+template <ActorSendType send_type>
+void Scheduler::send(ActorRef actor_ref, Event &&event) {
event.set_link_token(actor_ref.token());
- return send_impl(actor_ref.get(), flags, [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); },
- [&]() { return std::move(event); });
+ return send_impl<send_type>(
+ actor_ref.get(), [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); },
+ [&] { return std::move(event); });
}
-inline void Scheduler::subscribe(const Fd &fd, Fd::Flags flags) {
- poll_.subscribe(fd, flags);
+inline void Scheduler::subscribe(PollableFd fd, PollFlags flags) {
+ instance()->poll_.subscribe(std::move(fd), flags);
}
-inline void Scheduler::unsubscribe(const Fd &fd) {
- poll_.unsubscribe(fd);
+inline void Scheduler::unsubscribe(PollableFdRef fd) {
+ instance()->poll_.unsubscribe(std::move(fd));
}
-inline void Scheduler::unsubscribe_before_close(const Fd &fd) {
- poll_.unsubscribe_before_close(fd);
+inline void Scheduler::unsubscribe_before_close(PollableFdRef fd) {
+ instance()->poll_.unsubscribe_before_close(std::move(fd));
}
inline void Scheduler::yield_actor(Actor *actor) {
yield_actor(actor->get_info());
}
inline void Scheduler::yield_actor(ActorInfo *actor_info) {
- send(actor_info->actor_id(), Event::yield(), Send::later_weak);
+ send<ActorSendType::LaterWeak>(actor_info->actor_id(), Event::yield());
}
inline void Scheduler::stop_actor(Actor *actor) {
@@ -285,7 +298,7 @@ inline uint64 Scheduler::get_link_token(Actor *actor) {
return get_link_token(actor->get_info());
}
inline uint64 Scheduler::get_link_token(ActorInfo *actor_info) {
- CHECK(event_context_ptr_->actor_info == actor_info);
+ LOG_CHECK(event_context_ptr_->actor_info == actor_info) << actor_info->get_name();
return event_context_ptr_->link_token;
}
@@ -293,8 +306,8 @@ inline void Scheduler::finish_migrate_actor(Actor *actor) {
register_migrated_actor(actor->get_info());
}
-inline bool Scheduler::has_actor_timeout(const Actor *actor) const {
- return has_actor_timeout(actor->get_info());
+inline double Scheduler::get_actor_timeout(const Actor *actor) const {
+ return get_actor_timeout(actor->get_info());
}
inline void Scheduler::set_actor_timeout_in(Actor *actor, double timeout) {
set_actor_timeout_in(actor->get_info(), timeout);
@@ -306,11 +319,6 @@ inline void Scheduler::cancel_actor_timeout(Actor *actor) {
cancel_actor_timeout(actor->get_info());
}
-inline bool Scheduler::has_actor_timeout(const ActorInfo *actor_info) const {
- const HeapNode *heap_node = actor_info->get_heap_node();
- return heap_node->in_heap();
-}
-
inline void Scheduler::cancel_actor_timeout(ActorInfo *actor_info) {
HeapNode *heap_node = actor_info->get_heap_node();
if (heap_node->in_heap()) {
@@ -332,46 +340,23 @@ inline void Scheduler::yield() {
inline void Scheduler::wakeup() {
std::atomic_thread_fence(std::memory_order_release);
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
- event_fd_.release();
+ inbound_queue_->writer_put({});
#endif
}
-inline double Scheduler::run_events() {
- double res;
- VLOG(actor) << "run events " << sched_id_ << " " << tag("pending", pending_events_.size())
- << tag("actors", actor_count_);
- do {
- run_mailbox();
- res = run_timeout();
- } while (!ready_actors_list_.empty());
- return res;
-}
-
-inline void Scheduler::run(double timeout) {
+inline void Scheduler::run(Timestamp timeout) {
auto guard = get_guard();
run_no_guard(timeout);
}
/*** Interface to current scheduler ***/
-inline void subscribe(const Fd &fd, Fd::Flags flags) {
- Scheduler::instance()->subscribe(fd, flags);
-}
-
-inline void unsubscribe(const Fd &fd) {
- Scheduler::instance()->unsubscribe(fd);
-}
-
-inline void unsubscribe_before_close(const Fd &fd) {
- Scheduler::instance()->unsubscribe_before_close(fd);
-}
-
template <class ActorT, class... Args>
-ActorOwn<ActorT> create_actor(Slice name, Args &&... args) {
+ActorOwn<ActorT> create_actor(Slice name, Args &&...args) {
return Scheduler::instance()->create_actor<ActorT>(name, std::forward<Args>(args)...);
}
template <class ActorT, class... Args>
-ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) {
+ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&...args) {
return Scheduler::instance()->create_actor_on_scheduler<ActorT>(name, sched_id, std::forward<Args>(args)...);
}
@@ -390,8 +375,4 @@ ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr) {
return Scheduler::instance()->register_existing_actor(std::move(actor_ptr));
}
-inline void yield_scheduler() {
- Scheduler::instance()->yield();
-}
-
} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h
deleted file mode 100644
index 2cb5cb2127..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h
+++ /dev/null
@@ -1,117 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/actor/impl2/ActorSignals.h"
-#include "td/actor/impl2/ActorState.h"
-
-#include "td/utils/logging.h"
-
-#include <atomic>
-
-namespace td {
-namespace actor2 {
-class ActorLocker {
- public:
- struct Options {
- Options() {
- }
- bool can_execute_paused = false;
- bool is_shared = true;
- Options &with_can_execute_paused(bool new_can_execute_paused) {
- can_execute_paused = new_can_execute_paused;
- return *this;
- }
- Options &with_is_shared(bool new_is_shared) {
- is_shared = new_is_shared;
- return *this;
- }
- };
- explicit ActorLocker(ActorState *state, Options options = {})
- : state_(state), flags_(state->get_flags_unsafe()), new_flags_{}, options_{options} {
- }
- bool try_lock() {
- CHECK(!own_lock());
- while (!can_try_add_signals()) {
- new_flags_ = flags_;
- new_flags_.set_locked(true);
- new_flags_.clear_signals();
- if (state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel)) {
- own_lock_ = true;
- return true;
- }
- }
- return false;
- }
- bool try_unlock(ActorState::Flags flags) {
- CHECK(!flags.is_locked());
- CHECK(own_lock());
- // can't unlock with signals set
- //CHECK(!flags.has_signals());
-
- flags_ = flags;
- //try unlock
- if (state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel)) {
- own_lock_ = false;
- return true;
- }
-
- // read all signals
- flags.set_locked(true);
- flags.clear_signals();
- do {
- flags_.add_signals(new_flags_.get_signals());
- } while (!state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel));
- new_flags_ = flags;
- return false;
- }
-
- bool try_add_signals(ActorSignals signals) {
- CHECK(!own_lock());
- CHECK(can_try_add_signals());
- new_flags_ = flags_;
- new_flags_.add_signals(signals);
- return state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel);
- }
- bool add_signals(ActorSignals signals) {
- CHECK(!own_lock());
- while (true) {
- if (can_try_add_signals()) {
- if (try_add_signals(signals)) {
- return false;
- }
- } else {
- if (try_lock()) {
- flags_.add_signals(signals);
- return true;
- }
- }
- }
- }
- bool own_lock() const {
- return own_lock_;
- }
- ActorState::Flags flags() const {
- return flags_;
- }
- bool can_execute() const {
- return flags_.is_shared() == options_.is_shared && (options_.can_execute_paused || !flags_.is_pause());
- }
-
- private:
- ActorState *state_{nullptr};
- ActorState::Flags flags_;
- ActorState::Flags new_flags_;
- bool own_lock_{false};
- Options options_;
-
- bool can_try_add_signals() const {
- return flags_.is_locked() || (flags_.is_in_queue() && !can_execute());
- }
-};
-} // namespace actor2
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h
deleted file mode 100644
index b7a7483022..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h
+++ /dev/null
@@ -1,84 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/utils/common.h"
-
-namespace td {
-namespace actor2 {
-class ActorSignals {
- public:
- ActorSignals() = default;
- uint32 raw() const {
- return raw_;
- }
- bool empty() const {
- return raw_ == 0;
- }
- bool has_signal(uint32 signal) const {
- return (raw_ & (1u << signal)) != 0;
- }
- void add_signal(uint32 signal) {
- raw_ |= (1u << signal);
- }
- void add_signals(ActorSignals signals) {
- raw_ |= signals.raw();
- }
- void clear_signal(uint32 signal) {
- raw_ &= ~(1u << signal);
- }
- uint32 first_signal() {
- if (!raw_) {
- return 0;
- }
-#if TD_MSVC
- int res = 0;
- int bit = 1;
- while ((raw_ & bit) == 0) {
- res++;
- bit <<= 1;
- }
- return res;
-#else
- return __builtin_ctz(raw_);
-#endif
- }
- enum Signal : uint32 {
- // Signals in order of priority
- Wakeup = 1,
- Alarm = 2,
- Kill = 3, // immediate kill
- Io = 4, // move to io thread
- Cpu = 5, // move to cpu thread
- StartUp = 6,
- TearDown = 7,
- // Two signals for mpmc queue logic
- //
- // PopSignal is set after actor is popped from queue
- // When processed it should set InQueue and Pause flags to false.
- //
- // MessagesSignal is set after new messages was added to actor
- // If owner of actor wish to delay message handling, she should set InQueue flag to true and
- // add actor into mpmc queue.
- Pop = 8, // got popped from queue
- Message = 9, // got new message
- };
-
- static ActorSignals one(uint32 signal) {
- ActorSignals res;
- res.add_signal(signal);
- return res;
- }
-
- private:
- uint32 raw_{0};
- friend class ActorState;
- explicit ActorSignals(uint32 raw) : raw_(raw) {
- }
-};
-} // namespace actor2
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorState.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorState.h
deleted file mode 100644
index 02ead6bcf6..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/ActorState.h
+++ /dev/null
@@ -1,166 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/actor/impl2/ActorSignals.h"
-#include "td/actor/impl2/SchedulerId.h"
-
-#include "td/utils/common.h"
-
-#include <atomic>
-
-namespace td {
-namespace actor2 {
-class ActorState {
- public:
- class Flags {
- public:
- Flags() = default;
- uint32 raw() const {
- return raw_;
- }
- uint32 &raw_ref() {
- return raw_;
- }
- SchedulerId get_scheduler_id() const {
- return SchedulerId{static_cast<uint8>(raw_ & SchedulerMask)};
- }
- void set_scheduler_id(SchedulerId id) {
- raw_ = (raw_ & ~SchedulerMask) | id.value();
- }
-
- bool is_shared() const {
- return check_flag(SharedFlag);
- }
- void set_shared(bool shared) {
- set_flag(SharedFlag, shared);
- }
-
- bool is_locked() const {
- return check_flag(LockFlag);
- }
- void set_locked(bool locked) {
- set_flag(LockFlag, locked);
- }
-
- bool is_migrate() const {
- return check_flag(MigrateFlag);
- }
- void set_migrate(bool migrate) {
- set_flag(MigrateFlag, migrate);
- }
-
- bool is_pause() const {
- return check_flag(PauseFlag);
- }
- void set_pause(bool pause) {
- set_flag(PauseFlag, pause);
- }
-
- bool is_closed() const {
- return check_flag(ClosedFlag);
- }
- void set_closed(bool closed) {
- set_flag(ClosedFlag, closed);
- }
-
- bool is_in_queue() const {
- return check_flag(InQueueFlag);
- }
- void set_in_queue(bool in_queue) {
- set_flag(InQueueFlag, in_queue);
- }
-
- bool has_signals() const {
- return check_flag(SignalMask);
- }
- void clear_signals() {
- set_flag(SignalMask, false);
- }
- void set_signals(ActorSignals signals) {
- raw_ = (raw_ & ~SignalMask) | (signals.raw() << SignalOffset);
- }
- void add_signals(ActorSignals signals) {
- raw_ = raw_ | (signals.raw() << SignalOffset);
- }
- ActorSignals get_signals() const {
- return ActorSignals{(raw_ & SignalMask) >> SignalOffset};
- }
-
- private:
- uint32 raw_{0};
-
- friend class ActorState;
- Flags(uint32 raw) : raw_(raw) {
- }
-
- bool check_flag(uint32 mask) const {
- return (raw_ & mask) != 0;
- }
- void set_flag(uint32 mask, bool flag) {
- raw_ = (raw_ & ~mask) | (flag * mask);
- }
- };
-
- Flags get_flags_unsafe() {
- return Flags(state_.load(std::memory_order_relaxed));
- }
- void set_flags_unsafe(Flags flags) {
- state_.store(flags.raw(), std::memory_order_relaxed);
- }
-
- private:
- friend class ActorLocker;
- std::atomic<uint32> state_{0};
- enum : uint32 {
- SchedulerMask = 255,
-
- // Actors can be shared or not.
- // If actor is shared, than any thread may try to lock it
- // If actor is not shared, than it is owned by its scheduler, and only
- // its scheduler is allowed to access it
- // This flag may NOT change during the lifetime of an actor
- SharedFlag = 1 << 9,
-
- // Only shared actors need lock
- // Lock if somebody is going to unlock it eventually.
- // For example actor is locked, when some scheduler is executing its mailbox
- // Or it is locked when it is in Mpmc queue, so someone will pop it eventually.
- LockFlag = 1 << 10,
-
- // While actor is migrating from one scheduler to another no one is allowed to change it
- // Could not be set for shared actors.
- MigrateFlag = 1 << 11,
-
- // While set all messages are delayed
- // Dropped from flush_maibox
- // PauseFlag => InQueueFlag
- PauseFlag = 1 << 12,
-
- ClosedFlag = 1 << 13,
-
- InQueueFlag = 1 << 14,
-
- // Signals
- SignalOffset = 15,
- Signal = 1 << SignalOffset,
- WakeupSignalFlag = Signal << ActorSignals::Wakeup,
- AlarmSignalFlag = Signal << ActorSignals::Alarm,
- KillSignalFlag = Signal << ActorSignals::Kill, // immediate kill
- IoSignalFlag = Signal << ActorSignals::Io, // move to io thread
- CpuSignalFlag = Signal << ActorSignals::Cpu, // move to cpu thread
- StartUpSignalFlag = Signal << ActorSignals::StartUp,
- TearDownSignalFlag = Signal << ActorSignals::TearDown,
- MessageSignalFlag = Signal << ActorSignals::Message,
- PopSignalFlag = Signal << ActorSignals::Pop,
-
- SignalMask = WakeupSignalFlag | AlarmSignalFlag | KillSignalFlag | IoSignalFlag | CpuSignalFlag |
- StartUpSignalFlag | TearDownSignalFlag | MessageSignalFlag | PopSignalFlag
- };
-};
-} // namespace actor2
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp
deleted file mode 100644
index 720bf6bc4f..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp
+++ /dev/null
@@ -1,11 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#include "td/actor/impl2/Scheduler.h"
-
-namespace td {
-namespace actor2 {}
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.h
deleted file mode 100644
index 9d5783b165..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/Scheduler.h
+++ /dev/null
@@ -1,1508 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/actor/impl2/ActorLocker.h"
-#include "td/actor/impl2/SchedulerId.h"
-
-#include "td/utils/Closure.h"
-#include "td/utils/common.h"
-#include "td/utils/format.h"
-#include "td/utils/Heap.h"
-#include "td/utils/List.h"
-#include "td/utils/logging.h"
-#include "td/utils/MpmcQueue.h"
-#include "td/utils/MpmcWaiter.h"
-#include "td/utils/MpscLinkQueue.h"
-#include "td/utils/MpscPollableQueue.h"
-#include "td/utils/port/Fd.h"
-#include "td/utils/port/Poll.h"
-#include "td/utils/port/thread.h"
-#include "td/utils/port/thread_local.h"
-#include "td/utils/ScopeGuard.h"
-#include "td/utils/SharedObjectPool.h"
-#include "td/utils/Slice.h"
-#include "td/utils/Time.h"
-#include "td/utils/type_traits.h"
-
-#include <atomic>
-#include <condition_variable>
-#include <limits>
-#include <memory>
-#include <mutex>
-#include <type_traits>
-#include <utility>
-
-namespace td {
-namespace actor2 {
-class Actor;
-
-template <class Impl>
-class Context {
- public:
- static Impl *get() {
- return context_;
- }
- class Guard {
- public:
- explicit Guard(Impl *new_context) {
- old_context_ = context_;
- context_ = new_context;
- }
- ~Guard() {
- context_ = old_context_;
- }
- Guard(const Guard &) = delete;
- Guard &operator=(const Guard &) = delete;
- Guard(Guard &&) = delete;
- Guard &operator=(Guard &&) = delete;
-
- private:
- Impl *old_context_;
- };
-
- private:
- static TD_THREAD_LOCAL Impl *context_;
-};
-
-template <class Impl>
-TD_THREAD_LOCAL Impl *Context<Impl>::context_;
-
-enum : uint64 { EmptyLinkToken = std::numeric_limits<uint64>::max() };
-
-class ActorExecuteContext : public Context<ActorExecuteContext> {
- public:
- explicit ActorExecuteContext(Actor *actor, Timestamp alarm_timestamp = Timestamp::never())
- : actor_(actor), alarm_timestamp_(alarm_timestamp) {
- }
- Actor &actor() const {
- CHECK(actor_);
- return *actor_;
- }
- bool has_flags() const {
- return flags_ != 0;
- }
- void set_stop() {
- flags_ |= 1 << Stop;
- }
- bool get_stop() const {
- return (flags_ & (1 << Stop)) != 0;
- }
- void set_pause() {
- flags_ |= 1 << Pause;
- }
- bool get_pause() const {
- return (flags_ & (1 << Pause)) != 0;
- }
- void clear_actor() {
- actor_ = nullptr;
- }
- void set_link_token(uint64 link_token) {
- link_token_ = link_token;
- }
- uint64 get_link_token() const {
- return link_token_;
- }
- Timestamp &alarm_timestamp() {
- flags_ |= 1 << Alarm;
- return alarm_timestamp_;
- }
- bool get_alarm_flag() const {
- return (flags_ & (1 << Alarm)) != 0;
- }
- Timestamp get_alarm_timestamp() const {
- return alarm_timestamp_;
- }
-
- private:
- Actor *actor_;
- uint32 flags_{0};
- uint64 link_token_{EmptyLinkToken};
- Timestamp alarm_timestamp_;
- enum { Stop, Pause, Alarm };
-};
-
-class ActorMessageImpl : private MpscLinkQueueImpl::Node {
- public:
- ActorMessageImpl() = default;
- ActorMessageImpl(const ActorMessageImpl &) = delete;
- ActorMessageImpl &operator=(const ActorMessageImpl &) = delete;
- ActorMessageImpl(ActorMessageImpl &&other) = delete;
- ActorMessageImpl &operator=(ActorMessageImpl &&other) = delete;
- virtual ~ActorMessageImpl() = default;
- virtual void run() = 0;
- //virtual void run_anonymous() = 0;
-
- // ActorMessage <--> MpscLintQueue::Node
- // Each actor's mailbox will be a queue
- static ActorMessageImpl *from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) {
- return static_cast<ActorMessageImpl *>(node);
- }
- MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() {
- return static_cast<MpscLinkQueueImpl::Node *>(this);
- }
-
- uint64 link_token_{EmptyLinkToken};
- bool is_big_{false};
-};
-
-class ActorMessage {
- public:
- ActorMessage() = default;
- explicit ActorMessage(std::unique_ptr<ActorMessageImpl> impl) : impl_(std::move(impl)) {
- }
- void run() {
- CHECK(impl_);
- impl_->run();
- }
- explicit operator bool() {
- return bool(impl_);
- }
- friend class ActorMailbox;
-
- void set_link_token(uint64 link_token) {
- impl_->link_token_ = link_token;
- }
- uint64 get_link_token() const {
- return impl_->link_token_;
- }
- bool is_big() const {
- return impl_->is_big_;
- }
- void set_big() {
- impl_->is_big_ = true;
- }
-
- private:
- std::unique_ptr<ActorMessageImpl> impl_;
-
- template <class T>
- friend class td::MpscLinkQueue;
-
- static ActorMessage from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) {
- return ActorMessage(std::unique_ptr<ActorMessageImpl>(ActorMessageImpl::from_mpsc_link_queue_node(node)));
- }
- MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() {
- return impl_.release()->to_mpsc_link_queue_node();
- }
-};
-
-class ActorMailbox {
- public:
- ActorMailbox() = default;
- ActorMailbox(const ActorMailbox &) = delete;
- ActorMailbox &operator=(const ActorMailbox &) = delete;
- ActorMailbox(ActorMailbox &&other) = delete;
- ActorMailbox &operator=(ActorMailbox &&other) = delete;
- ~ActorMailbox() {
- pop_all();
- while (reader_.read()) {
- // skip
- }
- }
- class Reader;
- void push(ActorMessage message) {
- queue_.push(std::move(message));
- }
- void push_unsafe(ActorMessage message) {
- queue_.push_unsafe(std::move(message));
- }
-
- td::MpscLinkQueue<ActorMessage>::Reader &reader() {
- return reader_;
- }
-
- void pop_all() {
- queue_.pop_all(reader_);
- }
- void pop_all_unsafe() {
- queue_.pop_all_unsafe(reader_);
- }
-
- private:
- td::MpscLinkQueue<ActorMessage> queue_;
- td::MpscLinkQueue<ActorMessage>::Reader reader_;
-};
-
-class ActorInfo
- : private HeapNode
- , private ListNode {
- public:
- ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name)
- : actor_(std::move(actor)), name_(name.begin(), name.size()) {
- state_.set_flags_unsafe(state_flags);
- }
-
- bool has_actor() const {
- return bool(actor_);
- }
- Actor &actor() {
- CHECK(has_actor());
- return *actor_;
- }
- Actor *actor_ptr() const {
- return actor_.get();
- }
- void destroy_actor() {
- actor_.reset();
- }
- ActorState &state() {
- return state_;
- }
- ActorMailbox &mailbox() {
- return mailbox_;
- }
- CSlice get_name() const {
- return name_;
- }
-
- HeapNode *as_heap_node() {
- return this;
- }
- static ActorInfo *from_heap_node(HeapNode *node) {
- return static_cast<ActorInfo *>(node);
- }
-
- Timestamp &alarm_timestamp() {
- return alarm_timestamp_;
- }
-
- private:
- std::unique_ptr<Actor> actor_;
- ActorState state_;
- ActorMailbox mailbox_;
- std::string name_;
- Timestamp alarm_timestamp_;
-};
-
-using ActorInfoPtr = SharedObjectPool<ActorInfo>::Ptr;
-
-class Actor {
- public:
- Actor() = default;
- Actor(const Actor &) = delete;
- Actor &operator=(const Actor &) = delete;
- Actor(Actor &&other) = delete;
- Actor &operator=(Actor &&other) = delete;
- virtual ~Actor() = default;
-
- void set_actor_info_ptr(ActorInfoPtr actor_info_ptr) {
- actor_info_ptr_ = std::move(actor_info_ptr);
- }
- ActorInfoPtr get_actor_info_ptr() {
- return actor_info_ptr_;
- }
-
- protected:
- // Signal handlers
- virtual void start_up(); // StartUp signal handler
- virtual void tear_down(); // TearDown signal handler (or Kill)
- virtual void hang_up(); // HangUp signal handler
- virtual void wake_up(); // WakeUp signal handler
- virtual void alarm(); // Alarm signal handler
-
- friend class ActorMessageHangup;
-
- // Event handlers
- //virtual void hangup_shared();
- // TODO: raw event?
-
- virtual void loop(); // default handler
-
- // Useful functions
- void yield(); // send wakeup signal to itself
- void stop(); // send Kill signal to itself
- Timestamp &alarm_timestamp() {
- return ActorExecuteContext::get()->alarm_timestamp();
- }
- Timestamp get_alarm_timestamp() {
- return ActorExecuteContext::get()->get_alarm_timestamp();
- }
-
- CSlice get_name() {
- return actor_info_ptr_->get_name();
- }
-
- // Inteface to scheduler
- // Query will be just passed to current scheduler
- // Timeout functions
- //bool has_timeout() const;
- //void set_timeout_in(double timeout_in);
- //void set_timeout_at(double timeout_at);
- //void cancel_timeout();
- //uint64 get_link_token(); // get current request's link_token
- //set context that will be inherited by all childrens
- //void set_context(std::shared_ptr<ActorContext> context);
-
- //ActorShared<> actor_shared(); // ActorShared to itself
- //template <class SelfT>
- //ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1)); // ActorShared with type
-
- // Create EventFull to itself
- //template <class FuncT, class... ArgsT>
- //auto self_closure(FuncT &&func, ArgsT &&... args);
- //template <class SelfT, class FuncT, class... ArgsT>
- //auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args);
- //template <class LambdaT>
- //auto self_lambda(LambdaT &&lambda);
-
- //void do_stop(); // process Kill signal immediately
-
- private:
- friend class ActorExecutor;
- ActorInfoPtr actor_info_ptr_;
-};
-// Signal handlers
-inline void Actor::start_up() {
- yield();
-}
-inline void Actor::tear_down() {
- // noop
-}
-inline void Actor::hang_up() {
- stop();
-}
-inline void Actor::wake_up() {
- loop();
-}
-inline void Actor::alarm() {
- loop();
-}
-
-inline void Actor::loop() {
- // noop
-}
-
-// Useful functions
-inline void Actor::yield() {
- // TODO
-}
-inline void Actor::stop() {
- ActorExecuteContext::get()->set_stop();
-}
-
-class ActorInfoCreator {
- public:
- class Options {
- public:
- Options() = default;
-
- Options &with_name(Slice new_name) {
- name = new_name;
- return *this;
- }
-
- Options &on_scheduler(SchedulerId new_scheduler_id) {
- scheduler_id = new_scheduler_id;
- return *this;
- }
- bool has_scheduler() const {
- return scheduler_id.is_valid();
- }
- Options &with_poll() {
- is_shared = false;
- return *this;
- }
-
- private:
- friend class ActorInfoCreator;
- Slice name;
- SchedulerId scheduler_id;
- bool is_shared{true};
- bool in_queue{true};
- //TODO: rename
- };
-
- //Create unlocked actor. One must send StartUp signal immediately.
- ActorInfoPtr create(std::unique_ptr<Actor> actor, const Options &args) {
- ActorState::Flags flags;
- flags.set_scheduler_id(args.scheduler_id);
- flags.set_shared(args.is_shared);
- flags.set_in_queue(args.in_queue);
- flags.set_signals(ActorSignals::one(ActorSignals::StartUp));
-
- auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name);
- actor_info_ptr->actor().set_actor_info_ptr(actor_info_ptr);
- return actor_info_ptr;
- }
-
- ActorInfoCreator() = default;
- ActorInfoCreator(const ActorInfoCreator &) = delete;
- ActorInfoCreator &operator=(const ActorInfoCreator &) = delete;
- ActorInfoCreator(ActorInfoCreator &&other) = delete;
- ActorInfoCreator &operator=(ActorInfoCreator &&other) = delete;
- ~ActorInfoCreator() {
- pool_.for_each([](auto &actor_info) { actor_info.destroy_actor(); });
- }
-
- private:
- SharedObjectPool<ActorInfo> pool_;
-};
-
-using ActorOptions = ActorInfoCreator::Options;
-
-class SchedulerDispatcher {
- public:
- virtual SchedulerId get_scheduler_id() const = 0;
- virtual void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) = 0;
- virtual void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) = 0;
-
- SchedulerDispatcher() = default;
- SchedulerDispatcher(const SchedulerDispatcher &) = delete;
- SchedulerDispatcher &operator=(const SchedulerDispatcher &) = delete;
- SchedulerDispatcher(SchedulerDispatcher &&other) = delete;
- SchedulerDispatcher &operator=(SchedulerDispatcher &&other) = delete;
- virtual ~SchedulerDispatcher() = default;
-};
-
-class ActorExecutor {
- public:
- struct Options {
- Options &with_from_queue() {
- from_queue = true;
- return *this;
- }
- Options &with_has_poll(bool new_has_poll) {
- this->has_poll = new_has_poll;
- return *this;
- }
- bool from_queue{false};
- bool has_poll{false};
- };
- ActorExecutor(ActorInfo &actor_info, SchedulerDispatcher &dispatcher, Options options)
- : actor_info_(actor_info), dispatcher_(dispatcher), options_(options) {
- //LOG(ERROR) << "START " << actor_info_.get_name() << " " << tag("from_queue", from_queue);
- start();
- }
- ActorExecutor(const ActorExecutor &) = delete;
- ActorExecutor &operator=(const ActorExecutor &) = delete;
- ActorExecutor(ActorExecutor &&other) = delete;
- ActorExecutor &operator=(ActorExecutor &&other) = delete;
- ~ActorExecutor() {
- //LOG(ERROR) << "FINISH " << actor_info_.get_name() << " " << tag("own_lock", actor_locker_.own_lock());
- finish();
- }
-
- // our best guess if actor is closed or not
- bool can_send() {
- return !flags().is_closed();
- }
-
- bool can_send_immediate() {
- return actor_locker_.own_lock() && !actor_execute_context_.has_flags() && actor_locker_.can_execute();
- }
-
- template <class F>
- void send_immediate(F &&f, uint64 link_token) {
- CHECK(can_send_immediate());
- if (!can_send()) {
- return;
- }
- actor_execute_context_.set_link_token(link_token);
- f();
- }
- void send_immediate(ActorMessage message) {
- CHECK(can_send_immediate());
- if (message.is_big()) {
- actor_info_.mailbox().reader().delay(std::move(message));
- pending_signals_.add_signal(ActorSignals::Message);
- actor_execute_context_.set_pause();
- return;
- }
- actor_execute_context_.set_link_token(message.get_link_token());
- message.run();
- }
- void send_immediate(ActorSignals signals) {
- CHECK(can_send_immediate());
- while (flush_one_signal(signals) && can_send_immediate()) {
- }
- pending_signals_.add_signals(signals);
- }
-
- void send(ActorMessage message) {
- if (!can_send()) {
- return;
- }
- if (can_send_immediate()) {
- return send_immediate(std::move(message));
- }
- actor_info_.mailbox().push(std::move(message));
- pending_signals_.add_signal(ActorSignals::Message);
- }
-
- void send(ActorSignals signals) {
- if (!can_send()) {
- return;
- }
-
- pending_signals_.add_signals(signals);
- }
-
- private:
- ActorInfo &actor_info_;
- SchedulerDispatcher &dispatcher_;
- Options options_;
- ActorLocker actor_locker_{
- &actor_info_.state(),
- ActorLocker::Options().with_can_execute_paused(options_.from_queue).with_is_shared(!options_.has_poll)};
-
- ActorExecuteContext actor_execute_context_{actor_info_.actor_ptr(), actor_info_.alarm_timestamp()};
- ActorExecuteContext::Guard guard{&actor_execute_context_};
-
- ActorState::Flags flags_;
- ActorSignals pending_signals_;
-
- ActorState::Flags &flags() {
- return flags_;
- }
-
- void start() {
- if (!can_send()) {
- return;
- }
-
- ActorSignals signals;
- SCOPE_EXIT {
- pending_signals_.add_signals(signals);
- };
-
- if (options_.from_queue) {
- signals.add_signal(ActorSignals::Pop);
- }
-
- actor_locker_.try_lock();
- flags_ = actor_locker_.flags();
-
- if (!actor_locker_.own_lock()) {
- return;
- }
-
- if (options_.from_queue) {
- flags().set_pause(false);
- }
- if (!actor_locker_.can_execute()) {
- CHECK(!options_.from_queue);
- return;
- }
-
- signals.add_signals(flags().get_signals());
- actor_info_.mailbox().pop_all();
-
- while (!actor_execute_context_.has_flags() && flush_one(signals)) {
- }
- }
-
- void finish() {
- if (!actor_locker_.own_lock()) {
- if (!pending_signals_.empty() && actor_locker_.add_signals(pending_signals_)) {
- flags_ = actor_locker_.flags();
- } else {
- return;
- }
- }
- CHECK(actor_locker_.own_lock());
-
- if (actor_execute_context_.has_flags()) {
- if (actor_execute_context_.get_stop()) {
- if (actor_info_.alarm_timestamp()) {
- dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), Timestamp::never());
- }
- flags_.set_closed(true);
- actor_info_.actor().tear_down();
- actor_info_.destroy_actor();
- return;
- }
- if (actor_execute_context_.get_pause()) {
- flags_.set_pause(true);
- }
- if (actor_execute_context_.get_alarm_flag()) {
- auto old_timestamp = actor_info_.alarm_timestamp();
- auto new_timestamp = actor_execute_context_.get_alarm_timestamp();
- if (!(old_timestamp == new_timestamp)) {
- actor_info_.alarm_timestamp() = new_timestamp;
- dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), new_timestamp);
- }
- }
- }
- flags_.set_signals(pending_signals_);
-
- bool add_to_queue = false;
- while (true) {
- // Drop InQueue flag if has pop signal
- // Can't delay this signal
- auto signals = flags().get_signals();
- if (signals.has_signal(ActorSignals::Pop)) {
- signals.clear_signal(ActorSignals::Pop);
- flags().set_signals(signals);
- flags().set_in_queue(false);
- }
-
- if (flags().has_signals() && !flags().is_in_queue()) {
- add_to_queue = true;
- flags().set_in_queue(true);
- }
- if (actor_locker_.try_unlock(flags())) {
- if (add_to_queue) {
- dispatcher_.add_to_queue(actor_info_.actor().get_actor_info_ptr(), flags().get_scheduler_id(),
- !flags().is_shared());
- }
- break;
- }
- flags_ = actor_locker_.flags();
- }
- }
-
- bool flush_one(ActorSignals &signals) {
- return flush_one_signal(signals) || flush_one_message();
- }
-
- bool flush_one_signal(ActorSignals &signals) {
- auto signal = signals.first_signal();
- if (!signal) {
- return false;
- }
- switch (signal) {
- case ActorSignals::Wakeup:
- actor_info_.actor().wake_up();
- break;
- case ActorSignals::Alarm:
- if (actor_execute_context_.get_alarm_timestamp().is_in_past()) {
- actor_execute_context_.alarm_timestamp() = Timestamp::never();
- actor_info_.actor().alarm();
- }
- break;
- case ActorSignals::Kill:
- actor_execute_context_.set_stop();
- break;
- case ActorSignals::StartUp:
- actor_info_.actor().start_up();
- break;
- case ActorSignals::TearDown:
- actor_info_.actor().tear_down();
- break;
- case ActorSignals::Pop:
- flags().set_in_queue(false);
- break;
-
- case ActorSignals::Message:
- break;
- case ActorSignals::Io:
- case ActorSignals::Cpu:
- LOG(FATAL) << "TODO";
- default:
- UNREACHABLE();
- }
- signals.clear_signal(signal);
- return true;
- }
-
- bool flush_one_message() {
- auto message = actor_info_.mailbox().reader().read();
- if (!message) {
- return false;
- }
- if (message.is_big() && !options_.from_queue) {
- actor_info_.mailbox().reader().delay(std::move(message));
- pending_signals_.add_signal(ActorSignals::Message);
- actor_execute_context_.set_pause();
- return false;
- }
-
- actor_execute_context_.set_link_token(message.get_link_token());
- message.run();
- return true;
- }
-};
-
-using SchedulerMessage = ActorInfoPtr;
-
-struct WorkerInfo {
- enum class Type { Io, Cpu } type{Type::Io};
- WorkerInfo() = default;
- explicit WorkerInfo(Type type) : type(type) {
- }
- ActorInfoCreator actor_info_creator;
-};
-
-struct SchedulerInfo {
- SchedulerId id;
- // will be read by all workers is any thread
- std::unique_ptr<MpmcQueue<SchedulerMessage>> cpu_queue;
- std::unique_ptr<MpmcWaiter> cpu_queue_waiter;
- // only scheduler itself may read from io_queue_
- std::unique_ptr<MpscPollableQueue<SchedulerMessage>> io_queue;
- size_t cpu_threads_count{0};
-
- std::unique_ptr<WorkerInfo> io_worker;
- std::vector<std::unique_ptr<WorkerInfo>> cpu_workers;
-};
-
-struct SchedulerGroupInfo {
- explicit SchedulerGroupInfo(size_t n) : schedulers(n) {
- }
- std::atomic<bool> is_stop_requested{false};
-
- int active_scheduler_count{0};
- std::mutex active_scheduler_count_mutex;
- std::condition_variable active_scheduler_count_condition_variable;
-
- std::vector<SchedulerInfo> schedulers;
-};
-
-class SchedulerContext
- : public Context<SchedulerContext>
- , public SchedulerDispatcher {
- public:
- // DispatcherInterface
- SchedulerDispatcher &dispatcher() {
- return *this;
- }
-
- // ActorCreator Interface
- virtual ActorInfoCreator &get_actor_info_creator() = 0;
-
- // Poll interface
- virtual bool has_poll() = 0;
- virtual Poll &get_poll() = 0;
-
- // Timeout interface
- virtual bool has_heap() = 0;
- virtual KHeap<double> &get_heap() = 0;
-
- // Stop all schedulers
- virtual bool is_stop_requested() = 0;
- virtual void stop() = 0;
-};
-
-#if !TD_THREAD_UNSUPPORTED
-class Scheduler {
- public:
- Scheduler(std::shared_ptr<SchedulerGroupInfo> scheduler_group_info, SchedulerId id, size_t cpu_threads_count)
- : scheduler_group_info_(std::move(scheduler_group_info)), cpu_threads_(cpu_threads_count) {
- scheduler_group_info_->active_scheduler_count++;
- info_ = &scheduler_group_info_->schedulers.at(id.value());
- info_->id = id;
- if (cpu_threads_count != 0) {
- info_->cpu_threads_count = cpu_threads_count;
- info_->cpu_queue = std::make_unique<MpmcQueue<SchedulerMessage>>(1024, max_thread_count());
- info_->cpu_queue_waiter = std::make_unique<MpmcWaiter>();
- }
- info_->io_queue = std::make_unique<MpscPollableQueue<SchedulerMessage>>();
- info_->io_queue->init();
-
- info_->cpu_workers.resize(cpu_threads_count);
- for (auto &worker : info_->cpu_workers) {
- worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Cpu);
- }
- info_->io_worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Io);
-
- poll_.init();
- io_worker_ = std::make_unique<IoWorker>(*info_->io_queue);
- }
-
- Scheduler(const Scheduler &) = delete;
- Scheduler &operator=(const Scheduler &) = delete;
- Scheduler(Scheduler &&other) = delete;
- Scheduler &operator=(Scheduler &&other) = delete;
- ~Scheduler() {
- // should stop
- stop();
- do_stop();
- }
-
- void start() {
- for (size_t i = 0; i < cpu_threads_.size(); i++) {
- cpu_threads_[i] = td::thread([this, i] {
- this->run_in_context_impl(*this->info_->cpu_workers[i],
- [this] { CpuWorker(*info_->cpu_queue, *info_->cpu_queue_waiter).run(); });
- });
- }
- this->run_in_context([this] { this->io_worker_->start_up(); });
- }
-
- template <class F>
- void run_in_context(F &&f) {
- run_in_context_impl(*info_->io_worker, std::forward<F>(f));
- }
-
- bool run(double timeout) {
- bool res;
- run_in_context_impl(*info_->io_worker, [this, timeout, &res] {
- if (SchedulerContext::get()->is_stop_requested()) {
- res = false;
- } else {
- res = io_worker_->run_once(timeout);
- }
- if (!res) {
- io_worker_->tear_down();
- }
- });
- if (!res) {
- do_stop();
- }
- return res;
- }
-
- // Just syntactic sugar
- void stop() {
- run_in_context([] { SchedulerContext::get()->stop(); });
- }
-
- SchedulerId get_scheduler_id() const {
- return info_->id;
- }
-
- private:
- std::shared_ptr<SchedulerGroupInfo> scheduler_group_info_;
- SchedulerInfo *info_;
- std::vector<td::thread> cpu_threads_;
- bool is_stopped_{false};
- Poll poll_;
- KHeap<double> heap_;
- class IoWorker;
- std::unique_ptr<IoWorker> io_worker_;
-
- class SchedulerContextImpl : public SchedulerContext {
- public:
- SchedulerContextImpl(WorkerInfo *worker, SchedulerInfo *scheduler, SchedulerGroupInfo *scheduler_group, Poll *poll,
- KHeap<double> *heap)
- : worker_(worker), scheduler_(scheduler), scheduler_group_(scheduler_group), poll_(poll), heap_(heap) {
- }
-
- SchedulerId get_scheduler_id() const override {
- return scheduler()->id;
- }
- void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) override {
- if (!scheduler_id.is_valid()) {
- scheduler_id = scheduler()->id;
- }
- auto &info = scheduler_group()->schedulers.at(scheduler_id.value());
- if (need_poll) {
- info.io_queue->writer_put(std::move(actor_info_ptr));
- } else {
- info.cpu_queue->push(std::move(actor_info_ptr), get_thread_id());
- info.cpu_queue_waiter->notify();
- }
- }
-
- ActorInfoCreator &get_actor_info_creator() override {
- return worker()->actor_info_creator;
- }
-
- bool has_poll() override {
- return poll_ != nullptr;
- }
- Poll &get_poll() override {
- CHECK(has_poll());
- return *poll_;
- }
-
- bool has_heap() override {
- return heap_ != nullptr;
- }
- KHeap<double> &get_heap() override {
- CHECK(has_heap());
- return *heap_;
- }
-
- void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override {
- // we are in PollWorker
- CHECK(has_heap());
- auto &heap = get_heap();
- auto *heap_node = actor_info_ptr->as_heap_node();
- if (timestamp) {
- if (heap_node->in_heap()) {
- heap.fix(timestamp.at(), heap_node);
- } else {
- heap.insert(timestamp.at(), heap_node);
- }
- } else {
- if (heap_node->in_heap()) {
- heap.erase(heap_node);
- }
- }
-
- // TODO: do something in plain worker
- }
-
- bool is_stop_requested() override {
- return scheduler_group()->is_stop_requested;
- }
-
- void stop() override {
- bool expect_false = false;
- // Trying to set close_flag_ to true with CAS
- auto &group = *scheduler_group();
- if (!group.is_stop_requested.compare_exchange_strong(expect_false, true)) {
- return;
- }
-
- // Notify all workers of all schedulers
- for (auto &scheduler_info : group.schedulers) {
- scheduler_info.io_queue->writer_put({});
- for (size_t i = 0; i < scheduler_info.cpu_threads_count; i++) {
- scheduler_info.cpu_queue->push({}, get_thread_id());
- scheduler_info.cpu_queue_waiter->notify();
- }
- }
- }
-
- private:
- WorkerInfo *worker() const {
- return worker_;
- }
- SchedulerInfo *scheduler() const {
- return scheduler_;
- }
- SchedulerGroupInfo *scheduler_group() const {
- return scheduler_group_;
- }
-
- WorkerInfo *worker_;
- SchedulerInfo *scheduler_;
- SchedulerGroupInfo *scheduler_group_;
- Poll *poll_;
-
- KHeap<double> *heap_;
- };
-
- template <class F>
- void run_in_context_impl(WorkerInfo &worker_info, F &&f) {
- bool is_io_worker = worker_info.type == WorkerInfo::Type::Io;
- SchedulerContextImpl context(&worker_info, info_, scheduler_group_info_.get(), is_io_worker ? &poll_ : nullptr,
- is_io_worker ? &heap_ : nullptr);
- SchedulerContext::Guard guard(&context);
- f();
- }
-
- class CpuWorker {
- public:
- CpuWorker(MpmcQueue<SchedulerMessage> &queue, MpmcWaiter &waiter) : queue_(queue), waiter_(waiter) {
- }
- void run() {
- auto thread_id = get_thread_id();
- auto &dispatcher = SchedulerContext::get()->dispatcher();
-
- int yields = 0;
- while (true) {
- SchedulerMessage message;
- if (queue_.try_pop(message, thread_id)) {
- if (!message) {
- return;
- }
- ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue());
- yields = waiter_.stop_wait(yields, thread_id);
- } else {
- yields = waiter_.wait(yields, thread_id);
- }
- }
- }
-
- private:
- MpmcQueue<SchedulerMessage> &queue_;
- MpmcWaiter &waiter_;
- };
-
- class IoWorker {
- public:
- explicit IoWorker(MpscPollableQueue<SchedulerMessage> &queue) : queue_(queue) {
- }
-
- void start_up() {
- auto &poll = SchedulerContext::get()->get_poll();
- poll.subscribe(queue_.reader_get_event_fd().get_fd(), Fd::Flag::Read);
- }
- void tear_down() {
- auto &poll = SchedulerContext::get()->get_poll();
- poll.unsubscribe(queue_.reader_get_event_fd().get_fd());
- }
-
- bool run_once(double timeout) {
- auto &dispatcher = SchedulerContext::get()->dispatcher();
- auto &poll = SchedulerContext::get()->get_poll();
- auto &heap = SchedulerContext::get()->get_heap();
-
- auto now = Time::now(); // update Time::now_cached()
- while (!heap.empty() && heap.top_key() <= now) {
- auto *heap_node = heap.pop();
- auto *actor_info = ActorInfo::from_heap_node(heap_node);
-
- ActorExecutor executor(*actor_info, dispatcher, ActorExecutor::Options().with_has_poll(true));
- if (executor.can_send_immediate()) {
- executor.send_immediate(ActorSignals::one(ActorSignals::Alarm));
- } else {
- executor.send(ActorSignals::one(ActorSignals::Alarm));
- }
- }
-
- const int size = queue_.reader_wait_nonblock();
- for (int i = 0; i < size; i++) {
- auto message = queue_.reader_get_unsafe();
- if (!message) {
- return false;
- }
- ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue().with_has_poll(true));
- }
- queue_.reader_flush();
-
- bool can_sleep = size == 0 && timeout != 0;
- int32 timeout_ms = 0;
- if (can_sleep) {
- auto wakeup_timestamp = Timestamp::in(timeout);
- if (!heap.empty()) {
- wakeup_timestamp.relax(Timestamp::at(heap.top_key()));
- }
- timeout_ms = static_cast<int>(wakeup_timestamp.in() * 1000) + 1;
- if (timeout_ms < 0) {
- timeout_ms = 0;
- }
- //const int thirty_seconds = 30 * 1000;
- //if (timeout_ms > thirty_seconds) {
- //timeout_ms = thirty_seconds;
- //}
- }
- poll.run(timeout_ms);
- return true;
- }
-
- private:
- MpscPollableQueue<SchedulerMessage> &queue_;
- };
-
- void do_stop() {
- if (is_stopped_) {
- return;
- }
- // wait other threads to finish
- for (auto &thread : cpu_threads_) {
- thread.join();
- }
- // Can't do anything else, other schedulers may send queries to this one.
- // Must wait till every scheduler is stopped first..
- is_stopped_ = true;
-
- io_worker_.reset();
- poll_.clear();
-
- std::unique_lock<std::mutex> lock(scheduler_group_info_->active_scheduler_count_mutex);
- scheduler_group_info_->active_scheduler_count--;
- scheduler_group_info_->active_scheduler_count_condition_variable.notify_all();
- }
-
- public:
- static void close_scheduler_group(SchedulerGroupInfo &group_info) {
- LOG(ERROR) << "close scheduler group";
- // Cannot close scheduler group before somebody asked to stop them
- CHECK(group_info.is_stop_requested);
- {
- std::unique_lock<std::mutex> lock(group_info.active_scheduler_count_mutex);
- group_info.active_scheduler_count_condition_variable.wait(lock,
- [&] { return group_info.active_scheduler_count == 0; });
- }
-
- // Drain all queues
- // Just to destroy all elements should be ok.
- for (auto &scheduler_info : group_info.schedulers) {
- // Drain io queue
- auto &io_queue = *scheduler_info.io_queue;
- while (true) {
- int n = io_queue.reader_wait_nonblock();
- if (n == 0) {
- break;
- }
- while (n-- > 0) {
- auto message = io_queue.reader_get_unsafe();
- // message's destructor is called
- }
- }
- scheduler_info.io_queue.reset();
-
- // Drain cpu queue
- auto &cpu_queue = *scheduler_info.cpu_queue;
- while (true) {
- SchedulerMessage message;
- if (!cpu_queue.try_pop(message, get_thread_id())) {
- break;
- }
- // message's destructor is called
- }
- scheduler_info.cpu_queue.reset();
-
- // Do not destroy worker infos. run_in_context will crash if they are empty
- }
- }
-};
-
-// Actor messages
-template <class LambdaT>
-class ActorMessageLambda : public ActorMessageImpl {
- public:
- template <class FromLambdaT>
- explicit ActorMessageLambda(FromLambdaT &&lambda) : lambda_(std::forward<FromLambdaT>(lambda)) {
- }
- void run() override {
- lambda_();
- }
-
- private:
- LambdaT lambda_;
-};
-
-class ActorMessageHangup : public ActorMessageImpl {
- public:
- void run() override {
- ActorExecuteContext::get()->actor().hang_up();
- }
-};
-
-class ActorMessageCreator {
- public:
- template <class F>
- static ActorMessage lambda(F &&f) {
- return ActorMessage(std::make_unique<ActorMessageLambda<F>>(std::forward<F>(f)));
- }
-
- static ActorMessage hangup() {
- return ActorMessage(std::make_unique<ActorMessageHangup>());
- }
-
- // Use faster allocation?
-};
-
-// SYNTAX SHUGAR
-namespace detail {
-struct ActorRef {
- ActorRef(ActorInfo &actor_info, uint64 link_token = EmptyLinkToken) : actor_info(actor_info), link_token(link_token) {
- }
-
- ActorInfo &actor_info;
- uint64 link_token;
-};
-
-template <class T>
-T &current_actor() {
- return static_cast<T &>(ActorExecuteContext::get()->actor());
-}
-
-void send_message(ActorInfo &actor_info, ActorMessage message) {
- ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options());
- executor.send(std::move(message));
-}
-
-void send_message(ActorRef actor_ref, ActorMessage message) {
- message.set_link_token(actor_ref.link_token);
- send_message(actor_ref.actor_info, std::move(message));
-}
-void send_message_later(ActorInfo &actor_info, ActorMessage message) {
- ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options());
- executor.send(std::move(message));
-}
-
-void send_message_later(ActorRef actor_ref, ActorMessage message) {
- message.set_link_token(actor_ref.link_token);
- send_message_later(actor_ref.actor_info, std::move(message));
-}
-
-template <class ExecuteF, class ToMessageF>
-void send_immediate(ActorRef actor_ref, ExecuteF &&execute, ToMessageF &&to_message) {
- auto &scheduler_context = *SchedulerContext::get();
- ActorExecutor executor(actor_ref.actor_info, scheduler_context.dispatcher(),
- ActorExecutor::Options().with_has_poll(scheduler_context.has_poll()));
- if (executor.can_send_immediate()) {
- return executor.send_immediate(execute, actor_ref.link_token);
- }
- auto message = to_message();
- message.set_link_token(actor_ref.link_token);
- executor.send(std::move(message));
-}
-
-template <class F>
-void send_lambda(ActorRef actor_ref, F &&lambda) {
- send_immediate(actor_ref, lambda, [&lambda]() mutable { return ActorMessageCreator::lambda(std::move(lambda)); });
-}
-template <class F>
-void send_lambda_later(ActorRef actor_ref, F &&lambda) {
- send_message_later(actor_ref, ActorMessageCreator::lambda(std::move(lambda)));
-}
-
-template <class ClosureT>
-void send_closure_impl(ActorRef actor_ref, ClosureT &&closure) {
- using ActorType = typename ClosureT::ActorType;
- send_immediate(actor_ref, [&closure]() mutable { closure.run(&current_actor<ActorType>()); },
- [&closure]() mutable {
- return ActorMessageCreator::lambda([closure = to_delayed_closure(std::move(closure))]() mutable {
- closure.run(&current_actor<ActorType>());
- });
- });
-}
-
-template <class... ArgsT>
-void send_closure(ActorRef actor_ref, ArgsT &&... args) {
- send_closure_impl(actor_ref, create_immediate_closure(std::forward<ArgsT>(args)...));
-}
-
-template <class ClosureT>
-void send_closure_later_impl(ActorRef actor_ref, ClosureT &&closure) {
- using ActorType = typename ClosureT::ActorType;
- send_message_later(actor_ref, ActorMessageCreator::lambda([closure = std::move(closure)]() mutable {
- closure.run(&current_actor<ActorType>());
- }));
-}
-
-template <class... ArgsT>
-void send_closure_later(ActorRef actor_ref, ArgsT &&... args) {
- send_closure_later_impl(actor_ref, create_delayed_closure(std::forward<ArgsT>(args)...));
-}
-
-void register_actor_info_ptr(ActorInfoPtr actor_info_ptr) {
- auto state = actor_info_ptr->state().get_flags_unsafe();
- SchedulerContext::get()->add_to_queue(std::move(actor_info_ptr), state.get_scheduler_id(), !state.is_shared());
-}
-
-template <class T, class... ArgsT>
-ActorInfoPtr create_actor(ActorOptions &options, ArgsT &&... args) {
- auto *scheduler_context = SchedulerContext::get();
- if (!options.has_scheduler()) {
- options.on_scheduler(scheduler_context->get_scheduler_id());
- }
- auto res =
- scheduler_context->get_actor_info_creator().create(std::make_unique<T>(std::forward<ArgsT>(args)...), options);
- register_actor_info_ptr(res);
- return res;
-}
-} // namespace detail
-
-// Essentially ActorInfoWeakPtr with Type
-template <class ActorType = Actor>
-class ActorId {
- public:
- using ActorT = ActorType;
- ActorId() = default;
- ActorId(const ActorId &) = default;
- ActorId &operator=(const ActorId &) = default;
- ActorId(ActorId &&other) = default;
- ActorId &operator=(ActorId &&other) = default;
-
- // allow only conversion from child to parent
- template <class ToActorType, class = std::enable_if_t<std::is_base_of<ToActorType, ActorType>::value>>
- explicit operator ActorId<ToActorType>() const {
- return ActorId<ToActorType>(ptr_);
- }
-
- const ActorInfoPtr &actor_info_ptr() const {
- return ptr_;
- }
-
- ActorInfo &actor_info() const {
- CHECK(ptr_);
- return *ptr_;
- }
- bool empty() const {
- return !ptr_;
- }
-
- template <class... ArgsT>
- static ActorId<ActorType> create(ActorOptions &options, ArgsT &&... args) {
- return ActorId<ActorType>(detail::create_actor<ActorType>(options, std::forward<ArgsT>(args)...));
- }
-
- detail::ActorRef as_actor_ref() const {
- CHECK(!empty());
- return detail::ActorRef(*actor_info_ptr());
- }
-
- private:
- ActorInfoPtr ptr_;
-
- explicit ActorId(ActorInfoPtr ptr) : ptr_(std::move(ptr)) {
- }
-
- template <class SelfT>
- friend ActorId<SelfT> actor_id(SelfT *self);
-};
-
-template <class ActorType = Actor>
-class ActorOwn {
- public:
- using ActorT = ActorType;
- ActorOwn() = default;
- explicit ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) {
- }
- template <class OtherActorType>
- explicit ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) {
- }
- template <class OtherActorType>
- explicit ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) {
- }
- template <class OtherActorType>
- ActorOwn &operator=(ActorOwn<OtherActorType> &&other) {
- reset(other.release());
- }
- ActorOwn(ActorOwn &&other) : id_(other.release()) {
- }
- ActorOwn &operator=(ActorOwn &&other) {
- reset(other.release());
- }
- ActorOwn(const ActorOwn &) = delete;
- ActorOwn &operator=(const ActorOwn &) = delete;
- ~ActorOwn() {
- reset();
- }
-
- bool empty() const {
- return id_.empty();
- }
- bool is_alive() const {
- return id_.is_alive();
- }
- ActorId<ActorType> get() const {
- return id_;
- }
- ActorId<ActorType> release() {
- return std::move(id_);
- }
- void reset(ActorId<ActorType> other = ActorId<ActorType>()) {
- static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type");
- hangup();
- id_ = std::move(other);
- }
- const ActorId<ActorType> *operator->() const {
- return &id_;
- }
-
- detail::ActorRef as_actor_ref() const {
- CHECK(!empty());
- return detail::ActorRef(*id_.actor_info_ptr(), 0);
- }
-
- private:
- ActorId<ActorType> id_;
- void hangup() const {
- if (empty()) {
- return;
- }
- detail::send_message(as_actor_ref(), ActorMessageCreator::hangup());
- }
-};
-
-template <class ActorType = Actor>
-class ActorShared {
- public:
- using ActorT = ActorType;
- ActorShared() = default;
- template <class OtherActorType>
- ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) {
- CHECK(token_ != 0);
- }
- template <class OtherActorType>
- ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) {
- }
- template <class OtherActorType>
- ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(other.token()) {
- }
- template <class OtherActorType>
- ActorShared &operator=(ActorShared<OtherActorType> &&other) {
- reset(other.release(), other.token());
- }
- ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token()) {
- }
- ActorShared &operator=(ActorShared &&other) {
- reset(other.release(), other.token());
- }
- ActorShared(const ActorShared &) = delete;
- ActorShared &operator=(const ActorShared &) = delete;
- ~ActorShared() {
- reset();
- }
-
- uint64 token() const {
- return token_;
- }
- bool empty() const {
- return id_.empty();
- }
- bool is_alive() const {
- return id_.is_alive();
- }
- ActorId<ActorType> get() const {
- return id_;
- }
- ActorId<ActorType> release();
- void reset(ActorId<ActorType> other = ActorId<ActorType>(), uint64 link_token = EmptyLinkToken) {
- static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type");
- hangup();
- id_ = other;
- token_ = link_token;
- }
- const ActorId<ActorType> *operator->() const {
- return &id_;
- }
-
- detail::ActorRef as_actor_ref() const {
- CHECK(!empty());
- return detail::ActorRef(*id_.actor_info_ptr(), token_);
- }
-
- private:
- ActorId<ActorType> id_;
- uint64 token_;
-
- void hangup() const {
- }
-};
-
-// common interface
-template <class SelfT>
-ActorId<SelfT> actor_id(SelfT *self) {
- CHECK(self);
- CHECK(static_cast<Actor *>(self) == &ActorExecuteContext::get()->actor());
- return ActorId<SelfT>(ActorExecuteContext::get()->actor().get_actor_info_ptr());
-}
-
-inline ActorId<> actor_id() {
- return actor_id(&ActorExecuteContext::get()->actor());
-}
-
-template <class T, class... ArgsT>
-ActorOwn<T> create_actor(ActorOptions options, ArgsT &&... args) {
- return ActorOwn<T>(ActorId<T>::create(options, std::forward<ArgsT>(args)...));
-}
-
-template <class T, class... ArgsT>
-ActorOwn<T> create_actor(Slice name, ArgsT &&... args) {
- return ActorOwn<T>(ActorId<T>::create(ActorOptions().with_name(name), std::forward<ArgsT>(args)...));
-}
-
-template <class ActorIdT, class FunctionT, class... ArgsT>
-void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
- using ActorT = typename std::decay_t<ActorIdT>::ActorT;
- using FunctionClassT = member_function_class_t<FunctionT>;
- static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
-
- ActorIdT id = std::forward<ActorIdT>(actor_id);
- detail::send_closure(id.as_actor_ref(), function, std::forward<ArgsT>(args)...);
-}
-
-template <class ActorIdT, class FunctionT, class... ArgsT>
-void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
- using ActorT = typename std::decay_t<ActorIdT>::ActorT;
- using FunctionClassT = member_function_class_t<FunctionT>;
- static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
-
- ActorIdT id = std::forward<ActorIdT>(actor_id);
- detail::send_closure_later(id.as_actor_ref(), function, std::forward<ArgsT>(args)...);
-}
-
-template <class ActorIdT, class... ArgsT>
-void send_lambda(ActorIdT &&actor_id, ArgsT &&... args) {
- ActorIdT id = std::forward<ActorIdT>(actor_id);
- detail::send_lambda(id.as_actor_ref(), std::forward<ArgsT>(args)...);
-}
-
-#endif //!TD_THREAD_UNSUPPORTED
-} // namespace actor2
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h b/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h
deleted file mode 100644
index 5850f1a94c..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h
+++ /dev/null
@@ -1,32 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/utils/common.h"
-#include "td/utils/logging.h"
-
-namespace td {
-namespace actor2 {
-class SchedulerId {
- public:
- SchedulerId() : id_(-1) {
- }
- explicit SchedulerId(uint8 id) : id_(id) {
- }
- bool is_valid() const {
- return id_ >= 0;
- }
- uint8 value() const {
- CHECK(is_valid());
- return static_cast<uint8>(id_);
- }
-
- private:
- int32 id_{0};
-};
-} // namespace actor2
-} // namespace td
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp
index f4267f2818..0720f0ed6f 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_bugs.cpp
@@ -1,38 +1,39 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
-#include "td/actor/Timeout.h"
+#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
+#include "td/actor/MultiTimeout.h"
-using namespace td;
+#include "td/utils/common.h"
+#include "td/utils/logging.h"
+#include "td/utils/Random.h"
+#include "td/utils/tests.h"
TEST(MultiTimeout, bug) {
- ConcurrentScheduler sched;
- int threads_n = 0;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(0, 0);
sched.start();
- std::unique_ptr<MultiTimeout> multi_timeout;
+ td::unique_ptr<td::MultiTimeout> multi_timeout;
struct Data {
- MultiTimeout *multi_timeout;
+ td::MultiTimeout *multi_timeout;
};
Data data;
{
- auto guard = sched.get_current_guard();
- multi_timeout = std::make_unique<MultiTimeout>();
+ auto guard = sched.get_main_guard();
+ multi_timeout = td::make_unique<td::MultiTimeout>("MultiTimeout");
data.multi_timeout = multi_timeout.get();
- multi_timeout->set_callback([](void *void_data, int64 key) {
+ multi_timeout->set_callback([](void *void_data, td::int64 key) {
auto &data = *static_cast<Data *>(void_data);
if (key == 1) {
data.multi_timeout->cancel_timeout(key + 1);
data.multi_timeout->set_timeout_in(key + 2, 1);
} else {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
});
multi_timeout->set_callback_data(&data);
@@ -45,3 +46,67 @@ TEST(MultiTimeout, bug) {
}
sched.finish();
}
+
+class TimeoutManager final : public td::Actor {
+ static td::int32 count;
+
+ public:
+ TimeoutManager() {
+ count++;
+
+ test_timeout_.set_callback(on_test_timeout_callback);
+ test_timeout_.set_callback_data(static_cast<void *>(this));
+ }
+ TimeoutManager(const TimeoutManager &) = delete;
+ TimeoutManager &operator=(const TimeoutManager &) = delete;
+ TimeoutManager(TimeoutManager &&) = delete;
+ TimeoutManager &operator=(TimeoutManager &&) = delete;
+ ~TimeoutManager() final {
+ count--;
+ LOG(INFO) << "Destroy TimeoutManager";
+ }
+
+ static void on_test_timeout_callback(void *timeout_manager_ptr, td::int64 id) {
+ CHECK(count >= 0);
+ if (count == 0) {
+ LOG(ERROR) << "Receive timeout after manager was closed";
+ return;
+ }
+
+ auto manager = static_cast<TimeoutManager *>(timeout_manager_ptr);
+ send_closure_later(manager->actor_id(manager), &TimeoutManager::test_timeout);
+ }
+
+ void test_timeout() {
+ CHECK(count > 0);
+ // we must yield scheduler, so run_main breaks immediately, if timeouts are handled immediately
+ td::Scheduler::instance()->yield();
+ }
+
+ td::MultiTimeout test_timeout_{"TestTimeout"};
+};
+
+td::int32 TimeoutManager::count;
+
+TEST(MultiTimeout, Destroy) {
+ td::ConcurrentScheduler sched(0, 0);
+
+ auto timeout_manager = sched.create_actor_unsafe<TimeoutManager>(0, "TimeoutManager");
+ TimeoutManager *manager = timeout_manager.get().get_actor_unsafe();
+ sched.start();
+ int cnt = 100;
+ while (sched.run_main(cnt == 100 || cnt <= 0 ? 0.001 : 10)) {
+ auto guard = sched.get_main_guard();
+ cnt--;
+ if (cnt > 0) {
+ for (int i = 0; i < 2; i++) {
+ manager->test_timeout_.set_timeout_in(td::Random::fast(0, 1000000000), td::Random::fast(2, 5) / 1000.0);
+ }
+ } else if (cnt == 0) {
+ timeout_manager.reset();
+ } else if (cnt == -10) {
+ td::Scheduler::instance()->finish();
+ }
+ }
+ sched.finish();
+}
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp
deleted file mode 100644
index 9185fe8858..0000000000
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_impl2.cpp
+++ /dev/null
@@ -1,535 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#include "td/actor/impl2/ActorLocker.h"
-#include "td/actor/impl2/Scheduler.h"
-
-#include "td/utils/format.h"
-#include "td/utils/logging.h"
-#include "td/utils/port/thread.h"
-#include "td/utils/Slice.h"
-#include "td/utils/StringBuilder.h"
-#include "td/utils/tests.h"
-#include "td/utils/Time.h"
-
-#include <array>
-#include <atomic>
-#include <deque>
-#include <memory>
-
-using td::actor2::ActorLocker;
-using td::actor2::ActorSignals;
-using td::actor2::ActorState;
-using td::actor2::SchedulerId;
-
-TEST(Actor2, signals) {
- ActorSignals signals;
- signals.add_signal(ActorSignals::Wakeup);
- signals.add_signal(ActorSignals::Cpu);
- signals.add_signal(ActorSignals::Kill);
- signals.clear_signal(ActorSignals::Cpu);
-
- bool was_kill = false;
- bool was_wakeup = false;
- while (!signals.empty()) {
- auto s = signals.first_signal();
- if (s == ActorSignals::Kill) {
- was_kill = true;
- } else if (s == ActorSignals::Wakeup) {
- was_wakeup = true;
- } else {
- UNREACHABLE();
- }
- signals.clear_signal(s);
- }
- CHECK(was_kill && was_wakeup);
-}
-
-TEST(Actors2, flags) {
- ActorState::Flags flags;
- CHECK(!flags.is_locked());
- flags.set_locked(true);
- CHECK(flags.is_locked());
- flags.set_locked(false);
- CHECK(!flags.is_locked());
- flags.set_pause(true);
-
- flags.set_scheduler_id(SchedulerId{123});
-
- auto signals = flags.get_signals();
- CHECK(signals.empty());
- signals.add_signal(ActorSignals::Cpu);
- signals.add_signal(ActorSignals::Kill);
- CHECK(signals.has_signal(ActorSignals::Cpu));
- CHECK(signals.has_signal(ActorSignals::Kill));
- flags.set_signals(signals);
- CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw();
-
- auto wakeup = ActorSignals{};
- wakeup.add_signal(ActorSignals::Wakeup);
-
- flags.add_signals(wakeup);
- signals.add_signal(ActorSignals::Wakeup);
- CHECK(flags.get_signals().raw() == signals.raw());
-
- flags.clear_signals();
- CHECK(flags.get_signals().empty());
-
- CHECK(flags.get_scheduler_id().value() == 123);
- CHECK(flags.is_pause());
-}
-
-TEST(Actor2, locker) {
- ActorState state;
-
- ActorSignals kill_signal;
- kill_signal.add_signal(ActorSignals::Kill);
-
- ActorSignals wakeup_signal;
- kill_signal.add_signal(ActorSignals::Wakeup);
-
- ActorSignals cpu_signal;
- kill_signal.add_signal(ActorSignals::Cpu);
-
- {
- ActorLocker lockerA(&state);
- ActorLocker lockerB(&state);
- ActorLocker lockerC(&state);
-
- CHECK(lockerA.try_lock());
- CHECK(lockerA.own_lock());
- auto flagsA = lockerA.flags();
- CHECK(lockerA.try_unlock(flagsA));
- CHECK(!lockerA.own_lock());
-
- CHECK(lockerA.try_lock());
- CHECK(!lockerB.try_lock());
- CHECK(!lockerC.try_lock());
-
- CHECK(lockerB.try_add_signals(kill_signal));
- CHECK(!lockerC.try_add_signals(wakeup_signal));
- CHECK(lockerC.try_add_signals(wakeup_signal));
- CHECK(!lockerC.add_signals(cpu_signal));
- CHECK(!lockerA.flags().has_signals());
- CHECK(!lockerA.try_unlock(lockerA.flags()));
- {
- auto flags = lockerA.flags();
- auto signals = flags.get_signals();
- bool was_kill = false;
- bool was_wakeup = false;
- bool was_cpu = false;
- while (!signals.empty()) {
- auto s = signals.first_signal();
- if (s == ActorSignals::Kill) {
- was_kill = true;
- } else if (s == ActorSignals::Wakeup) {
- was_wakeup = true;
- } else if (s == ActorSignals::Cpu) {
- was_cpu = true;
- } else {
- UNREACHABLE();
- }
- signals.clear_signal(s);
- }
- CHECK(was_kill && was_wakeup && was_cpu);
- flags.clear_signals();
- CHECK(lockerA.try_unlock(flags));
- }
- }
-
- {
- ActorLocker lockerB(&state);
- CHECK(lockerB.try_lock());
- CHECK(lockerB.try_unlock(lockerB.flags()));
- CHECK(lockerB.add_signals(kill_signal));
- CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
- auto flags = lockerB.flags();
- flags.clear_signals();
- ActorLocker lockerA(&state);
- CHECK(!lockerA.add_signals(kill_signal));
- CHECK(!lockerB.try_unlock(flags));
- CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal!
- CHECK(!lockerB.try_unlock(flags));
- CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
- CHECK(lockerB.try_unlock(flags));
- }
-
- {
- ActorLocker lockerA(&state);
- CHECK(lockerA.try_lock());
- auto flags = lockerA.flags();
- flags.set_pause(true);
- CHECK(lockerA.try_unlock(flags));
- //We have to lock, though we can't execute.
- CHECK(lockerA.add_signals(wakeup_signal));
- }
-}
-
-#if !TD_THREAD_UNSUPPORTED
-TEST(Actor2, locker_stress) {
- ActorState state;
-
- constexpr size_t threads_n = 5;
- auto stage = [&](std::atomic<int> &value, int need) {
- value.fetch_add(1, std::memory_order_release);
- while (value.load(std::memory_order_acquire) < need) {
- td::this_thread::yield();
- }
- };
-
- struct Node {
- std::atomic<td::uint32> request{0};
- td::uint32 response = 0;
- char pad[64];
- };
- std::array<Node, threads_n> nodes;
- auto do_work = [&]() {
- for (auto &node : nodes) {
- auto query = node.request.load(std::memory_order_acquire);
- if (query) {
- node.response = query * query;
- node.request.store(0, std::memory_order_relaxed);
- }
- }
- };
-
- std::atomic<int> begin{0};
- std::atomic<int> ready{0};
- std::atomic<int> check{0};
- std::atomic<int> finish{0};
- std::vector<td::thread> threads;
- for (size_t i = 0; i < threads_n; i++) {
- threads.push_back(td::thread([&, id = i] {
- for (size_t i = 1; i < 1000000; i++) {
- ActorLocker locker(&state);
- auto need = static_cast<int>(threads_n * i);
- auto query = static_cast<td::uint32>(id + need);
- stage(begin, need);
- nodes[id].request = 0;
- nodes[id].response = 0;
- stage(ready, need);
- if (locker.try_lock()) {
- nodes[id].response = query * query;
- } else {
- auto cpu = ActorSignals::one(ActorSignals::Cpu);
- nodes[id].request.store(query, std::memory_order_release);
- locker.add_signals(cpu);
- }
- while (locker.own_lock()) {
- auto flags = locker.flags();
- auto signals = flags.get_signals();
- if (!signals.empty()) {
- do_work();
- }
- flags.clear_signals();
- locker.try_unlock(flags);
- }
-
- stage(check, need);
- if (id == 0) {
- CHECK(locker.add_signals(ActorSignals{}));
- CHECK(!locker.flags().has_signals());
- CHECK(locker.try_unlock(locker.flags()));
- for (size_t thread_id = 0; thread_id < threads_n; thread_id++) {
- CHECK(nodes[thread_id].response ==
- static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need))
- << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " "
- << nodes[thread_id].request.load();
- }
- }
- }
- }));
- }
- for (auto &thread : threads) {
- thread.join();
- }
-}
-
-namespace {
-const size_t BUF_SIZE = 1024 * 1024;
-char buf[BUF_SIZE];
-td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
-} // namespace
-
-TEST(Actor2, executor_simple) {
- using namespace td;
- using namespace td::actor2;
- struct Dispatcher : public SchedulerDispatcher {
- void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override {
- queue.push_back(std::move(ptr));
- }
- void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override {
- UNREACHABLE();
- }
- SchedulerId get_scheduler_id() const override {
- return SchedulerId{0};
- }
- std::deque<ActorInfoPtr> queue;
- };
- Dispatcher dispatcher;
-
- class TestActor : public Actor {
- public:
- void close() {
- stop();
- }
-
- private:
- void start_up() override {
- sb << "StartUp";
- }
- void tear_down() override {
- sb << "TearDown";
- }
- };
- ActorInfoCreator actor_info_creator;
- auto actor = actor_info_creator.create(
- std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
- dispatcher.add_to_queue(actor, SchedulerId{0}, false);
-
- {
- ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
- CHECK(executor.can_send());
- CHECK(executor.can_send_immediate());
- CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
- sb.clear();
- executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
- CHECK(sb.as_cslice() == "A") << sb.as_cslice();
- sb.clear();
- auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; });
- big_message.set_big();
- executor.send(std::move(big_message));
- CHECK(sb.as_cslice() == "") << sb.as_cslice();
- executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
- CHECK(sb.as_cslice() == "") << sb.as_cslice();
- }
- CHECK(dispatcher.queue.size() == 1);
- { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); }
- CHECK(dispatcher.queue.size() == 1);
- dispatcher.queue.clear();
- CHECK(sb.as_cslice() == "bigA") << sb.as_cslice();
- sb.clear();
- {
- ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
- executor.send(
- ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
- }
- CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
- sb.clear();
- CHECK(!actor->has_actor());
- {
- ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
- executor.send(
- ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
- }
- CHECK(dispatcher.queue.empty());
- CHECK(sb.as_cslice() == "");
-}
-
-using namespace td::actor2;
-using td::uint32;
-static std::atomic<int> cnt;
-class Worker : public Actor {
- public:
- void query(uint32 x, ActorInfoPtr master);
- void close() {
- stop();
- }
-};
-class Master : public Actor {
- public:
- void on_result(uint32 x, uint32 y) {
- loop();
- }
-
- private:
- uint32 l = 0;
- uint32 r = 100000;
- ActorInfoPtr worker;
- void start_up() override {
- worker = detail::create_actor<Worker>(ActorOptions().with_name("Master"));
- loop();
- }
- void loop() override {
- l++;
- if (l == r) {
- if (!--cnt) {
- SchedulerContext::get()->stop();
- }
- detail::send_closure(*worker, &Worker::close);
- stop();
- return;
- }
- detail::send_lambda(*worker,
- [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); });
- }
-};
-
-void Worker::query(uint32 x, ActorInfoPtr master) {
- auto y = x;
- for (int i = 0; i < 100; i++) {
- y = y * y;
- }
- detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); });
-}
-
-TEST(Actor2, scheduler_simple) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 2};
- scheduler.start();
- scheduler.run_in_context([] {
- cnt = 10;
- for (int i = 0; i < 10; i++) {
- detail::create_actor<Master>(ActorOptions().with_name("Master"));
- }
- });
- while (scheduler.run(1000)) {
- }
- Scheduler::close_scheduler_group(*group_info);
-}
-
-TEST(Actor2, actor_id_simple) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 2};
- sb.clear();
- scheduler.start();
-
- scheduler.run_in_context([] {
- class A : public Actor {
- public:
- A(int value) : value_(value) {
- sb << "A" << value_;
- }
- void hello() {
- sb << "hello";
- }
- ~A() {
- sb << "~A";
- if (--cnt <= 0) {
- SchedulerContext::get()->stop();
- }
- }
-
- private:
- int value_;
- };
- cnt = 1;
- auto id = create_actor<A>("A", 123);
- CHECK(sb.as_cslice() == "A123");
- sb.clear();
- send_closure(id, &A::hello);
- });
- while (scheduler.run(1000)) {
- }
- CHECK(sb.as_cslice() == "hello~A");
- Scheduler::close_scheduler_group(*group_info);
- sb.clear();
-}
-
-TEST(Actor2, actor_creation) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 1};
- scheduler.start();
-
- scheduler.run_in_context([]() mutable {
- class B;
- class A : public Actor {
- public:
- void f() {
- check();
- stop();
- }
-
- private:
- void start_up() override {
- check();
- create_actor<B>("Simple", actor_id(this)).release();
- }
-
- void check() {
- auto &context = *SchedulerContext::get();
- CHECK(context.has_poll());
- context.get_poll();
- }
-
- void tear_down() override {
- if (--cnt <= 0) {
- SchedulerContext::get()->stop();
- }
- }
- };
-
- class B : public Actor {
- public:
- B(ActorId<A> a) : a_(a) {
- }
-
- private:
- void start_up() override {
- auto &context = *SchedulerContext::get();
- CHECK(!context.has_poll());
- send_closure(a_, &A::f);
- stop();
- }
- void tear_down() override {
- if (--cnt <= 0) {
- SchedulerContext::get()->stop();
- }
- }
- ActorId<A> a_;
- };
- cnt = 2;
- create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release();
- });
- while (scheduler.run(1000)) {
- }
- scheduler.stop();
- Scheduler::close_scheduler_group(*group_info);
-}
-
-TEST(Actor2, actor_timeout_simple) {
- auto group_info = std::make_shared<SchedulerGroupInfo>(1);
- Scheduler scheduler{group_info, SchedulerId{0}, 2};
- sb.clear();
- scheduler.start();
-
- scheduler.run_in_context([] {
- class A : public Actor {
- public:
- void start_up() override {
- set_timeout();
- }
- void alarm() override {
- double diff = td::Time::now() - expected_timeout_;
- CHECK(-0.001 < diff && diff < 0.1) << diff;
- if (cnt_-- > 0) {
- set_timeout();
- } else {
- stop();
- }
- }
-
- void tear_down() override {
- SchedulerContext::get()->stop();
- }
-
- private:
- double expected_timeout_;
- int cnt_ = 5;
- void set_timeout() {
- auto wakeup_timestamp = td::Timestamp::in(0.1);
- expected_timeout_ = wakeup_timestamp.at();
- alarm_timestamp() = wakeup_timestamp;
- }
- };
- create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release();
- });
- while (scheduler.run(1000)) {
- }
- Scheduler::close_scheduler_group(*group_info);
- sb.clear();
-}
-#endif //!TD_THREAD_UNSUPPORTED
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
index ffceacc595..628b74a94c 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
@@ -1,35 +1,32 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
+#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/Random.h"
+#include "td/utils/ScopeGuard.h"
+#include "td/utils/tests.h"
#include <limits>
#include <map>
+#include <memory>
#include <utility>
-using namespace td;
-
-REGISTER_TESTS(actors_main);
-
-namespace {
-
template <class ContainerT>
static typename ContainerT::value_type &rand_elem(ContainerT &cont) {
CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max()));
- return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)];
+ return cont[td::Random::fast(0, static_cast<int>(cont.size()) - 1)];
}
-static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
- uint32 res = 1;
+static td::uint32 fast_pow_mod_uint32(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
while (p) {
if (p & 1) {
res *= x;
@@ -40,25 +37,25 @@ static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
return res;
}
-static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) {
- uint32 res = 1;
- for (uint32 i = 0; i < p; i++) {
+static td::uint32 slow_pow_mod_uint32(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
+ for (td::uint32 i = 0; i < p; i++) {
res *= x;
}
return res;
}
-struct Query {
- uint32 query_id;
- uint32 result;
- std::vector<int> todo;
- Query() = default;
- Query(const Query &) = delete;
- Query &operator=(const Query &) = delete;
- Query(Query &&) = default;
- Query &operator=(Query &&) = default;
- ~Query() {
- CHECK(todo.empty()) << "Query lost";
+struct ActorQuery {
+ td::uint32 query_id{};
+ td::uint32 result{};
+ td::vector<int> todo;
+ ActorQuery() = default;
+ ActorQuery(const ActorQuery &) = delete;
+ ActorQuery &operator=(const ActorQuery &) = delete;
+ ActorQuery(ActorQuery &&) = default;
+ ActorQuery &operator=(ActorQuery &&) = default;
+ ~ActorQuery() {
+ LOG_CHECK(todo.empty()) << "ActorQuery lost";
}
int next_pow() {
CHECK(!todo.empty());
@@ -71,25 +68,25 @@ struct Query {
}
};
-static uint32 fast_calc(Query &q) {
- uint32 result = q.result;
+static td::uint32 fast_calc(ActorQuery &q) {
+ td::uint32 result = q.result;
for (auto x : q.todo) {
result = fast_pow_mod_uint32(result, x);
}
return result;
}
-class Worker final : public Actor {
+class Worker final : public td::Actor {
public:
explicit Worker(int threads_n) : threads_n_(threads_n) {
}
- void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) {
- uint32 result = slow_pow_mod_uint32(x, p);
+ void query(td::PromiseActor<td::uint32> &&promise, td::uint32 x, td::uint32 p) {
+ td::uint32 result = slow_pow_mod_uint32(x, p);
promise.set_value(std::move(result));
(void)threads_n_;
- // if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
- // migrate(Random::fast(2, threads_n));
+ // if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
+ // migrate(td::Random::fast(2, threads_n));
//}
}
@@ -97,7 +94,7 @@ class Worker final : public Actor {
int threads_n_;
};
-class QueryActor final : public Actor {
+class QueryActor final : public td::Actor {
public:
class Callback {
public:
@@ -107,44 +104,46 @@ class QueryActor final : public Actor {
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
virtual ~Callback() = default;
- virtual void on_result(Query &&query) = 0;
+ virtual void on_result(ActorQuery &&query) = 0;
virtual void on_closed() = 0;
};
explicit QueryActor(int threads_n) : threads_n_(threads_n) {
}
- void set_callback(std::unique_ptr<Callback> callback) {
+ void set_callback(td::unique_ptr<Callback> callback) {
callback_ = std::move(callback);
}
- void set_workers(std::vector<ActorId<Worker>> workers) {
+ void set_workers(td::vector<td::ActorId<Worker>> workers) {
workers_ = std::move(workers);
}
- void query(Query &&query) {
- uint32 x = query.result;
- uint32 p = query.next_pow();
- if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
+ void query(ActorQuery &&query) {
+ td::uint32 x = query.result;
+ td::uint32 p = query.next_pow();
+ if (td::Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
query.result = slow_pow_mod_uint32(x, p);
callback_->on_result(std::move(query));
} else {
- auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p);
+ auto future = td::Random::fast(0, 3) == 0
+ ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
+ : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
if (future.is_ready()) {
query.result = future.move_as_ok();
callback_->on_result(std::move(query));
} else {
- future.set_event(EventCreator::raw(actor_id(), query.query_id));
+ future.set_event(td::EventCreator::raw(actor_id(), query.query_id));
auto query_id = query.query_id;
- pending_.insert(std::make_pair(query_id, std::make_pair(std::move(future), std::move(query))));
+ pending_.emplace(query_id, std::make_pair(std::move(future), std::move(query)));
}
}
- if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
- migrate(Random::fast(2, threads_n_));
+ if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
+ migrate(td::Random::fast(2, threads_n_));
}
}
- void raw_event(const Event::Raw &event) override {
- uint32 id = event.u32;
+ void raw_event(const td::Event::Raw &event) final {
+ td::uint32 id = event.u32;
auto it = pending_.find(id);
auto future = std::move(it->second.first);
auto query = std::move(it->second.second);
@@ -159,44 +158,44 @@ class QueryActor final : public Actor {
stop();
}
- void on_start_migrate(int32 sched_id) override {
+ void on_start_migrate(td::int32 sched_id) final {
for (auto &it : pending_) {
start_migrate(it.second.first, sched_id);
}
}
- void on_finish_migrate() override {
+ void on_finish_migrate() final {
for (auto &it : pending_) {
finish_migrate(it.second.first);
}
}
private:
- unique_ptr<Callback> callback_;
- std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_;
- std::vector<ActorId<Worker>> workers_;
+ td::unique_ptr<Callback> callback_;
+ std::map<td::uint32, std::pair<td::FutureActor<td::uint32>, ActorQuery>> pending_;
+ td::vector<td::ActorId<Worker>> workers_;
int threads_n_;
};
-class MainQueryActor final : public Actor {
- class QueryActorCallback : public QueryActor::Callback {
+class MainQueryActor final : public td::Actor {
+ class QueryActorCallback final : public QueryActor::Callback {
public:
- void on_result(Query &&query) override {
+ void on_result(ActorQuery &&query) final {
if (query.ready()) {
send_closure(parent_id_, &MainQueryActor::on_result, std::move(query));
} else {
send_closure(next_solver_, &QueryActor::query, std::move(query));
}
}
- void on_closed() override {
+ void on_closed() final {
send_closure(parent_id_, &MainQueryActor::on_closed);
}
- QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver)
+ QueryActorCallback(td::ActorId<MainQueryActor> parent_id, td::ActorId<QueryActor> next_solver)
: parent_id_(parent_id), next_solver_(next_solver) {
}
private:
- ActorId<MainQueryActor> parent_id_;
- ActorId<QueryActor> next_solver_;
+ td::ActorId<MainQueryActor> parent_id_;
+ td::ActorId<QueryActor> next_solver_;
};
const int ACTORS_CNT = 10;
@@ -206,39 +205,39 @@ class MainQueryActor final : public Actor {
explicit MainQueryActor(int threads_n) : threads_n_(threads_n) {
}
- void start_up() override {
+ void start_up() final {
actors_.resize(ACTORS_CNT);
for (auto &actor : actors_) {
- auto actor_ptr = make_unique<QueryActor>(threads_n_);
- actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0)
+ auto actor_ptr = td::make_unique<QueryActor>(threads_n_);
+ actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
.release();
}
workers_.resize(WORKERS_CNT);
for (auto &worker : workers_) {
- auto actor_ptr = make_unique<Worker>(threads_n_);
- worker =
- register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ auto actor_ptr = td::make_unique<Worker>(threads_n_);
+ worker = register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
+ .release();
}
for (int i = 0; i < ACTORS_CNT; i++) {
ref_cnt_++;
send_closure(actors_[i], &QueryActor::set_callback,
- make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
+ td::make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
send_closure(actors_[i], &QueryActor::set_workers, workers_);
}
yield();
}
- void on_result(Query &&query) {
+ void on_result(ActorQuery &&query) {
CHECK(query.ready());
CHECK(query.result == expected_[query.query_id]);
in_cnt_++;
wakeup();
}
- Query create_query() {
- Query q;
+ ActorQuery create_query() {
+ ActorQuery q;
q.query_id = (query_id_ += 2);
q.result = q.query_id;
q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000};
@@ -249,14 +248,14 @@ class MainQueryActor final : public Actor {
void on_closed() {
ref_cnt_--;
if (ref_cnt_ == 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
}
- void wakeup() override {
- int cnt = 100000;
+ void wakeup() final {
+ int cnt = 10000;
while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) {
- if (Random::fast(0, 1)) {
+ if (td::Random::fast_bool()) {
send_closure(rand_elem(actors_), &QueryActor::query, create_query());
} else {
send_closure_later(rand_elem(actors_), &QueryActor::query, create_query());
@@ -273,9 +272,9 @@ class MainQueryActor final : public Actor {
}
private:
- std::map<uint32, uint32> expected_;
- std::vector<ActorId<QueryActor>> actors_;
- std::vector<ActorId<Worker>> workers_;
+ std::map<td::uint32, td::uint32> expected_;
+ td::vector<td::ActorId<QueryActor>> actors_;
+ td::vector<td::ActorId<Worker>> workers_;
int out_cnt_ = 0;
int in_cnt_ = 0;
int query_id_ = 1;
@@ -283,101 +282,104 @@ class MainQueryActor final : public Actor {
int threads_n_;
};
-class SimpleActor final : public Actor {
+class SimpleActor final : public td::Actor {
public:
- explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) {
+ explicit SimpleActor(td::int32 threads_n) : threads_n_(threads_n) {
}
- void start_up() override {
- auto actor_ptr = make_unique<Worker>(threads_n_);
+ void start_up() final {
+ auto actor_ptr = td::make_unique<Worker>(threads_n_);
worker_ =
- register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0).release();
yield();
}
- void wakeup() override {
- if (q_ == 100000) {
- Scheduler::instance()->finish();
+ void wakeup() final {
+ if (q_ == 10000) {
+ td::Scheduler::instance()->finish();
stop();
return;
}
q_++;
- p_ = Random::fast(0, 1) ? 1 : 10000;
- auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_);
+ p_ = td::Random::fast_bool() ? 1 : 10000;
+ auto future = td::Random::fast(0, 3) == 0
+ ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
+ : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_);
if (future.is_ready()) {
auto result = future.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));
yield();
} else {
- future.set_event(EventCreator::raw(actor_id(), nullptr));
+ future.set_event(td::EventCreator::raw(actor_id(), nullptr));
future_ = std::move(future);
}
- // if (threads_n_ > 1 && Random::fast(0, 2) == 0) {
- // migrate(Random::fast(1, threads_n));
+ // if (threads_n_ > 1 && td::Random::fast(0, 2) == 0) {
+ // migrate(td::Random::fast(1, threads_n));
//}
}
- void raw_event(const Event::Raw &event) override {
+ void raw_event(const td::Event::Raw &event) final {
auto result = future_.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));
yield();
}
- void on_start_migrate(int32 sched_id) override {
+ void on_start_migrate(td::int32 sched_id) final {
start_migrate(future_, sched_id);
}
- void on_finish_migrate() override {
+ void on_finish_migrate() final {
finish_migrate(future_);
}
private:
- int32 threads_n_;
- ActorId<Worker> worker_;
- FutureActor<uint32> future_;
- uint32 q_ = 1;
- uint32 p_;
+ td::int32 threads_n_;
+ td::ActorId<Worker> worker_;
+ td::FutureActor<td::uint32> future_;
+ td::uint32 q_ = 1;
+ td::uint32 p_ = 0;
};
-} // namespace
-class SendToDead : public Actor {
+class SendToDead final : public td::Actor {
public:
- class Parent : public Actor {
+ class Parent final : public td::Actor {
public:
- explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
+ explicit Parent(td::ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
}
- void start_up() override {
- set_timeout_in(Random::fast_uint32() % 3 * 0.001);
+ void start_up() final {
+ set_timeout_in(td::Random::fast_uint32() % 3 * 0.001);
if (ttl_ != 0) {
- child_ = create_actor_on_scheduler<Parent>(
- "Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(), ttl_ - 1);
+ child_ = td::create_actor_on_scheduler<Parent>(
+ "Child", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), actor_shared(this),
+ ttl_ - 1);
}
}
- void timeout_expired() override {
+ void timeout_expired() final {
stop();
}
private:
- ActorOwn<Parent> child_;
- ActorShared<> parent_;
+ td::ActorOwn<Parent> child_;
+ td::ActorShared<> parent_;
int ttl_;
};
- void start_up() override {
+ void start_up() final {
for (int i = 0; i < 2000; i++) {
- create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(),
- create_reference(), 4)
+ td::create_actor_on_scheduler<Parent>(
+ "Parent", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), create_reference(), 4)
.release();
}
}
- ActorShared<> create_reference() {
+ td::ActorShared<> create_reference() {
ref_cnt_++;
- return actor_shared();
+ return actor_shared(this);
}
- void hangup_shared() override {
+
+ void hangup_shared() final {
ref_cnt_--;
if (ref_cnt_ == 0) {
ttl_--;
if (ttl_ <= 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
} else {
start_up();
@@ -385,18 +387,17 @@ class SendToDead : public Actor {
}
}
- uint32 ttl_{50};
- uint32 ref_cnt_{0};
+ td::uint32 ttl_{50};
+ td::uint32 ref_cnt_{0};
};
TEST(Actors, send_to_dead) {
//TODO: fix CHECK(storage_count_.load() == 0)
return;
- ConcurrentScheduler sched;
int threads_n = 5;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<SendToDead>(0, "manager").release();
+ sched.create_actor_unsafe<SendToDead>(0, "SendToDead").release();
sched.start();
while (sched.run_main(10)) {
// empty
@@ -405,11 +406,8 @@ TEST(Actors, send_to_dead) {
}
TEST(Actors, main_simple) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
int threads_n = 3;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
sched.create_actor_unsafe<SimpleActor>(threads_n > 1 ? 1 : 0, "simple", threads_n).release();
sched.start();
@@ -420,13 +418,10 @@ TEST(Actors, main_simple) {
}
TEST(Actors, main) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
int threads_n = 9;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "manager", threads_n).release();
+ sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "MainQuery", threads_n).release();
sched.start();
while (sched.run_main(10)) {
// empty
@@ -434,27 +429,76 @@ TEST(Actors, main) {
sched.finish();
}
-class DoAfterStop : public Actor {
+class DoAfterStop final : public td::Actor {
public:
- void loop() override {
- ptr = std::make_unique<int>(10);
+ void loop() final {
+ ptr = td::make_unique<int>(10);
stop();
CHECK(*ptr == 10);
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
private:
- std::unique_ptr<int> ptr;
+ td::unique_ptr<int> ptr;
};
TEST(Actors, do_after_stop) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+ int threads_n = 0;
+ td::ConcurrentScheduler sched(threads_n, 0);
- ConcurrentScheduler sched;
+ sched.create_actor_unsafe<DoAfterStop>(0, "DoAfterStop").release();
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
+
+class XContext final : public td::ActorContext {
+ public:
+ td::int32 get_id() const final {
+ return 123456789;
+ }
+
+ void validate() {
+ CHECK(x == 1234);
+ }
+ ~XContext() final {
+ x = 0;
+ }
+ int x = 1234;
+};
+
+class WithXContext final : public td::Actor {
+ public:
+ void start_up() final {
+ auto old_context = set_context(std::make_shared<XContext>());
+ }
+ void f(td::unique_ptr<td::Guard> guard) {
+ }
+ void close() {
+ stop();
+ }
+};
+
+static void check_context() {
+ auto ptr = static_cast<XContext *>(td::Scheduler::context());
+ CHECK(ptr != nullptr);
+ ptr->validate();
+}
+
+TEST(Actors, context_during_destruction) {
int threads_n = 0;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<DoAfterStop>(0, "manager").release();
+ {
+ auto guard = sched.get_main_guard();
+ auto with_context = td::create_actor<WithXContext>("WithXContext").release();
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
+ send_closure_later(with_context, &WithXContext::close);
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { td::Scheduler::instance()->finish(); }));
+ }
sched.start();
while (sched.run_main(10)) {
// empty
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
index c0a6c32b61..78d32d5437 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_simple.cpp
@@ -1,57 +1,66 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/MultiPromise.h"
#include "td/actor/PromiseFuture.h"
#include "td/actor/SleepActor.h"
-#include "td/actor/Timeout.h"
+#include "td/utils/common.h"
#include "td/utils/logging.h"
+#include "td/utils/MpscPollableQueue.h"
#include "td/utils/Observer.h"
#include "td/utils/port/FileFd.h"
+#include "td/utils/port/thread.h"
+#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
+#include "td/utils/tests.h"
+#include "td/utils/Time.h"
+#include <memory>
#include <tuple>
-REGISTER_TESTS(actors_simple)
-
-namespace {
-using namespace td;
-
static const size_t BUF_SIZE = 1024 * 1024;
static char buf[BUF_SIZE];
static char buf2[BUF_SIZE];
-static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1));
-static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1));
+static td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
+static td::StringBuilder sb2(td::MutableSlice(buf2, BUF_SIZE - 1));
+
+static td::vector<std::shared_ptr<td::MpscPollableQueue<td::EventFull>>> create_queues() {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ return {};
+#else
+ auto res = std::make_shared<td::MpscPollableQueue<td::EventFull>>();
+ res->init();
+ return {res};
+#endif
+}
TEST(Actors, SendLater) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
sb.clear();
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
auto guard = scheduler.get_guard();
- class Worker : public Actor {
+ class Worker final : public td::Actor {
public:
void f() {
sb << "A";
}
};
- auto id = create_actor<Worker>("Worker");
- scheduler.run_no_guard(0);
- send_closure(id, &Worker::f);
- send_closure_later(id, &Worker::f);
- send_closure(id, &Worker::f);
+ auto id = td::create_actor<Worker>("Worker");
+ scheduler.run_no_guard(td::Timestamp::in(1));
+ td::send_closure(id, &Worker::f);
+ td::send_closure_later(id, &Worker::f);
+ td::send_closure(id, &Worker::f);
ASSERT_STREQ("A", sb.as_cslice().c_str());
- scheduler.run_no_guard(0);
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("AAA", sb.as_cslice().c_str());
}
@@ -63,21 +72,21 @@ class X {
X(const X &) {
sb << "[cnstr_copy]";
}
- X(X &&) {
+ X(X &&) noexcept {
sb << "[cnstr_move]";
}
X &operator=(const X &) {
sb << "[set_copy]";
return *this;
}
- X &operator=(X &&) {
+ X &operator=(X &&) noexcept {
sb << "[set_move]";
return *this;
}
~X() = default;
};
-class XReceiver final : public Actor {
+class XReceiver final : public td::Actor {
public:
void by_const_ref(const X &) {
sb << "[by_const_ref]";
@@ -91,13 +100,12 @@ class XReceiver final : public Actor {
};
TEST(Actors, simple_pass_event_arguments) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
auto guard = scheduler.get_guard();
- auto id = create_actor<XReceiver>("XR").release();
- scheduler.run_no_guard(0);
+ auto id = td::create_actor<XReceiver>("XR").release();
+ scheduler.run_no_guard(td::Timestamp::in(1));
X x;
@@ -112,47 +120,47 @@ TEST(Actors, simple_pass_event_arguments) {
// Tmp-->ConstRef
sb.clear();
- send_closure(id, &XReceiver::by_const_ref, X());
+ td::send_closure(id, &XReceiver::by_const_ref, X());
ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str());
// Tmp-->ConstRef (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_const_ref, X());
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_const_ref, X());
+ scheduler.run_no_guard(td::Timestamp::in(1));
// LOG(ERROR) << sb.as_cslice();
ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str());
// Tmp-->LvalueRef
sb.clear();
- send_closure(id, &XReceiver::by_lvalue_ref, X());
+ td::send_closure(id, &XReceiver::by_lvalue_ref, X());
ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str());
// Tmp-->LvalueRef (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_lvalue_ref, X());
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_lvalue_ref, X());
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str());
// Tmp-->Value
sb.clear();
- send_closure(id, &XReceiver::by_value, X());
+ td::send_closure(id, &XReceiver::by_value, X());
ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str());
// Tmp-->Value (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_value, X());
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_value, X());
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str());
// Var-->ConstRef
sb.clear();
- send_closure(id, &XReceiver::by_const_ref, x);
+ td::send_closure(id, &XReceiver::by_const_ref, x);
ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str());
// Var-->ConstRef (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_const_ref, x);
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_const_ref, x);
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str());
// Var-->LvalueRef
@@ -161,24 +169,24 @@ TEST(Actors, simple_pass_event_arguments) {
// Var-->Value
sb.clear();
- send_closure(id, &XReceiver::by_value, x);
+ td::send_closure(id, &XReceiver::by_value, x);
ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str());
// Var-->Value (Delayed)
sb.clear();
- send_closure_later(id, &XReceiver::by_value, x);
- scheduler.run_no_guard(0);
+ td::send_closure_later(id, &XReceiver::by_value, x);
+ scheduler.run_no_guard(td::Timestamp::in(1));
ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str());
}
-class PrintChar final : public Actor {
+class PrintChar final : public td::Actor {
public:
PrintChar(char c, int cnt) : char_(c), cnt_(cnt) {
}
- void start_up() override {
+ void start_up() final {
yield();
}
- void wakeup() override {
+ void wakeup() final {
if (cnt_ == 0) {
stop();
} else {
@@ -192,25 +200,23 @@ class PrintChar final : public Actor {
char char_;
int cnt_;
};
-} // namespace
//
// Yield must add actor to the end of queue
//
TEST(Actors, simple_hand_yield) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
sb.clear();
int cnt = 1000;
{
auto guard = scheduler.get_guard();
- create_actor<PrintChar>("PrintA", 'A', cnt).release();
- create_actor<PrintChar>("PrintB", 'B', cnt).release();
- create_actor<PrintChar>("PrintC", 'C', cnt).release();
+ td::create_actor<PrintChar>("PrintA", 'A', cnt).release();
+ td::create_actor<PrintChar>("PrintB", 'B', cnt).release();
+ td::create_actor<PrintChar>("PrintC", 'C', cnt).release();
}
- scheduler.run(0);
- std::string expected;
+ scheduler.run(td::Timestamp::in(1));
+ td::string expected;
for (int i = 0; i < cnt; i++) {
expected += "ABC";
}
@@ -219,7 +225,7 @@ TEST(Actors, simple_hand_yield) {
class Ball {
public:
- friend void start_migrate(Ball &ball, int32 sched_id) {
+ friend void start_migrate(Ball &ball, td::int32 sched_id) {
sb << "start";
}
friend void finish_migrate(Ball &ball) {
@@ -227,31 +233,30 @@ class Ball {
}
};
-class Pong final : public Actor {
+class Pong final : public td::Actor {
public:
void pong(Ball ball) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
};
-class Ping final : public Actor {
+class Ping final : public td::Actor {
public:
- explicit Ping(ActorId<Pong> pong) : pong_(pong) {
+ explicit Ping(td::ActorId<Pong> pong) : pong_(pong) {
}
- void start_up() override {
- send_closure(pong_, &Pong::pong, Ball());
+ void start_up() final {
+ td::send_closure(pong_, &Pong::pong, Ball());
}
private:
- ActorId<Pong> pong_;
+ td::ActorId<Pong> pong_;
};
TEST(Actors, simple_migrate) {
sb.clear();
sb2.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(2);
+ td::ConcurrentScheduler scheduler(2, 0);
auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release();
scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release();
scheduler.start();
@@ -267,26 +272,25 @@ TEST(Actors, simple_migrate) {
#endif
}
-class OpenClose final : public Actor {
+class OpenClose final : public td::Actor {
public:
explicit OpenClose(int cnt) : cnt_(cnt) {
}
- void start_up() override {
+ void start_up() final {
yield();
}
- void wakeup() override {
- ObserverBase *observer = reinterpret_cast<ObserverBase *>(123);
+ void wakeup() final {
+ auto observer = reinterpret_cast<td::ObserverBase *>(123);
if (cnt_ > 0) {
- auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create);
- CHECK(r_file_fd.is_ok()) << r_file_fd.error();
+ auto r_file_fd = td::FileFd::open("server", td::FileFd::Read | td::FileFd::Create);
+ LOG_CHECK(r_file_fd.is_ok()) << r_file_fd.error();
auto file_fd = r_file_fd.move_as_ok();
- // LOG(ERROR) << file_fd.get_native_fd();
- file_fd.get_fd().set_observer(observer);
+ { auto pollable_fd = file_fd.get_poll_info().extract_pollable_fd(observer); }
file_fd.close();
cnt_--;
yield();
} else {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
}
@@ -295,14 +299,8 @@ class OpenClose final : public Actor {
};
TEST(Actors, open_close) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- ConcurrentScheduler scheduler;
- scheduler.init(2);
- int cnt = 1000000;
-#if TD_WINDOWS || TD_ANDROID
- // TODO(perf) optimize
- cnt = 100;
-#endif
+ td::ConcurrentScheduler scheduler(2, 0);
+ int cnt = 10000; // TODO(perf) optimize
scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release();
scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release();
scheduler.start();
@@ -311,62 +309,59 @@ TEST(Actors, open_close) {
scheduler.finish();
}
-namespace {
-class MsgActor : public Actor {
+class MsgActor : public td::Actor {
public:
virtual void msg() = 0;
};
-class Slave : public Actor {
+class Slave final : public td::Actor {
public:
- ActorId<MsgActor> msg;
- explicit Slave(ActorId<MsgActor> msg) : msg(msg) {
+ td::ActorId<MsgActor> msg;
+ explicit Slave(td::ActorId<MsgActor> msg) : msg(msg) {
}
- void hangup() override {
- send_closure(msg, &MsgActor::msg);
+ void hangup() final {
+ td::send_closure(msg, &MsgActor::msg);
}
};
-class MasterActor : public MsgActor {
+class MasterActor final : public MsgActor {
public:
- void loop() override {
+ void loop() final {
alive_ = true;
- slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this)));
+ slave = td::create_actor<Slave>("Slave", static_cast<td::ActorId<MsgActor>>(actor_id(this)));
stop();
}
- ActorOwn<Slave> slave;
+ td::ActorOwn<Slave> slave;
MasterActor() = default;
MasterActor(const MasterActor &) = delete;
MasterActor &operator=(const MasterActor &) = delete;
MasterActor(MasterActor &&) = delete;
MasterActor &operator=(MasterActor &&) = delete;
- ~MasterActor() override {
+ ~MasterActor() final {
alive_ = 987654321;
}
- void msg() override {
+ void msg() final {
CHECK(alive_ == 123456789);
}
- uint64 alive_ = 123456789;
+ td::uint64 alive_ = 123456789;
};
-} // namespace
TEST(Actors, call_after_destruct) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
- Scheduler scheduler;
- scheduler.init();
+ td::Scheduler scheduler;
+ scheduler.init(0, create_queues(), nullptr);
{
auto guard = scheduler.get_guard();
- create_actor<MasterActor>("Master").release();
+ td::create_actor<MasterActor>("Master").release();
}
- scheduler.run(0);
+ scheduler.run(td::Timestamp::in(1));
}
-class LinkTokenSlave : public Actor {
+class LinkTokenSlave final : public td::Actor {
public:
- explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) {
+ explicit LinkTokenSlave(td::ActorShared<> parent) : parent_(std::move(parent)) {
}
- void add(uint64 link_token) {
+ void add(td::uint64 link_token) {
CHECK(link_token == get_link_token());
}
void close() {
@@ -374,62 +369,61 @@ class LinkTokenSlave : public Actor {
}
private:
- ActorShared<> parent_;
+ td::ActorShared<> parent_;
};
-class LinkTokenMasterActor : public Actor {
+class LinkTokenMasterActor final : public td::Actor {
public:
explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) {
}
- void start_up() override {
- child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
+ void start_up() final {
+ child_ = td::create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
yield();
}
- void loop() override {
+ void loop() final {
for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) {
+ auto token = static_cast<td::uint64>(cnt_) + 1;
switch (i % 4) {
case 0: {
- send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
+ td::send_closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
break;
}
case 1: {
- send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
+ td::send_closure_later(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token);
break;
}
case 2: {
- EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
+ td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token)
.try_emit();
break;
}
case 3: {
- EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
+ td::EventCreator::closure(td::ActorShared<LinkTokenSlave>(child_, token), &LinkTokenSlave::add, token)
.try_emit_later();
break;
}
}
}
if (cnt_ == 0) {
- send_closure(child_, &LinkTokenSlave::close);
+ td::send_closure(child_, &LinkTokenSlave::close);
} else {
yield();
}
}
- void hangup_shared() override {
+ void hangup_shared() final {
CHECK(get_link_token() == 123);
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
}
private:
int cnt_;
- ActorId<LinkTokenSlave> child_;
+ td::ActorId<LinkTokenSlave> child_;
};
TEST(Actors, link_token) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
auto cnt = 100000;
scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release();
scheduler.start();
@@ -440,25 +434,25 @@ TEST(Actors, link_token) {
TEST(Actors, promise) {
int value = -1;
- Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; });
- p1.set_error(Status::Error("Test error"));
+ td::Promise<int> p1 = td::PromiseCreator::lambda([&](int x) { value = x; });
+ p1.set_error(td::Status::Error("Test error"));
ASSERT_EQ(0, value);
- Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; });
- p2.set_error(Status::Error("Test error"));
+ td::Promise<td::int32> p2 = td::PromiseCreator::lambda([&](td::Result<td::int32> x) { value = 1; });
+ p2.set_error(td::Status::Error("Test error"));
ASSERT_EQ(1, value);
}
-class LaterSlave : public Actor {
+class LaterSlave final : public td::Actor {
public:
- explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) {
+ explicit LaterSlave(td::ActorShared<> parent) : parent_(std::move(parent)) {
}
private:
- ActorShared<> parent_;
+ td::ActorShared<> parent_;
- void hangup() override {
+ void hangup() final {
sb << "A";
- send_closure(actor_id(this), &LaterSlave::finish);
+ td::send_closure(actor_id(this), &LaterSlave::finish);
}
void finish() {
sb << "B";
@@ -466,31 +460,29 @@ class LaterSlave : public Actor {
}
};
-class LaterMasterActor : public Actor {
+class LaterMasterActor final : public td::Actor {
int cnt_ = 3;
- std::vector<ActorOwn<LaterSlave>> children_;
- void start_up() override {
+ td::vector<td::ActorOwn<LaterSlave>> children_;
+ void start_up() final {
for (int i = 0; i < cnt_; i++) {
- children_.push_back(create_actor<LaterSlave>("B", actor_shared()));
+ children_.push_back(td::create_actor<LaterSlave>("B", actor_shared(this)));
}
yield();
}
- void loop() override {
+ void loop() final {
children_.clear();
}
- void hangup_shared() override {
+ void hangup_shared() final {
if (!--cnt_) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
}
}
};
TEST(Actors, later) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -499,39 +491,36 @@ TEST(Actors, later) {
ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB");
}
-class MultiPromise2 : public Actor {
+class MultiPromise2 final : public td::Actor {
public:
- void start_up() override {
- auto promise = PromiseCreator::lambda([](Result<Unit> result) {
+ void start_up() final {
+ auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) {
result.ensure();
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
});
- MultiPromiseActorSafe multi_promise;
+ td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor2"};
multi_promise.add_promise(std::move(promise));
for (int i = 0; i < 10; i++) {
- create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
+ td::create_actor<td::SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
}
}
};
-class MultiPromise1 : public Actor {
+class MultiPromise1 final : public td::Actor {
public:
- void start_up() override {
- auto promise = PromiseCreator::lambda([](Result<Unit> result) {
+ void start_up() final {
+ auto promise = td::PromiseCreator::lambda([](td::Result<td::Unit> result) {
CHECK(result.is_error());
- create_actor<MultiPromise2>("B").release();
+ td::create_actor<MultiPromise2>("B").release();
});
- MultiPromiseActorSafe multi_promise;
+ td::MultiPromiseActorSafe multi_promise{"MultiPromiseActor1"};
multi_promise.add_promise(std::move(promise));
}
};
TEST(Actors, MultiPromise) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -539,23 +528,20 @@ TEST(Actors, MultiPromise) {
scheduler.finish();
}
-class FastPromise : public Actor {
+class FastPromise final : public td::Actor {
public:
- void start_up() override {
- PromiseFuture<int> pf;
+ void start_up() final {
+ td::PromiseFuture<int> pf;
auto promise = pf.move_promise();
auto future = pf.move_future();
promise.set_value(123);
CHECK(future.move_as_ok() == 123);
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
};
TEST(Actors, FastPromise) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<FastPromise>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -563,21 +549,18 @@ TEST(Actors, FastPromise) {
scheduler.finish();
}
-class StopInTeardown : public Actor {
- void loop() override {
+class StopInTeardown final : public td::Actor {
+ void loop() final {
stop();
}
- void tear_down() override {
+ void tear_down() final {
stop();
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
};
TEST(Actors, stop_in_teardown) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- sb.clear();
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
@@ -585,38 +568,113 @@ TEST(Actors, stop_in_teardown) {
scheduler.finish();
}
-class AlwaysWaitForMailbox : public Actor {
+class AlwaysWaitForMailbox final : public td::Actor {
public:
- void start_up() override {
+ void start_up() final {
always_wait_for_mailbox();
- create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) {
- send_closure(actor_id, &AlwaysWaitForMailbox::g);
- send_closure(actor_id, &AlwaysWaitForMailbox::g);
- CHECK(!ptr->was_f_);
- }))
+ td::create_actor<td::SleepActor>("Sleep", 0.1,
+ td::PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](td::Unit) {
+ td::send_closure(actor_id, &AlwaysWaitForMailbox::g);
+ td::send_closure(actor_id, &AlwaysWaitForMailbox::g);
+ CHECK(!ptr->was_f_);
+ }))
.release();
}
void f() {
was_f_ = true;
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
void g() {
- send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
+ td::send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
}
private:
- Timeout timeout_;
bool was_f_{false};
};
TEST(Actors, always_wait_for_mailbox) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
- ConcurrentScheduler scheduler;
- scheduler.init(0);
+ td::ConcurrentScheduler scheduler(0, 0);
scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release();
scheduler.start();
while (scheduler.run_main(10)) {
}
scheduler.finish();
}
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+TEST(Actors, send_from_other_threads) {
+ td::ConcurrentScheduler scheduler(1, 0);
+ int thread_n = 10;
+ class Listener final : public td::Actor {
+ public:
+ explicit Listener(int cnt) : cnt_(cnt) {
+ }
+ void dec() {
+ if (--cnt_ == 0) {
+ td::Scheduler::instance()->finish();
+ }
+ }
+
+ private:
+ int cnt_;
+ };
+
+ auto A = scheduler.create_actor_unsafe<Listener>(1, "A", thread_n).release();
+ scheduler.start();
+ td::vector<td::thread> threads(thread_n);
+ for (auto &thread : threads) {
+ thread = td::thread([&A, &scheduler] {
+ auto guard = scheduler.get_send_guard();
+ td::send_closure(A, &Listener::dec);
+ });
+ }
+ while (scheduler.run_main(10)) {
+ }
+ for (auto &thread : threads) {
+ thread.join();
+ }
+ scheduler.finish();
+}
+#endif
+
+class DelayedCall final : public td::Actor {
+ public:
+ void on_called(int *step) {
+ CHECK(*step == 0);
+ *step = 1;
+ }
+};
+
+class MultiPromiseSendClosureLaterTest final : public td::Actor {
+ public:
+ void start_up() final {
+ delayed_call_ = td::create_actor<DelayedCall>("DelayedCall").release();
+ mpa_.add_promise(td::PromiseCreator::lambda([this](td::Unit) {
+ CHECK(step_ == 1);
+ step_++;
+ td::Scheduler::instance()->finish();
+ }));
+ auto lock = mpa_.get_promise();
+ td::send_closure_later(delayed_call_, &DelayedCall::on_called, &step_);
+ lock.set_value(td::Unit());
+ }
+
+ void tear_down() final {
+ CHECK(step_ == 2);
+ }
+
+ private:
+ int step_ = 0;
+ td::MultiPromiseActor mpa_{"MultiPromiseActor"};
+ td::ActorId<DelayedCall> delayed_call_;
+};
+
+TEST(Actors, MultiPromiseSendClosureLater) {
+ td::ConcurrentScheduler scheduler(0, 0);
+ scheduler.create_actor_unsafe<MultiPromiseSendClosureLaterTest>(0, "MultiPromiseSendClosureLaterTest").release();
+ scheduler.start();
+ while (scheduler.run_main(1)) {
+ }
+ scheduler.finish();
+}
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp
index b97a258a44..bac42e3fd5 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_workers.cpp
@@ -1,22 +1,17 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
-#include "td/utils/logging.h"
-
-REGISTER_TESTS(actors_workers);
-
-namespace {
-
-using namespace td;
+#include "td/utils/common.h"
+#include "td/utils/SliceBuilder.h"
+#include "td/utils/tests.h"
-class PowerWorker final : public Actor {
+class PowerWorker final : public td::Actor {
public:
class Callback {
public:
@@ -29,12 +24,12 @@ class PowerWorker final : public Actor {
virtual void on_ready(int query, int res) = 0;
virtual void on_closed() = 0;
};
- void set_callback(unique_ptr<Callback> callback) {
+ void set_callback(td::unique_ptr<Callback> callback) {
callback_ = std::move(callback);
}
- void task(uint32 x, uint32 p) {
- uint32 res = 1;
- for (uint32 i = 0; i < p; i++) {
+ void task(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
+ for (td::uint32 i = 0; i < p; i++) {
res *= x;
}
callback_->on_ready(x, res);
@@ -45,39 +40,41 @@ class PowerWorker final : public Actor {
}
private:
- std::unique_ptr<Callback> callback_;
+ td::unique_ptr<Callback> callback_;
};
-class Manager final : public Actor {
+class Manager final : public td::Actor {
public:
- Manager(int queries_n, int query_size, std::vector<ActorId<PowerWorker>> workers)
- : workers_(std::move(workers)), left_query_(queries_n), query_size_(query_size) {
+ Manager(int queries_n, int query_size, td::vector<td::ActorId<PowerWorker>> workers)
+ : workers_(std::move(workers))
+ , ref_cnt_(static_cast<int>(workers_.size()))
+ , left_query_(queries_n)
+ , query_size_(query_size) {
}
- class Callback : public PowerWorker::Callback {
+ class Callback final : public PowerWorker::Callback {
public:
- Callback(ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) {
+ Callback(td::ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) {
}
- void on_ready(int query, int result) override {
- send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result);
+ void on_ready(int query, int result) final {
+ td::send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result);
}
- void on_closed() override {
- send_closure_later(actor_id_, &Manager::on_closed, worker_id_);
+ void on_closed() final {
+ td::send_closure_later(actor_id_, &Manager::on_closed, worker_id_);
}
private:
- ActorId<Manager> actor_id_;
+ td::ActorId<Manager> actor_id_;
int worker_id_;
};
- void start_up() override {
- ref_cnt_ = static_cast<int>(workers_.size());
+ void start_up() final {
int i = 0;
for (auto &worker : workers_) {
ref_cnt_++;
- send_closure_later(worker, &PowerWorker::set_callback, make_unique<Callback>(actor_id(this), i));
+ td::send_closure_later(worker, &PowerWorker::set_callback, td::make_unique<Callback>(actor_id(this), i));
i++;
- send_closure_later(worker, &PowerWorker::task, 3, query_size_);
+ td::send_closure_later(worker, &PowerWorker::task, 3, query_size_);
left_query_--;
}
}
@@ -85,10 +82,10 @@ class Manager final : public Actor {
void on_ready(int worker_id, int query, int res) {
ref_cnt_--;
if (left_query_ == 0) {
- send_closure(workers_[worker_id], &PowerWorker::close);
+ td::send_closure(workers_[worker_id], &PowerWorker::close);
} else {
ref_cnt_++;
- send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_);
+ td::send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_);
left_query_--;
}
}
@@ -96,30 +93,27 @@ class Manager final : public Actor {
void on_closed(int worker_id) {
ref_cnt_--;
if (ref_cnt_ == 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
}
}
private:
- std::vector<ActorId<PowerWorker>> workers_;
- int left_query_;
+ td::vector<td::ActorId<PowerWorker>> workers_;
int ref_cnt_;
+ int left_query_;
int query_size_;
};
-void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
- sched.init(threads_n);
+static void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
+ td::ConcurrentScheduler sched(threads_n, 0);
- std::vector<ActorId<PowerWorker>> workers;
+ td::vector<td::ActorId<PowerWorker>> workers;
for (int i = 0; i < workers_n; i++) {
int thread_id = threads_n ? i % (threads_n - 1) + 2 : 0;
- workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, "worker" + to_string(i)).release());
+ workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, PSLICE() << "worker" << i).release());
}
- sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "manager", queries_n, query_size, std::move(workers)).release();
+ sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "Manager", queries_n, query_size, std::move(workers)).release();
sched.start();
while (sched.run_main(10)) {
@@ -129,7 +123,6 @@ void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
// sched.test_one_thread_run();
}
-} // namespace
TEST(Actors, workers_big_query_one_thread) {
test_workers(0, 10, 1000, 300000);
@@ -144,13 +137,13 @@ TEST(Actors, workers_big_query_nine_threads) {
}
TEST(Actors, workers_small_query_one_thread) {
- test_workers(0, 10, 1000000, 1);
+ test_workers(0, 10, 100000, 1);
}
TEST(Actors, workers_small_query_two_threads) {
- test_workers(2, 10, 1000000, 1);
+ test_workers(2, 10, 100000, 1);
}
TEST(Actors, workers_small_query_nine_threads) {
- test_workers(9, 10, 1000000, 1);
+ test_workers(9, 10, 10000, 1);
}