summaryrefslogtreecommitdiff
path: root/libs/tdlib/td/tdactor
diff options
context:
space:
mode:
Diffstat (limited to 'libs/tdlib/td/tdactor')
-rw-r--r--libs/tdlib/td/tdactor/CMakeLists.txt65
-rw-r--r--libs/tdlib/td/tdactor/example/example.cpp49
-rw-r--r--libs/tdlib/td/tdactor/td/actor/Condition.h47
-rw-r--r--libs/tdlib/td/tdactor/td/actor/MultiPromise.cpp90
-rw-r--r--libs/tdlib/td/tdactor/td/actor/MultiPromise.h116
-rw-r--r--libs/tdlib/td/tdactor/td/actor/PromiseFuture.h570
-rw-r--r--libs/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h70
-rw-r--r--libs/tdlib/td/tdactor/td/actor/SignalSlot.h108
-rw-r--r--libs/tdlib/td/tdactor/td/actor/SleepActor.h33
-rw-r--r--libs/tdlib/td/tdactor/td/actor/Timeout.cpp96
-rw-r--r--libs/tdlib/td/tdactor/td/actor/Timeout.h127
-rw-r--r--libs/tdlib/td/tdactor/td/actor/actor.h14
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/Actor-decl.h120
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/Actor.h153
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h169
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/ActorId.h200
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h119
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/ActorInfo.h201
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp102
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h93
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/Event.h247
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h87
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/EventFull.h38
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h296
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp496
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl/Scheduler.h397
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h117
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h84
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl2/ActorState.h166
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp11
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.h1508
-rw-r--r--libs/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h32
-rw-r--r--libs/tdlib/td/tdactor/test/actors_bugs.cpp47
-rw-r--r--libs/tdlib/td/tdactor/test/actors_impl2.cpp535
-rw-r--r--libs/tdlib/td/tdactor/test/actors_main.cpp463
-rw-r--r--libs/tdlib/td/tdactor/test/actors_simple.cpp622
-rw-r--r--libs/tdlib/td/tdactor/test/actors_workers.cpp156
37 files changed, 7844 insertions, 0 deletions
diff --git a/libs/tdlib/td/tdactor/CMakeLists.txt b/libs/tdlib/td/tdactor/CMakeLists.txt
new file mode 100644
index 0000000000..c0c83025e5
--- /dev/null
+++ b/libs/tdlib/td/tdactor/CMakeLists.txt
@@ -0,0 +1,65 @@
+cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR)
+
+#SOURCE SETS
+set(TDACTOR_SOURCE
+ td/actor/impl/ConcurrentScheduler.cpp
+ td/actor/impl/Scheduler.cpp
+ td/actor/MultiPromise.cpp
+ td/actor/Timeout.cpp
+
+ td/actor/impl2/Scheduler.cpp
+
+ td/actor/impl/Actor-decl.h
+ td/actor/impl/Actor.h
+ td/actor/impl/ActorId-decl.h
+ td/actor/impl/ActorId.h
+ td/actor/impl/ActorInfo-decl.h
+ td/actor/impl/ActorInfo.h
+ td/actor/impl/EventFull-decl.h
+ td/actor/impl/EventFull.h
+ td/actor/impl/ConcurrentScheduler.h
+ td/actor/impl/Event.h
+ td/actor/impl/Scheduler-decl.h
+ td/actor/impl/Scheduler.h
+ td/actor/Condition.h
+ td/actor/MultiPromise.h
+ td/actor/PromiseFuture.h
+ td/actor/SchedulerLocalStorage.h
+ td/actor/SignalSlot.h
+ td/actor/SleepActor.h
+ td/actor/Timeout.h
+ td/actor/actor.h
+
+ td/actor/impl2/ActorLocker.h
+ td/actor/impl2/ActorSignals.h
+ td/actor/impl2/ActorState.h
+ td/actor/impl2/Scheduler.h
+ td/actor/impl2/SchedulerId.h
+)
+
+set(TDACTOR_TEST_SOURCE
+ ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_impl2.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_main.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_simple.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_workers.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_bugs.cpp
+ PARENT_SCOPE
+)
+
+#RULES
+
+#LIBRARIES
+
+add_library(tdactor STATIC ${TDACTOR_SOURCE})
+target_include_directories(tdactor PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>)
+target_link_libraries(tdactor PUBLIC tdutils)
+
+add_executable(example example/example.cpp)
+target_link_libraries(example PRIVATE tdactor)
+
+install(TARGETS tdactor EXPORT TdTargets
+ LIBRARY DESTINATION lib
+ ARCHIVE DESTINATION lib
+ RUNTIME DESTINATION bin
+ INCLUDES DESTINATION include
+)
diff --git a/libs/tdlib/td/tdactor/example/example.cpp b/libs/tdlib/td/tdactor/example/example.cpp
new file mode 100644
index 0000000000..4c2415c5e2
--- /dev/null
+++ b/libs/tdlib/td/tdactor/example/example.cpp
@@ -0,0 +1,49 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/actor.h"
+
+#include "td/utils/logging.h"
+
+class Worker : public td::Actor {
+ public:
+ void ping(int x) {
+ LOG(ERROR) << "got ping " << x;
+ }
+};
+
+class MainActor : public td::Actor {
+ public:
+ void start_up() override {
+ LOG(ERROR) << "start up";
+ set_timeout_in(10);
+ worker_ = td::create_actor_on_scheduler<Worker>("Worker", 1);
+ send_closure(worker_, &Worker::ping, 123);
+ }
+
+ void timeout_expired() override {
+ LOG(ERROR) << "timeout expired";
+ td::Scheduler::instance()->finish();
+ }
+
+ private:
+ td::ActorOwn<Worker> worker_;
+};
+
+int main(void) {
+ td::ConcurrentScheduler scheduler;
+ scheduler.init(4 /*threads_count*/);
+ scheduler.start();
+ {
+ auto guard = scheduler.get_current_guard();
+ td::create_actor_on_scheduler<MainActor>("Main actor", 0).release();
+ }
+ while (!scheduler.is_finished()) {
+ scheduler.run_main(10);
+ }
+ scheduler.finish();
+ return 0;
+}
diff --git a/libs/tdlib/td/tdactor/td/actor/Condition.h b/libs/tdlib/td/tdactor/td/actor/Condition.h
new file mode 100644
index 0000000000..c3799df487
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/Condition.h
@@ -0,0 +1,47 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+
+#include "td/utils/logging.h"
+
+namespace td {
+class Condition {
+ class Helper : public Actor {
+ public:
+ void wait(Promise<> promise) {
+ pending_promises_.push_back(std::move(promise));
+ }
+
+ private:
+ std::vector<Promise<>> pending_promises_;
+ void tear_down() override {
+ for (auto &promise : pending_promises_) {
+ promise.set_value(Unit());
+ }
+ }
+ };
+
+ public:
+ Condition() {
+ own_actor_ = create_actor<Helper>("helper");
+ actor_ = own_actor_.get();
+ }
+ void wait(Promise<> promise) {
+ send_closure(actor_, &Helper::wait, std::move(promise));
+ }
+ void set_true() {
+ CHECK(!own_actor_.empty());
+ own_actor_.reset();
+ }
+
+ private:
+ ActorId<Helper> actor_;
+ ActorOwn<Helper> own_actor_;
+};
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/MultiPromise.cpp b/libs/tdlib/td/tdactor/td/actor/MultiPromise.cpp
new file mode 100644
index 0000000000..0d98f5cfb4
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/MultiPromise.cpp
@@ -0,0 +1,90 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/MultiPromise.h"
+
+namespace td {
+void MultiPromiseActor::add_promise(Promise<Unit> &&promise) {
+ promises_.emplace_back(std::move(promise));
+}
+
+Promise<Unit> MultiPromiseActor::get_promise() {
+ if (empty()) {
+ register_actor("MultiPromise", this).release();
+ }
+ CHECK(!promises_.empty());
+
+ PromiseActor<Unit> promise;
+ FutureActor<Unit> future;
+ init_promise_future(&promise, &future);
+
+ future.set_event(EventCreator::raw(actor_id(), nullptr));
+ futures_.emplace_back(std::move(future));
+ return PromiseCreator::from_promise_actor(std::move(promise));
+}
+
+void MultiPromiseActor::raw_event(const Event::Raw &event) {
+ received_results_++;
+ if (received_results_ == futures_.size()) {
+ if (!ignore_errors_) {
+ for (auto &future : futures_) {
+ auto result = future.move_as_result();
+ if (result.is_error()) {
+ return set_result(result.move_as_error());
+ }
+ }
+ }
+ return set_result(Unit());
+ }
+}
+
+void MultiPromiseActor::set_ignore_errors(bool ignore_errors) {
+ ignore_errors_ = ignore_errors;
+}
+
+void MultiPromiseActor::set_result(Result<Unit> &&result) {
+ // MultiPromiseActor should be cleared before he begins to send out result
+ auto promises_copy = std::move(promises_);
+ promises_.clear();
+ auto futures_copy = std::move(futures_);
+ futures_.clear();
+ received_results_ = 0;
+ stop();
+
+ if (!promises_copy.empty()) {
+ for (size_t i = 0; i + 1 < promises_copy.size(); i++) {
+ promises_copy[i].set_result(result.clone());
+ }
+ promises_copy.back().set_result(std::move(result));
+ }
+}
+
+size_t MultiPromiseActor::promise_count() const {
+ return promises_.size();
+}
+
+void MultiPromiseActorSafe::add_promise(Promise<Unit> &&promise) {
+ multi_promise_->add_promise(std::move(promise));
+}
+
+Promise<Unit> MultiPromiseActorSafe::get_promise() {
+ return multi_promise_->get_promise();
+}
+
+void MultiPromiseActorSafe::set_ignore_errors(bool ignore_errors) {
+ multi_promise_->set_ignore_errors(ignore_errors);
+}
+
+size_t MultiPromiseActorSafe::promise_count() const {
+ return multi_promise_->promise_count();
+}
+
+MultiPromiseActorSafe::~MultiPromiseActorSafe() {
+ if (!multi_promise_->empty()) {
+ register_existing_actor(std::move(multi_promise_)).release();
+ }
+}
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/MultiPromise.h b/libs/tdlib/td/tdactor/td/actor/MultiPromise.h
new file mode 100644
index 0000000000..aa28947464
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/MultiPromise.h
@@ -0,0 +1,116 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+#include "td/actor/PromiseFuture.h"
+
+#include "td/utils/common.h"
+#include "td/utils/logging.h"
+#include "td/utils/Status.h"
+
+namespace td {
+
+class MultiPromiseInterface {
+ public:
+ virtual void add_promise(Promise<> &&promise) = 0;
+ virtual Promise<> get_promise() = 0;
+
+ // deprecated?
+ virtual size_t promise_count() const = 0;
+ virtual void set_ignore_errors(bool ignore_errors) = 0;
+
+ MultiPromiseInterface() = default;
+ MultiPromiseInterface(const MultiPromiseInterface &) = delete;
+ MultiPromiseInterface &operator=(const MultiPromiseInterface &) = delete;
+ MultiPromiseInterface(MultiPromiseInterface &&) = default;
+ MultiPromiseInterface &operator=(MultiPromiseInterface &&) = default;
+ virtual ~MultiPromiseInterface() = default;
+};
+
+class MultiPromise : public MultiPromiseInterface {
+ public:
+ void add_promise(Promise<> &&promise) override {
+ impl_->add_promise(std::move(promise));
+ }
+ Promise<> get_promise() override {
+ return impl_->get_promise();
+ }
+
+ // deprecated?
+ size_t promise_count() const override {
+ return impl_->promise_count();
+ }
+ void set_ignore_errors(bool ignore_errors) override {
+ impl_->set_ignore_errors(ignore_errors);
+ }
+
+ MultiPromise() = default;
+ explicit MultiPromise(std::unique_ptr<MultiPromiseInterface> impl) : impl_(std::move(impl)) {
+ }
+
+ private:
+ std::unique_ptr<MultiPromiseInterface> impl_;
+};
+
+class MultiPromiseActor final
+ : public Actor
+ , public MultiPromiseInterface {
+ public:
+ MultiPromiseActor() = default;
+
+ void add_promise(Promise<Unit> &&promise) override;
+
+ Promise<Unit> get_promise() override;
+
+ void set_ignore_errors(bool ignore_errors) override;
+
+ size_t promise_count() const override;
+
+ private:
+ void set_result(Result<Unit> &&result);
+
+ vector<Promise<Unit>> promises_; // promises waiting for result
+ vector<FutureActor<Unit>> futures_; // futures waiting for result of the queries
+ size_t received_results_ = 0;
+ bool ignore_errors_ = false;
+
+ void raw_event(const Event::Raw &event) override;
+
+ void on_start_migrate(int32) override {
+ UNREACHABLE();
+ }
+ void on_finish_migrate() override {
+ UNREACHABLE();
+ }
+};
+
+class MultiPromiseActorSafe : public MultiPromiseInterface {
+ public:
+ void add_promise(Promise<Unit> &&promise) override;
+ Promise<Unit> get_promise() override;
+ void set_ignore_errors(bool ignore_errors) override;
+ size_t promise_count() const override;
+ MultiPromiseActorSafe() = default;
+ MultiPromiseActorSafe(const MultiPromiseActorSafe &other) = delete;
+ MultiPromiseActorSafe &operator=(const MultiPromiseActorSafe &other) = delete;
+ MultiPromiseActorSafe(MultiPromiseActorSafe &&other) = delete;
+ MultiPromiseActorSafe &operator=(MultiPromiseActorSafe &&other) = delete;
+ ~MultiPromiseActorSafe() override;
+
+ private:
+ std::unique_ptr<MultiPromiseActor> multi_promise_ = std::make_unique<MultiPromiseActor>();
+};
+
+class MultiPromiseCreator {
+ public:
+ static MultiPromise create() {
+ return MultiPromise(std::make_unique<MultiPromiseActor>());
+ }
+};
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/PromiseFuture.h b/libs/tdlib/td/tdactor/td/actor/PromiseFuture.h
new file mode 100644
index 0000000000..63156c3838
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/PromiseFuture.h
@@ -0,0 +1,570 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+
+#include "td/utils/Closure.h"
+#include "td/utils/common.h"
+#include "td/utils/invoke.h" // for tuple_for_each
+#include "td/utils/logging.h"
+#include "td/utils/ScopeGuard.h"
+#include "td/utils/Status.h"
+
+#include <tuple>
+#include <type_traits>
+#include <utility>
+
+namespace td {
+template <class T = Unit>
+class PromiseInterface {
+ public:
+ PromiseInterface() = default;
+ PromiseInterface(const PromiseInterface &) = delete;
+ PromiseInterface &operator=(const PromiseInterface &) = delete;
+ PromiseInterface(PromiseInterface &&) = default;
+ PromiseInterface &operator=(PromiseInterface &&) = default;
+ virtual ~PromiseInterface() = default;
+ virtual void set_value(T &&value) {
+ set_result(std::move(value));
+ }
+ virtual void set_error(Status &&error) {
+ set_result(std::move(error));
+ }
+ virtual void set_result(Result<T> &&result) {
+ if (result.is_ok()) {
+ set_value(result.move_as_ok());
+ } else {
+ set_error(result.move_as_error());
+ }
+ }
+ virtual void start_migrate(int32 sched_id) {
+ }
+ virtual void finish_migrate() {
+ }
+};
+
+template <class T = Unit>
+class FutureInterface {
+ public:
+ FutureInterface() = default;
+ FutureInterface(const FutureInterface &) = delete;
+ FutureInterface &operator=(const FutureInterface &) = delete;
+ FutureInterface(FutureInterface &&) = default;
+ FutureInterface &operator=(FutureInterface &&) = default;
+ virtual ~FutureInterface() = default;
+ virtual bool is_ready() = 0;
+ virtual bool is_ok() = 0;
+ virtual bool is_error() = 0;
+ virtual const T &ok() = 0;
+ virtual T move_as_ok() = 0;
+ virtual const Status &error() = 0;
+ virtual Status move_as_error() TD_WARN_UNUSED_RESULT = 0;
+ virtual const Result<T> &result() = 0;
+ virtual Result<T> move_as_result() TD_WARN_UNUSED_RESULT = 0;
+};
+
+template <class T>
+class SafePromise;
+
+template <class T = Unit>
+class Promise {
+ public:
+ void set_value(T &&value) {
+ if (!promise_) {
+ return;
+ }
+ promise_->set_value(std::move(value));
+ promise_.reset();
+ }
+ void set_error(Status &&error) {
+ if (!promise_) {
+ return;
+ }
+ promise_->set_error(std::move(error));
+ promise_.reset();
+ }
+ void set_result(Result<T> &&result) {
+ if (!promise_) {
+ return;
+ }
+ promise_->set_result(std::move(result));
+ promise_.reset();
+ }
+ void reset() {
+ promise_.reset();
+ }
+ void start_migrate(int32 sched_id) {
+ if (!promise_) {
+ return;
+ }
+ promise_->start_migrate(sched_id);
+ }
+ void finish_migrate() {
+ if (!promise_) {
+ return;
+ }
+ promise_->finish_migrate();
+ }
+ std::unique_ptr<PromiseInterface<T>> release() {
+ return std::move(promise_);
+ }
+
+ Promise() = default;
+ explicit Promise(std::unique_ptr<PromiseInterface<T>> promise) : promise_(std::move(promise)) {
+ }
+ Promise(SafePromise<T> &&other);
+ Promise &operator=(SafePromise<T> &&other);
+
+ explicit operator bool() {
+ return static_cast<bool>(promise_);
+ }
+
+ private:
+ std::unique_ptr<PromiseInterface<T>> promise_;
+};
+
+template <class T>
+void start_migrate(Promise<T> &promise, int32 sched_id) {
+ // promise.start_migrate(sched_id);
+}
+template <class T>
+void finish_migrate(Promise<T> &promise) {
+ // promise.finish_migrate();
+}
+
+template <class T = Unit>
+class SafePromise {
+ public:
+ SafePromise(Promise<T> promise, Result<T> result) : promise_(std::move(promise)), result_(std::move(result)) {
+ }
+ SafePromise(const SafePromise &other) = delete;
+ SafePromise &operator=(const SafePromise &other) = delete;
+ SafePromise(SafePromise &&other) = default;
+ SafePromise &operator=(SafePromise &&other) = default;
+ ~SafePromise() {
+ if (promise_) {
+ promise_.set_result(std::move(result_));
+ }
+ }
+ Promise<T> release() {
+ return std::move(promise_);
+ }
+
+ private:
+ Promise<T> promise_;
+ Result<T> result_;
+};
+
+template <class T>
+Promise<T>::Promise(SafePromise<T> &&other) : Promise(other.release()) {
+}
+template <class T>
+Promise<T> &Promise<T>::operator=(SafePromise<T> &&other) {
+ *this = other.release();
+ return *this;
+}
+
+namespace detail {
+
+class EventPromise : public PromiseInterface<Unit> {
+ public:
+ void set_value(Unit &&) override {
+ ok_.try_emit();
+ fail_.clear();
+ }
+ void set_error(Status &&) override {
+ do_set_error();
+ }
+
+ EventPromise(const EventPromise &other) = delete;
+ EventPromise &operator=(const EventPromise &other) = delete;
+ EventPromise(EventPromise &&other) = delete;
+ EventPromise &operator=(EventPromise &&other) = delete;
+ ~EventPromise() override {
+ do_set_error();
+ }
+
+ EventPromise() = default;
+ explicit EventPromise(EventFull ok) : ok_(std::move(ok)), use_ok_as_fail_(true) {
+ }
+ EventPromise(EventFull ok, EventFull fail) : ok_(std::move(ok)), fail_(std::move(fail)), use_ok_as_fail_(false) {
+ }
+
+ private:
+ EventFull ok_;
+ EventFull fail_;
+ bool use_ok_as_fail_ = false;
+ void do_set_error() {
+ if (use_ok_as_fail_) {
+ ok_.try_emit();
+ } else {
+ ok_.clear();
+ fail_.try_emit();
+ }
+ }
+};
+
+template <typename T>
+struct GetArg : public GetArg<decltype(&T::operator())> {};
+
+template <class C, class R, class Arg>
+class GetArg<R (C::*)(Arg)> {
+ public:
+ using type = Arg;
+};
+template <class C, class R, class Arg>
+class GetArg<R (C::*)(Arg) const> {
+ public:
+ using type = Arg;
+};
+
+template <class T>
+using get_arg_t = std::decay_t<typename GetArg<T>::type>;
+
+template <class T>
+struct DropResult {
+ using type = T;
+};
+
+template <class T>
+struct DropResult<Result<T>> {
+ using type = T;
+};
+
+template <class T>
+using drop_result_t = typename DropResult<T>::type;
+
+template <class ValueT, class FunctionOkT, class FunctionFailT>
+class LambdaPromise : public PromiseInterface<ValueT> {
+ enum OnFail { None, Ok, Fail };
+
+ public:
+ void set_value(ValueT &&value) override {
+ ok_(std::move(value));
+ on_fail_ = None;
+ }
+ void set_error(Status &&error) override {
+ do_error(std::move(error));
+ }
+ LambdaPromise(const LambdaPromise &other) = delete;
+ LambdaPromise &operator=(const LambdaPromise &other) = delete;
+ LambdaPromise(LambdaPromise &&other) = delete;
+ LambdaPromise &operator=(LambdaPromise &&other) = delete;
+ ~LambdaPromise() override {
+ do_error(Status::Error("Lost promise"));
+ }
+
+ template <class FromOkT, class FromFailT>
+ LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail)
+ : ok_(std::forward<FromOkT>(ok)), fail_(std::forward<FromFailT>(fail)), on_fail_(use_ok_as_fail ? Ok : Fail) {
+ }
+
+ private:
+ FunctionOkT ok_;
+ FunctionFailT fail_;
+ OnFail on_fail_ = None;
+
+ template <class FuncT, class ArgT = detail::get_arg_t<FuncT>>
+ std::enable_if_t<std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) {
+ func(std::move(status));
+ }
+
+ template <class FuncT, class ArgT = detail::get_arg_t<FuncT>>
+ std::enable_if_t<!std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) {
+ func(Auto());
+ }
+
+ void do_error(Status &&error) {
+ switch (on_fail_) {
+ case None:
+ break;
+ case Ok:
+ do_error_impl(ok_, std::move(error));
+ break;
+ case Fail:
+ fail_(std::move(error));
+ break;
+ }
+ on_fail_ = None;
+ }
+};
+
+template <class... ArgsT>
+class JoinPromise : public PromiseInterface<Unit> {
+ public:
+ explicit JoinPromise(ArgsT &&... arg) : promises_(std::forward<ArgsT>(arg)...) {
+ }
+ void set_value(Unit &&) override {
+ tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); });
+ }
+ void set_error(Status &&error) override {
+ tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); });
+ }
+
+ private:
+ std::tuple<std::decay_t<ArgsT>...> promises_;
+};
+} // namespace detail
+
+/*** FutureActor and PromiseActor ***/
+template <class T>
+class FutureActor;
+
+template <class T>
+class PromiseActor;
+
+template <class T>
+class ActorTraits<FutureActor<T>> {
+ public:
+ static constexpr bool is_lite = true;
+};
+
+template <class T>
+class PromiseActor final : public PromiseInterface<T> {
+ friend class FutureActor<T>;
+ enum State { Waiting, Hangup };
+
+ public:
+ PromiseActor() = default;
+ PromiseActor(const PromiseActor &other) = delete;
+ PromiseActor &operator=(const PromiseActor &other) = delete;
+ PromiseActor(PromiseActor &&) = default;
+ PromiseActor &operator=(PromiseActor &&) = default;
+ ~PromiseActor() override {
+ close();
+ }
+
+ void set_value(T &&value) override;
+ void set_error(Status &&error) override;
+
+ void close() {
+ future_id_.reset();
+ }
+
+ // NB: if true is returned no further events will be sent
+ bool is_hangup() {
+ if (state_ == State::Hangup) {
+ return true;
+ }
+ if (!future_id_.is_alive()) {
+ state_ = State::Hangup;
+ future_id_.release();
+ event_.clear();
+ return true;
+ }
+ return false;
+ }
+
+ template <class S>
+ friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future);
+
+ bool empty_promise() {
+ return future_id_.empty();
+ }
+ bool empty() {
+ return future_id_.empty();
+ }
+
+ private:
+ ActorOwn<FutureActor<T>> future_id_;
+ EventFull event_;
+ State state_;
+
+ void init() {
+ state_ = State::Waiting;
+ event_.clear();
+ }
+};
+
+template <class T>
+class FutureActor final : public Actor {
+ friend class PromiseActor<T>;
+ enum State { Waiting, Ready };
+
+ public:
+ FutureActor() = default;
+
+ FutureActor(const FutureActor &other) = delete;
+ FutureActor &operator=(const FutureActor &other) = delete;
+
+ FutureActor(FutureActor &&other) = default;
+ FutureActor &operator=(FutureActor &&other) = default;
+
+ ~FutureActor() override = default;
+
+ bool is_ok() const {
+ return is_ready() && result_.is_ok();
+ }
+ bool is_error() const {
+ CHECK(is_ready());
+ return is_ready() && result_.is_error();
+ }
+ T move_as_ok() {
+ return move_as_result().move_as_ok();
+ }
+ Status move_as_error() TD_WARN_UNUSED_RESULT {
+ return move_as_result().move_as_error();
+ }
+ Result<T> move_as_result() TD_WARN_UNUSED_RESULT {
+ CHECK(is_ready());
+ SCOPE_EXIT {
+ do_stop();
+ };
+ return std::move(result_);
+ }
+ bool is_ready() const {
+ return !empty() && state_ == State::Ready;
+ }
+
+ void close() {
+ event_.clear();
+ result_.clear();
+ do_stop();
+ }
+
+ void set_event(EventFull &&event) {
+ CHECK(!empty());
+ event_ = std::move(event);
+ if (state_ != State::Waiting) {
+ event_.try_emit_later();
+ }
+ }
+
+ template <class S>
+ friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future);
+
+ private:
+ EventFull event_;
+ Result<T> result_;
+ State state_;
+
+ void set_value(T &&value) {
+ set_result(std::move(value));
+ }
+
+ void set_error(Status &&error) {
+ set_result(std::move(error));
+ }
+
+ void set_result(Result<T> &&result) {
+ CHECK(state_ == State::Waiting);
+ result_ = std::move(result);
+ state_ = State::Ready;
+
+ event_.try_emit_later();
+ }
+
+ void hangup() override {
+ set_error(Status::Hangup());
+ }
+
+ void start_up() override {
+ // empty
+ }
+
+ void init() {
+ CHECK(empty());
+ state_ = State::Waiting;
+ event_.clear();
+ }
+};
+
+template <class T>
+void PromiseActor<T>::set_value(T &&value) {
+ if (state_ == State::Waiting && !future_id_.empty()) {
+ send_closure(std::move(future_id_), &FutureActor<T>::set_value, std::move(value));
+ }
+}
+template <class T>
+void PromiseActor<T>::set_error(Status &&error) {
+ if (state_ == State::Waiting && !future_id_.empty()) {
+ send_closure(std::move(future_id_), &FutureActor<T>::set_error, std::move(error));
+ }
+}
+
+template <class S>
+void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future) {
+ promise->init();
+ future->init();
+ promise->future_id_ = register_actor("FutureActor", future);
+
+ CHECK(future->get_info() != nullptr);
+}
+
+template <class T>
+class PromiseFuture {
+ public:
+ PromiseFuture() {
+ init_promise_future(&promise_, &future_);
+ }
+ PromiseActor<T> &promise() {
+ return promise_;
+ }
+ FutureActor<T> &future() {
+ return future_;
+ }
+ PromiseActor<T> &&move_promise() {
+ return std::move(promise_);
+ }
+ FutureActor<T> &&move_future() {
+ return std::move(future_);
+ }
+
+ private:
+ PromiseActor<T> promise_;
+ FutureActor<T> future_;
+};
+
+template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT>
+FutureActor<T> send_promise(ActorId<ActorAT> actor_id, Send::Flags flags,
+ ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&... args) {
+ PromiseFuture<T> pf;
+ ::td::Scheduler::instance()->send_closure(
+ std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...), flags);
+ return pf.move_future();
+}
+
+class PromiseCreator {
+ public:
+ struct Ignore {
+ void operator()(Status &&error) {
+ error.ignore();
+ }
+ };
+
+ template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>>
+ static Promise<ArgT> lambda(OkT &&ok) {
+ return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, Ignore>>(std::forward<OkT>(ok),
+ Ignore(), true));
+ }
+
+ template <class OkT, class FailT, class ArgT = detail::get_arg_t<OkT>>
+ static Promise<ArgT> lambda(OkT &&ok, FailT &&fail) {
+ return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, std::decay_t<FailT>>>(
+ std::forward<OkT>(ok), std::forward<FailT>(fail), false));
+ }
+
+ static Promise<> event(EventFull &&ok) {
+ return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok)));
+ }
+
+ static Promise<> event(EventFull ok, EventFull fail) {
+ return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok), std::move(fail)));
+ }
+
+ template <class... ArgsT>
+ static Promise<> join(ArgsT &&... args) {
+ return Promise<>(std::make_unique<detail::JoinPromise<ArgsT...>>(std::forward<ArgsT>(args)...));
+ }
+
+ template <class T>
+ static Promise<T> from_promise_actor(PromiseActor<T> &&from) {
+ return Promise<T>(std::make_unique<PromiseActor<T>>(std::move(from)));
+ }
+};
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h b/libs/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h
new file mode 100644
index 0000000000..f505836a16
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h
@@ -0,0 +1,70 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+
+#include "td/utils/logging.h"
+#include "td/utils/optional.h"
+
+#include <functional>
+
+namespace td {
+template <class T>
+class SchedulerLocalStorage {
+ public:
+ SchedulerLocalStorage() : data_(Scheduler::instance()->sched_count()) {
+ }
+ T &get() {
+ return data_[Scheduler::instance()->sched_id()];
+ }
+ template <class F>
+ void for_each(F &&f) {
+ for (auto &value : data_) {
+ f(value);
+ }
+ }
+ template <class F>
+ void for_each(F &&f) const {
+ for (const auto &value : data_) {
+ f(value);
+ }
+ }
+
+ private:
+ std::vector<T> data_;
+};
+
+template <class T>
+class LazySchedulerLocalStorage {
+ public:
+ LazySchedulerLocalStorage() = default;
+ explicit LazySchedulerLocalStorage(std::function<T()> create_func) : create_func_(std::move(create_func)) {
+ }
+
+ T &get() {
+ auto &optional_value_ = sls_optional_value_.get();
+ if (!optional_value_) {
+ CHECK(create_func_);
+ optional_value_ = create_func_();
+ }
+ return *optional_value_;
+ }
+ void clear_values() {
+ sls_optional_value_.for_each([&](auto &optional_value) {
+ if (optional_value) {
+ optional_value = optional<T>();
+ }
+ });
+ }
+
+ private:
+ std::function<T()> create_func_;
+ SchedulerLocalStorage<optional<T>> sls_optional_value_;
+};
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/SignalSlot.h b/libs/tdlib/td/tdactor/td/actor/SignalSlot.h
new file mode 100644
index 0000000000..73b48f58ed
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/SignalSlot.h
@@ -0,0 +1,108 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+
+namespace td {
+class Slot;
+class Signal {
+ public:
+ void emit();
+
+ explicit Signal(ActorId<Slot> slot_id) : slot_id_(slot_id) {
+ }
+
+ private:
+ ActorId<Slot> slot_id_;
+};
+class Slot final : public Actor {
+ public:
+ Slot() = default;
+ Slot(const Slot &other) = delete;
+ Slot &operator=(const Slot &other) = delete;
+ Slot(Slot &&) = default;
+ Slot &operator=(Slot &&) = default;
+ ~Slot() override {
+ close();
+ }
+ void set_event(EventFull &&event) {
+ was_signal_ = false;
+ event_ = std::move(event);
+ }
+
+ bool has_event() {
+ return !event_.empty();
+ }
+
+ bool was_signal() {
+ return was_signal_;
+ }
+
+ void clear_event() {
+ event_.clear();
+ }
+
+ void close() {
+ if (!empty()) {
+ do_stop();
+ }
+ }
+
+ void set_timeout_in(double timeout_in) {
+ register_if_empty();
+ Actor::set_timeout_in(timeout_in);
+ }
+ void set_timeout_at(double timeout_at) {
+ register_if_empty();
+ Actor::set_timeout_at(timeout_at);
+ }
+
+ friend class Signal;
+ Signal get_signal() {
+ register_if_empty();
+ return Signal(actor_id(this));
+ }
+ ActorShared<> get_signal_new() {
+ register_if_empty();
+ return actor_shared();
+ }
+
+ private:
+ bool was_signal_ = false;
+ EventFull event_;
+
+ void timeout_expired() override {
+ signal();
+ }
+
+ void start_up() override {
+ empty();
+ }
+
+ void register_if_empty() {
+ if (empty()) {
+ register_actor("Slot", this).release();
+ }
+ }
+
+ // send event only once
+ void signal() {
+ if (!was_signal_) {
+ was_signal_ = true;
+ event_.try_emit_later();
+ }
+ }
+ void hangup_shared() override {
+ signal();
+ }
+};
+inline void Signal::emit() {
+ send_closure(slot_id_, &Slot::signal);
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/SleepActor.h b/libs/tdlib/td/tdactor/td/actor/SleepActor.h
new file mode 100644
index 0000000000..9b9981ec38
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/SleepActor.h
@@ -0,0 +1,33 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+
+#include "td/actor/PromiseFuture.h"
+
+namespace td {
+
+class SleepActor : public Actor {
+ public:
+ SleepActor(double timeout, Promise<> promise) : timeout_(timeout), promise_(std::move(promise)) {
+ }
+
+ private:
+ double timeout_;
+ Promise<> promise_;
+
+ void start_up() override {
+ set_timeout_in(timeout_);
+ }
+ void timeout_expired() override {
+ promise_.set_value(Unit());
+ stop();
+ }
+};
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/Timeout.cpp b/libs/tdlib/td/tdactor/td/actor/Timeout.cpp
new file mode 100644
index 0000000000..fa2e5ffff3
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/Timeout.cpp
@@ -0,0 +1,96 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/Timeout.h"
+
+#include "td/utils/Time.h"
+
+namespace td {
+
+bool MultiTimeout::has_timeout(int64 key) const {
+ return items_.find(Item(key)) != items_.end();
+}
+
+void MultiTimeout::set_timeout_at(int64 key, double timeout) {
+ LOG(DEBUG) << "Set timeout for " << key << " in " << timeout - Time::now();
+ auto item = items_.emplace(key);
+ auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first));
+ if (heap_node->in_heap()) {
+ CHECK(!item.second);
+ bool need_update_timeout = heap_node->is_top();
+ timeout_queue_.fix(timeout, heap_node);
+ if (need_update_timeout || heap_node->is_top()) {
+ update_timeout();
+ }
+ } else {
+ CHECK(item.second);
+ timeout_queue_.insert(timeout, heap_node);
+ if (heap_node->is_top()) {
+ update_timeout();
+ }
+ }
+}
+
+void MultiTimeout::add_timeout_at(int64 key, double timeout) {
+ LOG(DEBUG) << "Add timeout for " << key << " in " << timeout - Time::now();
+ auto item = items_.emplace(key);
+ auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first));
+ if (heap_node->in_heap()) {
+ CHECK(!item.second);
+ } else {
+ CHECK(item.second);
+ timeout_queue_.insert(timeout, heap_node);
+ if (heap_node->is_top()) {
+ update_timeout();
+ }
+ }
+}
+
+void MultiTimeout::cancel_timeout(int64 key) {
+ LOG(DEBUG) << "Cancel timeout for " << key;
+ auto item = items_.find(Item(key));
+ if (item != items_.end()) {
+ auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item));
+ CHECK(heap_node->in_heap());
+ bool need_update_timeout = heap_node->is_top();
+ timeout_queue_.erase(heap_node);
+ items_.erase(item);
+
+ if (need_update_timeout) {
+ update_timeout();
+ }
+ }
+}
+
+void MultiTimeout::update_timeout() {
+ if (items_.empty()) {
+ LOG(DEBUG) << "Cancel timeout";
+ CHECK(timeout_queue_.empty());
+ CHECK(Actor::has_timeout());
+ Actor::cancel_timeout();
+ } else {
+ LOG(DEBUG) << "Set timeout in " << timeout_queue_.top_key() - Time::now_cached();
+ Actor::set_timeout_at(timeout_queue_.top_key());
+ }
+}
+
+void MultiTimeout::timeout_expired() {
+ double now = Time::now_cached();
+ while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
+ int64 key = static_cast<Item *>(timeout_queue_.pop())->key;
+ items_.erase(Item(key));
+ expired_.push_back(key);
+ }
+ if (!items_.empty()) {
+ update_timeout();
+ }
+ for (auto key : expired_) {
+ callback_(data_, key);
+ }
+ expired_.clear();
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/Timeout.h b/libs/tdlib/td/tdactor/td/actor/Timeout.h
new file mode 100644
index 0000000000..a3a9ba1913
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/Timeout.h
@@ -0,0 +1,127 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/actor.h"
+
+#include "td/utils/Heap.h"
+#include "td/utils/logging.h"
+#include "td/utils/Time.h"
+
+#include <set>
+
+namespace td {
+class Timeout final : public Actor {
+ public:
+ using Data = void *;
+ using Callback = void (*)(Data);
+ Timeout() {
+ register_actor("Timeout", this).release();
+ }
+
+ void set_callback(Callback callback) {
+ callback_ = callback;
+ }
+ void set_callback_data(Data &&data) {
+ data_ = data;
+ }
+
+ bool has_timeout() const {
+ return Actor::has_timeout();
+ }
+ void set_timeout_in(double timeout) {
+ Actor::set_timeout_in(timeout);
+ }
+ void cancel_timeout() {
+ if (has_timeout()) {
+ Actor::cancel_timeout();
+ callback_ = Callback();
+ data_ = Data();
+ }
+ }
+
+ private:
+ friend class Scheduler;
+
+ Callback callback_;
+ Data data_;
+
+ void set_timeout_at(double timeout) {
+ Actor::set_timeout_at(timeout);
+ }
+
+ void timeout_expired() override {
+ CHECK(!has_timeout());
+ CHECK(callback_ != Callback());
+ Callback callback = callback_;
+ Data data = data_;
+ callback_ = Callback();
+ data_ = Data();
+
+ callback(data);
+ }
+};
+
+// TODO optimize
+class MultiTimeout final : public Actor {
+ struct Item : public HeapNode {
+ int64 key;
+
+ explicit Item(int64 key) : key(key) {
+ }
+
+ bool operator<(const Item &other) const {
+ return key < other.key;
+ }
+ };
+
+ public:
+ using Data = void *;
+ using Callback = void (*)(Data, int64);
+ MultiTimeout() {
+ register_actor("MultiTimeout", this).release();
+ }
+
+ void set_callback(Callback callback) {
+ callback_ = callback;
+ }
+ void set_callback_data(Data data) {
+ data_ = data;
+ }
+
+ bool has_timeout(int64 key) const;
+
+ void set_timeout_in(int64 key, double timeout) {
+ set_timeout_at(key, Time::now() + timeout);
+ }
+
+ void add_timeout_in(int64 key, double timeout) {
+ add_timeout_at(key, Time::now() + timeout);
+ }
+
+ void set_timeout_at(int64 key, double timeout);
+
+ void add_timeout_at(int64 key, double timeout); // memcache semantics, doesn't replace old timeout
+
+ void cancel_timeout(int64 key);
+
+ private:
+ friend class Scheduler;
+
+ Callback callback_;
+ Data data_;
+
+ KHeap<double> timeout_queue_;
+ std::set<Item> items_;
+ std::vector<int64> expired_;
+
+ void update_timeout();
+
+ void timeout_expired() override;
+};
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/actor.h b/libs/tdlib/td/tdactor/td/actor/actor.h
new file mode 100644
index 0000000000..dadfadc055
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/actor.h
@@ -0,0 +1,14 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/Actor.h"
+#include "td/actor/impl/ActorId.h"
+#include "td/actor/impl/ActorInfo.h"
+#include "td/actor/impl/ConcurrentScheduler.h"
+#include "td/actor/impl/EventFull.h"
+#include "td/actor/impl/Scheduler.h"
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Actor-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
new file mode 100644
index 0000000000..4342214800
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/Actor-decl.h
@@ -0,0 +1,120 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/ActorId-decl.h"
+#include "td/actor/impl/ActorInfo-decl.h"
+#include "td/actor/impl/Event.h"
+
+#include "td/utils/ObjectPool.h"
+#include "td/utils/Observer.h"
+#include "td/utils/Slice.h"
+
+#include <memory>
+
+namespace td {
+
+class Actor : public ObserverBase {
+ public:
+ using Deleter = ActorInfo::Deleter;
+ Actor() = default;
+ Actor(const Actor &) = delete;
+ Actor &operator=(const Actor &) = delete;
+ Actor(Actor &&other);
+ Actor &operator=(Actor &&other);
+ ~Actor() override {
+ if (!empty()) {
+ do_stop();
+ }
+ }
+
+ virtual void start_up() {
+ yield();
+ }
+ virtual void tear_down() {
+ }
+ virtual void wakeup() {
+ loop();
+ }
+ virtual void hangup() {
+ stop();
+ }
+ virtual void hangup_shared() {
+ // ignore
+ }
+ virtual void timeout_expired() {
+ loop();
+ }
+ virtual void raw_event(const Event::Raw &event) {
+ }
+ virtual void loop() {
+ }
+
+ // TODO: not called in events. Can't use stop, or migrate inside of them
+ virtual void on_start_migrate(int32 sched_id) {
+ }
+ virtual void on_finish_migrate() {
+ }
+
+ void notify() override;
+
+ // proxy to scheduler
+ void yield();
+ void stop();
+ void do_stop();
+ bool has_timeout() const;
+ void set_timeout_in(double timeout_in);
+ void set_timeout_at(double timeout_at);
+ void cancel_timeout();
+ void migrate(int32 sched_id);
+ void do_migrate(int32 sched_id);
+
+ uint64 get_link_token();
+ void set_context(std::shared_ptr<ActorContext> context);
+ void set_tag(CSlice tag);
+
+ void always_wait_for_mailbox();
+
+ // for ActorInfo mostly
+ void init(ObjectPool<ActorInfo>::OwnerPtr &&info);
+ ActorInfo *get_info();
+ const ActorInfo *get_info() const;
+ ObjectPool<ActorInfo>::OwnerPtr clear();
+
+ bool empty() const;
+
+ template <class FuncT, class... ArgsT>
+ auto self_closure(FuncT &&func, ArgsT &&... args);
+
+ template <class SelfT, class FuncT, class... ArgsT>
+ auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args);
+
+ template <class LambdaT>
+ auto self_lambda(LambdaT &&lambda);
+
+ // proxy to info_
+ ActorId<> actor_id();
+ template <class SelfT>
+ ActorId<SelfT> actor_id(SelfT *self);
+
+ ActorShared<> actor_shared();
+ template <class SelfT>
+ ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1));
+
+ Slice get_name() const;
+
+ private:
+ ObjectPool<ActorInfo>::OwnerPtr info_;
+};
+
+template <class ActorT>
+class ActorTraits {
+ public:
+ static constexpr bool is_lite = false;
+};
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Actor.h b/libs/tdlib/td/tdactor/td/actor/impl/Actor.h
new file mode 100644
index 0000000000..3fe5e20abf
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/Actor.h
@@ -0,0 +1,153 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/Actor-decl.h"
+#include "td/actor/impl/EventFull-decl.h"
+#include "td/actor/impl/Scheduler-decl.h"
+
+#include "td/utils/logging.h"
+#include "td/utils/ObjectPool.h"
+#include "td/utils/Slice.h"
+
+#include <memory>
+#include <type_traits>
+#include <utility>
+
+namespace td {
+inline Actor::Actor(Actor &&other) {
+ CHECK(info_.empty());
+ info_ = std::move(other.info_);
+ if (!empty()) {
+ info_->on_actor_moved(this);
+ }
+}
+inline Actor &Actor::operator=(Actor &&other) {
+ CHECK(info_.empty());
+ info_ = std::move(other.info_);
+ if (!empty()) {
+ info_->on_actor_moved(this);
+ }
+ return *this;
+}
+
+inline void Actor::notify() {
+ yield();
+}
+
+// proxy to scheduler
+inline void Actor::yield() {
+ Scheduler::instance()->yield_actor(this);
+}
+inline void Actor::stop() {
+ Scheduler::instance()->stop_actor(this);
+}
+inline void Actor::do_stop() {
+ Scheduler::instance()->do_stop_actor(this);
+ CHECK(empty());
+}
+inline bool Actor::has_timeout() const {
+ return Scheduler::instance()->has_actor_timeout(this);
+}
+inline void Actor::set_timeout_in(double timeout_in) {
+ Scheduler::instance()->set_actor_timeout_in(this, timeout_in);
+}
+inline void Actor::set_timeout_at(double timeout_at) {
+ Scheduler::instance()->set_actor_timeout_at(this, timeout_at);
+}
+inline void Actor::cancel_timeout() {
+ Scheduler::instance()->cancel_actor_timeout(this);
+}
+inline void Actor::migrate(int32 sched_id) {
+ Scheduler::instance()->migrate_actor(this, sched_id);
+}
+inline void Actor::do_migrate(int32 sched_id) {
+ Scheduler::instance()->do_migrate_actor(this, sched_id);
+}
+
+template <class ActorType>
+std::enable_if_t<std::is_base_of<Actor, ActorType>::value> start_migrate(ActorType &obj, int32 sched_id) {
+ if (!obj.empty()) {
+ Scheduler::instance()->start_migrate_actor(&obj, sched_id);
+ }
+}
+template <class ActorType>
+std::enable_if_t<std::is_base_of<Actor, ActorType>::value> finish_migrate(ActorType &obj) {
+ if (!obj.empty()) {
+ Scheduler::instance()->finish_migrate_actor(&obj);
+ }
+}
+inline uint64 Actor::get_link_token() {
+ return Scheduler::instance()->get_link_token(this);
+}
+inline void Actor::set_context(std::shared_ptr<ActorContext> context) {
+ info_->set_context(std::move(context));
+}
+inline void Actor::set_tag(CSlice tag) {
+ info_->get_context()->tag_ = tag.c_str();
+ Scheduler::on_context_updated();
+}
+
+inline void Actor::init(ObjectPool<ActorInfo>::OwnerPtr &&info) {
+ info_ = std::move(info);
+}
+inline ActorInfo *Actor::get_info() {
+ return &*info_;
+}
+inline const ActorInfo *Actor::get_info() const {
+ return &*info_;
+}
+inline ObjectPool<ActorInfo>::OwnerPtr Actor::clear() {
+ return std::move(info_);
+}
+
+inline bool Actor::empty() const {
+ return info_.empty();
+}
+
+inline ActorId<> Actor::actor_id() {
+ return actor_id(this);
+}
+template <class SelfT>
+ActorId<SelfT> Actor::actor_id(SelfT *self) {
+ CHECK(static_cast<Actor *>(self) == this);
+ return ActorId<SelfT>(info_.get_weak());
+}
+
+inline ActorShared<> Actor::actor_shared() {
+ return actor_shared(this);
+}
+template <class SelfT>
+ActorShared<SelfT> Actor::actor_shared(SelfT *self, uint64 id) {
+ CHECK(static_cast<Actor *>(self) == this);
+ return ActorShared<SelfT>(actor_id(self), id);
+}
+
+template <class FuncT, class... ArgsT>
+auto Actor::self_closure(FuncT &&func, ArgsT &&... args) {
+ return self_closure(this, std::forward<FuncT>(func), std::forward<ArgsT>(args)...);
+}
+
+template <class SelfT, class FuncT, class... ArgsT>
+auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&... args) {
+ return EventCreator::closure(actor_id(self), std::forward<FuncT>(func), std::forward<ArgsT>(args)...);
+}
+
+template <class LambdaT>
+auto Actor::self_lambda(LambdaT &&lambda) {
+ return EventCreator::lambda(actor_id(), std::forward<LambdaT>(lambda));
+}
+
+inline Slice Actor::get_name() const {
+ return info_->get_name();
+}
+
+inline void Actor::always_wait_for_mailbox() {
+ info_->always_wait_for_mailbox();
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h
new file mode 100644
index 0000000000..5e82ed6a05
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h
@@ -0,0 +1,169 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/utils/ObjectPool.h"
+#include "td/utils/Slice.h"
+
+#include <type_traits>
+
+namespace td {
+class ActorInfo;
+class Actor;
+template <class ActorType = Actor>
+class ActorId {
+ public:
+ using ActorT = ActorType;
+ explicit ActorId(ObjectPool<ActorInfo>::WeakPtr ptr) : ptr_(ptr) {
+ }
+ ActorId() = default;
+ ActorId(const ActorId &) = default;
+ ActorId &operator=(const ActorId &) = default;
+ ActorId(ActorId &&other) : ptr_(other.ptr_) {
+ other.ptr_.clear();
+ }
+ ActorId &operator=(ActorId &&other) {
+ if (&other == this) {
+ return *this;
+ }
+ ptr_ = other.ptr_;
+ other.clear();
+ return *this;
+ }
+ ~ActorId() = default;
+
+ bool empty() const {
+ return ptr_.empty();
+ }
+ void clear() {
+ ptr_.clear();
+ }
+
+ bool is_alive() const {
+ return ptr_.is_alive_unsafe();
+ }
+
+ ActorInfo *get_actor_info() const;
+ ActorType *get_actor_unsafe() const;
+
+ // returns pointer to actor if it is on current thread. nullptr otherwise
+ ActorType *try_get_actor() const;
+
+ Slice get_name() const;
+
+ template <class ToActorType, class = std::enable_if_t<std::is_base_of<ToActorType, ActorType>::value>>
+ explicit operator ActorId<ToActorType>() const {
+ return ActorId<ToActorType>(ptr_);
+ }
+
+ template <class AsActorType>
+ ActorId<AsActorType> as() const {
+ return ActorId<AsActorType>(ptr_);
+ }
+
+ private:
+ ObjectPool<ActorInfo>::WeakPtr ptr_;
+};
+
+// threat ActorId as pointer and ActorOwn as
+// unique_ptr<ActorId>
+template <class ActorType = Actor>
+class ActorOwn {
+ public:
+ using ActorT = ActorType;
+ ActorOwn() = default;
+ explicit ActorOwn(ActorId<ActorType>);
+ template <class OtherActorType>
+ explicit ActorOwn(ActorId<OtherActorType> id);
+ template <class OtherActorType>
+ explicit ActorOwn(ActorOwn<OtherActorType> &&);
+ template <class OtherActorType>
+ ActorOwn &operator=(ActorOwn<OtherActorType> &&);
+ ActorOwn(ActorOwn &&);
+ ActorOwn &operator=(ActorOwn &&);
+ ActorOwn(const ActorOwn &) = delete;
+ ActorOwn &operator=(const ActorOwn &) = delete;
+ ~ActorOwn();
+
+ bool empty() const;
+ bool is_alive() const {
+ return id_.is_alive();
+ }
+ ActorId<ActorType> get() const;
+ ActorId<ActorType> release();
+ void reset(ActorId<ActorType> other = ActorId<ActorType>());
+ void hangup() const;
+ const ActorId<ActorType> *operator->() const;
+
+ using ActorIdConstRef = const ActorId<ActorType> &;
+ // operator ActorIdConstRef();
+
+ private:
+ ActorId<ActorType> id_;
+};
+
+template <class ActorType = Actor>
+class ActorShared {
+ public:
+ using ActorT = ActorType;
+ ActorShared() = default;
+ template <class OtherActorType>
+ ActorShared(ActorId<OtherActorType>, uint64 token);
+ template <class OtherActorType>
+ ActorShared(ActorShared<OtherActorType> &&);
+ template <class OtherActorType>
+ ActorShared(ActorOwn<OtherActorType> &&);
+ template <class OtherActorType>
+ ActorShared &operator=(ActorShared<OtherActorType> &&);
+ ActorShared(ActorShared &&);
+ ActorShared &operator=(ActorShared &&);
+ ActorShared(const ActorShared &) = delete;
+ ActorShared &operator=(const ActorShared &) = delete;
+ ~ActorShared();
+
+ uint64 token() const;
+ bool empty() const;
+ bool is_alive() const {
+ return id_.is_alive();
+ }
+ ActorId<ActorType> get() const;
+ ActorId<ActorType> release();
+ void reset(ActorId<ActorType> other = ActorId<ActorType>());
+ template <class OtherActorType>
+ void reset(ActorId<OtherActorType> other);
+ const ActorId<ActorType> *operator->() const;
+
+ private:
+ ActorId<ActorType> id_;
+ uint64 token_;
+};
+
+class ActorRef {
+ public:
+ ActorRef() = default;
+ template <class T>
+ ActorRef(const ActorId<T> &actor_id);
+ template <class T>
+ ActorRef(const ActorShared<T> &actor_id);
+ template <class T>
+ ActorRef(ActorShared<T> &&actor_id);
+ template <class T>
+ ActorRef(const ActorOwn<T> &actor_id);
+ template <class T>
+ ActorRef(ActorOwn<T> &&actor_id);
+ ActorId<> get() const {
+ return actor_id_;
+ }
+ uint64 token() const {
+ return token_;
+ }
+
+ private:
+ ActorId<> actor_id_;
+ uint64 token_ = 0;
+};
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorId.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorId.h
new file mode 100644
index 0000000000..34d7970633
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorId.h
@@ -0,0 +1,200 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/ActorId-decl.h"
+#include "td/actor/impl/ActorInfo-decl.h"
+#include "td/actor/impl/Scheduler-decl.h"
+
+#include "td/utils/Slice.h"
+
+namespace td {
+/*** ActorId ***/
+
+// If actor is on our scheduler(thread) result will be valid
+// If actor is on another scheduler we will see it in migrate_dest_flags
+template <class ActorType>
+ActorInfo *ActorId<ActorType>::get_actor_info() const {
+ if (ptr_.is_alive()) {
+ return &*ptr_;
+ }
+ return nullptr;
+}
+
+template <class ActorType>
+ActorType *ActorId<ActorType>::get_actor_unsafe() const {
+ return static_cast<ActorType *>(ptr_->get_actor_unsafe());
+}
+
+template <class ActorType>
+ActorType *ActorId<ActorType>::try_get_actor() const {
+ auto info = get_actor_info();
+ if (info && !info->is_migrating() && Scheduler::instance()->sched_id() == info->migrate_dest()) {
+ return static_cast<ActorType *>(info->get_actor_unsafe());
+ }
+ return nullptr;
+}
+
+template <class ActorType>
+Slice ActorId<ActorType>::get_name() const {
+ return ptr_->get_name();
+}
+
+// ActorOwn
+template <class ActorType>
+ActorOwn<ActorType>::ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) {
+}
+template <class ActorType>
+template <class OtherActorType>
+ActorOwn<ActorType>::ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) {
+}
+template <class ActorType>
+template <class OtherActorType>
+ActorOwn<ActorType>::ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) {
+}
+template <class ActorType>
+template <class OtherActorType>
+ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn<OtherActorType> &&other) {
+ reset(static_cast<ActorId<ActorType>>(other.release()));
+ return *this;
+}
+
+template <class ActorType>
+ActorOwn<ActorType>::ActorOwn(ActorOwn &&other) : id_(other.release()) {
+}
+template <class ActorType>
+ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn &&other) {
+ reset(other.release());
+ return *this;
+}
+
+template <class ActorType>
+ActorOwn<ActorType>::~ActorOwn() {
+ reset();
+}
+
+template <class ActorType>
+bool ActorOwn<ActorType>::empty() const {
+ return id_.empty();
+}
+template <class ActorType>
+ActorId<ActorType> ActorOwn<ActorType>::get() const {
+ return id_;
+}
+
+template <class ActorType>
+ActorId<ActorType> ActorOwn<ActorType>::release() {
+ return std::move(id_);
+}
+template <class ActorType>
+void ActorOwn<ActorType>::reset(ActorId<ActorType> other) {
+ static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type");
+ hangup();
+ id_ = std::move(other);
+}
+
+template <class ActorType>
+void ActorOwn<ActorType>::hangup() const {
+ if (!id_.empty()) {
+ send_event(id_, Event::hangup());
+ }
+}
+template <class ActorType>
+const ActorId<ActorType> *ActorOwn<ActorType>::operator->() const {
+ return &id_;
+}
+
+// ActorShared
+template <class ActorType>
+template <class OtherActorType>
+ActorShared<ActorType>::ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) {
+}
+template <class ActorType>
+template <class OtherActorType>
+ActorShared<ActorType>::ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) {
+}
+template <class ActorType>
+template <class OtherActorType>
+ActorShared<ActorType>::ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(0) {
+}
+template <class ActorType>
+template <class OtherActorType>
+ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared<OtherActorType> &&other) {
+ reset(other.release());
+ token_ = other.token();
+ return *this;
+}
+
+template <class ActorType>
+ActorShared<ActorType>::ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token_) {
+}
+template <class ActorType>
+ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared &&other) {
+ reset(other.release());
+ token_ = other.token_;
+ return *this;
+}
+
+template <class ActorType>
+ActorShared<ActorType>::~ActorShared() {
+ reset();
+}
+
+template <class ActorType>
+uint64 ActorShared<ActorType>::token() const {
+ return token_;
+}
+template <class ActorType>
+bool ActorShared<ActorType>::empty() const {
+ return id_.empty();
+}
+template <class ActorType>
+ActorId<ActorType> ActorShared<ActorType>::get() const {
+ return id_;
+}
+
+template <class ActorType>
+ActorId<ActorType> ActorShared<ActorType>::release() {
+ return std::move(id_);
+}
+template <class ActorType>
+void ActorShared<ActorType>::reset(ActorId<ActorType> other) {
+ reset<ActorType>(std::move(other));
+}
+
+template <class ActorType>
+template <class OtherActorType>
+void ActorShared<ActorType>::reset(ActorId<OtherActorType> other) {
+ static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type");
+ if (!id_.empty()) {
+ send_event(*this, Event::hangup());
+ }
+ id_ = static_cast<ActorId<ActorType>>(other);
+}
+template <class ActorType>
+const ActorId<ActorType> *ActorShared<ActorType>::operator->() const {
+ return &id_;
+}
+
+/*** ActorRef ***/
+template <class T>
+ActorRef::ActorRef(const ActorId<T> &actor_id) : actor_id_(actor_id) {
+}
+template <class T>
+ActorRef::ActorRef(const ActorShared<T> &actor_id) : actor_id_(actor_id.get()), token_(actor_id.token()) {
+}
+template <class T>
+ActorRef::ActorRef(ActorShared<T> &&actor_id) : actor_id_(actor_id.release()), token_(actor_id.token()) {
+}
+template <class T>
+ActorRef::ActorRef(const ActorOwn<T> &actor_id) : actor_id_(actor_id.get()) {
+}
+template <class T>
+ActorRef::ActorRef(ActorOwn<T> &&actor_id) : actor_id_(actor_id.release()) {
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h
new file mode 100644
index 0000000000..de9fba794e
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h
@@ -0,0 +1,119 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/ActorId-decl.h"
+#include "td/actor/impl/Event.h"
+
+#include "td/utils/common.h"
+#include "td/utils/Heap.h"
+#include "td/utils/List.h"
+#include "td/utils/ObjectPool.h"
+#include "td/utils/Slice.h"
+#include "td/utils/StringBuilder.h"
+
+#include <atomic>
+#include <memory>
+#include <utility>
+
+namespace td {
+
+class Actor;
+
+class ActorContext {
+ public:
+ ActorContext() = default;
+ ActorContext(const ActorContext &) = delete;
+ ActorContext &operator=(const ActorContext &) = delete;
+ ActorContext(ActorContext &&) = delete;
+ ActorContext &operator=(ActorContext &&) = delete;
+ virtual ~ActorContext() = default;
+ const char *tag_ = nullptr;
+ std::weak_ptr<ActorContext> this_ptr_;
+};
+
+class ActorInfo
+ : private ListNode
+ , HeapNode {
+ public:
+ enum class Deleter : uint8 { Destroy, None };
+
+ ActorInfo() = default;
+ ~ActorInfo() = default;
+
+ ActorInfo(ActorInfo &&) = delete;
+ ActorInfo &operator=(ActorInfo &&) = delete;
+
+ ActorInfo(const ActorInfo &) = delete;
+ ActorInfo &operator=(const ActorInfo &) = delete;
+
+ void init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr, Deleter deleter,
+ bool is_lite);
+ void on_actor_moved(Actor *actor_new_ptr);
+
+ template <class ActorT>
+ ActorOwn<ActorT> transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor);
+ void clear();
+ void destroy_actor();
+
+ bool empty() const;
+ void start_migrate(int32 to_sched_id);
+ bool is_migrating() const;
+ int32 migrate_dest() const;
+ std::pair<int32, bool> migrate_dest_flag_atomic() const;
+
+ void finish_migrate();
+
+ ActorId<> actor_id();
+ template <class SelfT>
+ ActorId<SelfT> actor_id(SelfT *self);
+ Actor *get_actor_unsafe();
+ const Actor *get_actor_unsafe() const;
+
+ void set_context(std::shared_ptr<ActorContext> context);
+ ActorContext *get_context();
+ const ActorContext *get_context() const;
+ CSlice get_name() const;
+
+ HeapNode *get_heap_node();
+ const HeapNode *get_heap_node() const;
+ static ActorInfo *from_heap_node(HeapNode *node);
+
+ ListNode *get_list_node();
+ const ListNode *get_list_node() const;
+ static ActorInfo *from_list_node(ListNode *node);
+
+ void start_run();
+ bool is_running() const;
+ void finish_run();
+
+ vector<Event> mailbox_;
+
+ bool is_lite() const;
+
+ void set_wait_generation(uint32 wait_generation);
+ bool must_wait(uint32 wait_generation) const;
+ void always_wait_for_mailbox();
+
+ private:
+ Deleter deleter_;
+ bool is_lite_;
+ bool is_running_;
+ bool always_wait_for_mailbox_{false};
+ uint32 wait_generation_{0};
+
+ std::atomic<int32> sched_id_{0};
+ Actor *actor_ = nullptr;
+
+#ifdef TD_DEBUG
+ string name_;
+#endif
+ std::shared_ptr<ActorContext> context_;
+};
+
+StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info);
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo.h
new file mode 100644
index 0000000000..df0b0dfd81
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo.h
@@ -0,0 +1,201 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/Actor-decl.h"
+#include "td/actor/impl/ActorInfo-decl.h"
+#include "td/actor/impl/Scheduler-decl.h"
+
+#include "td/utils/common.h"
+#include "td/utils/format.h"
+#include "td/utils/Heap.h"
+#include "td/utils/List.h"
+#include "td/utils/logging.h"
+#include "td/utils/ObjectPool.h"
+#include "td/utils/Slice.h"
+#include "td/utils/StringBuilder.h"
+
+#include <atomic>
+#include <memory>
+#include <utility>
+
+namespace td {
+/*** ActorInfo ***/
+inline StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info) {
+ sb << info.get_name() << ":" << const_cast<void *>(static_cast<const void *>(&info)) << ":"
+ << const_cast<void *>(static_cast<const void *>(info.get_context()));
+ return sb;
+}
+inline void ActorInfo::init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr,
+ Deleter deleter, bool is_lite) {
+ CHECK(!is_running());
+ CHECK(!is_migrating());
+ sched_id_.store(sched_id, std::memory_order_relaxed);
+ actor_ = actor_ptr;
+
+ if (!is_lite) {
+ context_ = Scheduler::context()->this_ptr_.lock();
+#ifdef TD_DEBUG
+ name_ = name.str();
+#endif
+ }
+
+ actor_->init(std::move(this_ptr));
+ deleter_ = deleter;
+ is_lite_ = is_lite;
+ is_running_ = false;
+ wait_generation_ = 0;
+}
+inline bool ActorInfo::is_lite() const {
+ return is_lite_;
+}
+inline void ActorInfo::set_wait_generation(uint32 wait_generation) {
+ wait_generation_ = wait_generation;
+}
+inline bool ActorInfo::must_wait(uint32 wait_generation) const {
+ return wait_generation_ == wait_generation || (always_wait_for_mailbox_ && !mailbox_.empty());
+}
+inline void ActorInfo::always_wait_for_mailbox() {
+ always_wait_for_mailbox_ = true;
+}
+inline void ActorInfo::on_actor_moved(Actor *actor_new_ptr) {
+ actor_ = actor_new_ptr;
+}
+
+inline void ActorInfo::clear() {
+ // LOG_IF(WARNING, !mailbox_.empty()) << "Destroy actor with non-empty mailbox: " << get_name()
+ // << format::as_array(mailbox_);
+ mailbox_.clear();
+ CHECK(!is_running());
+ CHECK(!is_migrating());
+ // NB: must be in non migrating state
+ // store invalid scheduler id.
+ sched_id_.store((1 << 30) - 1, std::memory_order_relaxed);
+ destroy_actor();
+ // Destroy context only after destructor.
+ context_.reset();
+}
+
+inline void ActorInfo::destroy_actor() {
+ if (!actor_) {
+ return;
+ }
+ switch (deleter_) {
+ case Deleter::Destroy:
+ std::default_delete<Actor>()(actor_);
+ break;
+ case Deleter::None:
+ break;
+ }
+ actor_ = nullptr;
+}
+
+template <class ActorT>
+ActorOwn<ActorT> ActorInfo::transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor) {
+ CHECK(!empty());
+ CHECK(deleter_ == Deleter::None);
+ ActorT *actor_ptr = actor.release();
+ CHECK(actor_ == static_cast<Actor *>(actor_ptr));
+ actor_ = static_cast<Actor *>(actor_ptr);
+ deleter_ = Deleter::Destroy;
+ return ActorOwn<ActorT>(actor_id(actor_ptr));
+}
+
+inline bool ActorInfo::empty() const {
+ return actor_ == nullptr;
+}
+
+inline void ActorInfo::start_migrate(int32 to_sched_id) {
+ sched_id_.store(to_sched_id | (1 << 30), std::memory_order_relaxed);
+}
+inline std::pair<int32, bool> ActorInfo::migrate_dest_flag_atomic() const {
+ int32 sched_id = sched_id_.load(std::memory_order_relaxed);
+ return std::make_pair(sched_id & ~(1 << 30), (sched_id & (1 << 30)) != 0);
+}
+inline void ActorInfo::finish_migrate() {
+ sched_id_.store(migrate_dest(), std::memory_order_relaxed);
+}
+inline bool ActorInfo::is_migrating() const {
+ return migrate_dest_flag_atomic().second;
+}
+inline int32 ActorInfo::migrate_dest() const {
+ return migrate_dest_flag_atomic().first;
+}
+
+inline ActorId<> ActorInfo::actor_id() {
+ return actor_id(actor_);
+}
+
+template <class SelfT>
+ActorId<SelfT> ActorInfo::actor_id(SelfT *self) {
+ return actor_->actor_id(self);
+}
+
+inline Actor *ActorInfo::get_actor_unsafe() {
+ return actor_;
+}
+inline const Actor *ActorInfo::get_actor_unsafe() const {
+ return actor_;
+}
+
+inline void ActorInfo::set_context(std::shared_ptr<ActorContext> context) {
+ CHECK(is_running());
+ context->this_ptr_ = context;
+ context->tag_ = Scheduler::context()->tag_;
+ context_ = std::move(context);
+ Scheduler::context() = context_.get();
+ Scheduler::on_context_updated();
+}
+inline const ActorContext *ActorInfo::get_context() const {
+ return context_.get();
+}
+
+inline ActorContext *ActorInfo::get_context() {
+ return context_.get();
+}
+
+inline CSlice ActorInfo::get_name() const {
+#ifdef TD_DEBUG
+ return name_;
+#else
+ return "";
+#endif
+}
+
+inline void ActorInfo::start_run() {
+ VLOG(actor) << "start_run: " << *this;
+ CHECK(!is_running_) << "Recursive call of actor " << tag("name", get_name());
+ is_running_ = true;
+}
+inline void ActorInfo::finish_run() {
+ is_running_ = false;
+ VLOG(actor) << "stop_run: " << *this;
+}
+
+inline bool ActorInfo::is_running() const {
+ return is_running_;
+}
+
+inline HeapNode *ActorInfo::get_heap_node() {
+ return this;
+}
+inline const HeapNode *ActorInfo::get_heap_node() const {
+ return this;
+}
+inline ActorInfo *ActorInfo::from_heap_node(HeapNode *node) {
+ return static_cast<ActorInfo *>(node);
+}
+inline ListNode *ActorInfo::get_list_node() {
+ return this;
+}
+inline const ListNode *ActorInfo::get_list_node() const {
+ return this;
+}
+inline ActorInfo *ActorInfo::from_list_node(ListNode *node) {
+ return static_cast<ActorInfo *>(node);
+}
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp
new file mode 100644
index 0000000000..47593db90b
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp
@@ -0,0 +1,102 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/impl/ConcurrentScheduler.h"
+
+#include "td/actor/impl/Actor.h"
+#include "td/actor/impl/ActorId.h"
+#include "td/actor/impl/ActorInfo.h"
+#include "td/actor/impl/Scheduler.h"
+
+#include "td/utils/MpscPollableQueue.h"
+#include "td/utils/port/thread_local.h"
+
+#include <memory>
+
+namespace td {
+
+void ConcurrentScheduler::init(int32 threads_n) {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ threads_n = 0;
+#endif
+ threads_n++;
+ std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(threads_n);
+ for (int32 i = 0; i < threads_n; i++) {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+#else
+ auto queue = std::make_shared<MpscPollableQueue<EventFull>>();
+ queue->init();
+ outbound[i] = queue;
+#endif
+ }
+
+ schedulers_.resize(threads_n);
+ for (int32 i = 0; i < threads_n; i++) {
+ auto &sched = schedulers_[i];
+ sched = make_unique<Scheduler>();
+ sched->init(i, outbound, static_cast<Scheduler::Callback *>(this));
+ }
+
+ state_ = State::Start;
+}
+
+void ConcurrentScheduler::test_one_thread_run() {
+ do {
+ for (auto &sched : schedulers_) {
+ sched->run(0);
+ }
+ } while (!is_finished_.load(std::memory_order_relaxed));
+}
+
+void ConcurrentScheduler::start() {
+ CHECK(state_ == State::Start);
+ is_finished_.store(false, std::memory_order_relaxed);
+ set_thread_id(0);
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ for (size_t i = 1; i < schedulers_.size(); i++) {
+ auto &sched = schedulers_[i];
+ threads_.push_back(td::thread([&, tid = i]() {
+ set_thread_id(static_cast<int32>(tid));
+ while (!is_finished()) {
+ sched->run(10);
+ }
+ }));
+ }
+#endif
+ state_ = State::Run;
+}
+
+bool ConcurrentScheduler::run_main(double timeout) {
+ CHECK(state_ == State::Run);
+ // run main scheduler in same thread
+ auto &main_sched = schedulers_[0];
+ if (!is_finished()) {
+ main_sched->run(timeout);
+ }
+ return !is_finished();
+}
+
+void ConcurrentScheduler::finish() {
+ CHECK(state_ == State::Run);
+ if (!is_finished()) {
+ on_finish();
+ }
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ for (auto &thread : threads_) {
+ thread.join();
+ }
+ threads_.clear();
+#endif
+ schedulers_.clear();
+ for (auto &f : at_finish_) {
+ f();
+ }
+ at_finish_.clear();
+
+ state_ = State::Start;
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h
new file mode 100644
index 0000000000..1e9793eab4
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h
@@ -0,0 +1,93 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/Scheduler-decl.h"
+
+#include "td/utils/common.h"
+#include "td/utils/logging.h"
+#include "td/utils/port/thread.h"
+#include "td/utils/Slice.h"
+
+#include <atomic>
+#include <functional>
+#include <mutex>
+#include <utility>
+
+namespace td {
+
+class ConcurrentScheduler : private Scheduler::Callback {
+ public:
+ void init(int32 threads_n);
+
+ void finish_async() {
+ schedulers_[0]->finish();
+ }
+ void wakeup() {
+ schedulers_[0]->wakeup();
+ }
+ SchedulerGuard get_current_guard() {
+ return schedulers_[0]->get_guard();
+ }
+
+ void test_one_thread_run();
+
+ bool is_finished() {
+ return is_finished_.load(std::memory_order_relaxed);
+ }
+
+ void start();
+
+ bool run_main(double timeout);
+
+ void finish();
+
+ template <class ActorT, class... Args>
+ ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&... args) {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ sched_id = 0;
+#endif
+ CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size()));
+ auto guard = schedulers_[sched_id]->get_guard();
+ return schedulers_[sched_id]->create_actor<ActorT>(name, std::forward<Args>(args)...);
+ }
+
+ template <class ActorT>
+ ActorOwn<ActorT> register_actor_unsafe(int32 sched_id, Slice name, ActorT *actor) {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ sched_id = 0;
+#endif
+ CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size()));
+ auto guard = schedulers_[sched_id]->get_guard();
+ return schedulers_[sched_id]->register_actor<ActorT>(name, actor);
+ }
+
+ private:
+ enum class State { Start, Run };
+ State state_;
+ std::vector<unique_ptr<Scheduler>> schedulers_;
+ std::atomic<bool> is_finished_;
+ std::mutex at_finish_mutex_;
+ std::vector<std::function<void()>> at_finish_;
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ std::vector<thread> threads_;
+#endif
+
+ void on_finish() override {
+ is_finished_.store(true, std::memory_order_relaxed);
+ for (auto &it : schedulers_) {
+ it->wakeup();
+ }
+ }
+
+ void register_at_finish(std::function<void()> f) override {
+ std::lock_guard<std::mutex> lock(at_finish_mutex_);
+ at_finish_.push_back(std::move(f));
+ }
+};
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Event.h b/libs/tdlib/td/tdactor/td/actor/impl/Event.h
new file mode 100644
index 0000000000..fac66dd120
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/Event.h
@@ -0,0 +1,247 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/utils/Closure.h"
+#include "td/utils/common.h"
+#include "td/utils/format.h"
+#include "td/utils/logging.h"
+#include "td/utils/StringBuilder.h"
+
+#include <type_traits>
+#include <utility>
+
+namespace td {
+
+class Actor;
+
+// Events
+//
+// Small structure (up to 16 bytes) used to send events between actors.
+//
+// There are some predefined types of events:
+// NoType -- unitialized event
+// Start -- start actor
+// Stop -- stop actor
+// Yield -- wake up actor
+// Timeout -- some timeout has expired
+// Hangup -- hang up called
+// Raw -- just pass 8 bytes (union Raw is used for convenience)
+// Custom -- Send CustomEvent
+
+template <class T>
+std::enable_if_t<!std::is_base_of<Actor, T>::value> start_migrate(T &obj, int32 sched_id) {
+}
+template <class T>
+std::enable_if_t<!std::is_base_of<Actor, T>::value> finish_migrate(T &obj) {
+}
+
+class CustomEvent {
+ public:
+ CustomEvent() = default;
+ CustomEvent(const CustomEvent &) = delete;
+ CustomEvent &operator=(const CustomEvent &) = delete;
+ CustomEvent(CustomEvent &&) = delete;
+ CustomEvent &operator=(CustomEvent &&) = delete;
+ virtual ~CustomEvent() = default;
+
+ virtual void run(Actor *actor) = 0;
+ virtual CustomEvent *clone() const = 0;
+ virtual void start_migrate(int32 sched_id) {
+ }
+ virtual void finish_migrate() {
+ }
+};
+
+template <class ClosureT>
+class ClosureEvent : public CustomEvent {
+ public:
+ void run(Actor *actor) override {
+ closure_.run(static_cast<typename ClosureT::ActorType *>(actor));
+ }
+ CustomEvent *clone() const override {
+ return new ClosureEvent<ClosureT>(closure_.clone());
+ }
+ template <class... ArgsT>
+ explicit ClosureEvent(ArgsT &&... args) : closure_(std::forward<ArgsT>(args)...) {
+ }
+
+ void start_migrate(int32 sched_id) override {
+ closure_.for_each([sched_id](auto &obj) {
+ using ::td::start_migrate;
+ start_migrate(obj, sched_id);
+ });
+ }
+
+ void finish_migrate() override {
+ closure_.for_each([](auto &obj) {
+ using ::td::finish_migrate;
+ finish_migrate(obj);
+ });
+ }
+
+ private:
+ ClosureT closure_;
+};
+
+template <class LambdaT>
+class LambdaEvent : public CustomEvent {
+ public:
+ void run(Actor *actor) override {
+ f_();
+ }
+ CustomEvent *clone() const override {
+ LOG(FATAL) << "Not supported";
+ return nullptr;
+ }
+ template <class FromLambdaT>
+ explicit LambdaEvent(FromLambdaT &&lambda) : f_(std::forward<FromLambdaT>(lambda)) {
+ }
+
+ private:
+ LambdaT f_;
+};
+
+class Event {
+ public:
+ enum class Type { NoType, Start, Stop, Yield, Timeout, Hangup, Raw, Custom };
+ Type type;
+ uint64 link_token = 0;
+ union Raw {
+ void *ptr;
+ CustomEvent *custom_event;
+ uint32 u32;
+ uint64 u64;
+ } data{};
+
+ // factory functions
+ static Event start() {
+ return Event(Type::Start);
+ }
+ static Event stop() {
+ return Event(Type::Stop);
+ }
+ static Event yield() {
+ return Event(Type::Yield);
+ }
+ static Event timeout() {
+ return Event(Type::Timeout);
+ }
+ static Event hangup() {
+ return Event(Type::Hangup);
+ }
+ static Event raw(void *ptr) {
+ return Event(Type::Raw, ptr);
+ }
+ static Event raw(uint32 u32) {
+ return Event(Type::Raw, u32);
+ }
+ static Event raw(uint64 u64) {
+ return Event(Type::Raw, u64);
+ }
+ static Event custom(CustomEvent *custom_event) {
+ return Event(Type::Custom, custom_event);
+ }
+
+ template <class FromImmediateClosureT>
+ static Event immediate_closure(FromImmediateClosureT &&closure) {
+ return custom(
+ new ClosureEvent<typename FromImmediateClosureT::Delayed>(std::forward<FromImmediateClosureT>(closure)));
+ }
+ template <class... ArgsT>
+ static Event delayed_closure(ArgsT &&... args) {
+ using DelayedClosureT = decltype(create_delayed_closure(std::forward<ArgsT>(args)...));
+ return custom(new ClosureEvent<DelayedClosureT>(std::forward<ArgsT>(args)...));
+ }
+
+ template <class FromLambdaT>
+ static Event lambda(FromLambdaT &&lambda) {
+ return custom(new LambdaEvent<std::decay_t<FromLambdaT>>(std::forward<FromLambdaT>(lambda)));
+ }
+
+ Event() : Event(Type::NoType) {
+ }
+ Event(const Event &other) = delete;
+ Event &operator=(const Event &) = delete;
+ Event(Event &&other) : type(other.type), link_token(other.link_token), data(other.data) {
+ other.type = Type::NoType;
+ }
+ Event &operator=(Event &&other) {
+ destroy();
+ type = other.type;
+ link_token = other.link_token;
+ data = other.data;
+ other.type = Type::NoType;
+ return *this;
+ }
+ ~Event() {
+ destroy();
+ }
+
+ Event clone() const {
+ Event res;
+ res.type = type;
+ if (type == Type::Custom) {
+ res.data.custom_event = data.custom_event->clone();
+ } else {
+ res.data = data;
+ }
+ return res;
+ }
+
+ bool empty() const {
+ return type == Type::NoType;
+ }
+
+ void clear() {
+ destroy();
+ type = Type::NoType;
+ }
+
+ Event &set_link_token(uint64 new_link_token) {
+ link_token = new_link_token;
+ return *this;
+ }
+
+ friend void start_migrate(Event &obj, int32 sched_id) {
+ if (obj.type == Type::Custom) {
+ obj.data.custom_event->start_migrate(sched_id);
+ }
+ }
+ friend void finish_migrate(Event &obj) {
+ if (obj.type == Type::Custom) {
+ obj.data.custom_event->finish_migrate();
+ }
+ }
+
+ private:
+ explicit Event(Type type) : type(type) {
+ }
+
+ Event(Type type, void *ptr) : Event(type) {
+ data.ptr = ptr;
+ }
+ Event(Type type, CustomEvent *custom_event) : Event(type) {
+ data.custom_event = custom_event;
+ }
+ Event(Type type, uint32 u32) : Event(type) {
+ data.u32 = u32;
+ }
+ Event(Type type, uint64 u64) : Event(type) {
+ data.u64 = u64;
+ }
+
+ void destroy() {
+ if (type == Type::Custom) {
+ delete data.custom_event;
+ }
+ }
+};
+inline StringBuilder &operator<<(StringBuilder &sb, const Event &e) {
+ return sb << tag("Event", static_cast<int32>(e.type));
+}
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
new file mode 100644
index 0000000000..ef2f1c2dcb
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h
@@ -0,0 +1,87 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/ActorId-decl.h"
+#include "td/actor/impl/Event.h"
+
+#include "td/utils/type_traits.h"
+
+#include <type_traits>
+#include <utility>
+
+namespace td {
+
+class EventFull {
+ public:
+ EventFull() = default;
+
+ bool empty() const {
+ return data_.empty();
+ }
+
+ void clear() {
+ data_.clear();
+ }
+
+ ActorId<> actor_id() const {
+ return actor_id_;
+ }
+ Event &data() {
+ return data_;
+ }
+
+ void try_emit_later();
+ void try_emit();
+
+ private:
+ friend class EventCreator;
+
+ EventFull(ActorRef actor_ref, Event &&data) : actor_id_(actor_ref.get()), data_(std::move(data)) {
+ data_.link_token = actor_ref.token();
+ }
+ template <class T>
+ EventFull(ActorId<T> actor_id, Event &&data) : actor_id_(actor_id), data_(std::move(data)) {
+ }
+
+ ActorId<> actor_id_;
+
+ Event data_;
+};
+
+class EventCreator {
+ public:
+ template <class ActorIdT, class FunctionT, class... ArgsT>
+ static EventFull closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+ using ActorT = typename std::decay_t<ActorIdT>::ActorT;
+ using FunctionClassT = member_function_class_t<FunctionT>;
+ static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
+
+ return EventFull(std::forward<ActorIdT>(actor_id), Event::delayed_closure(function, std::forward<ArgsT>(args)...));
+ }
+
+ template <class LambdaT>
+ static EventFull lambda(ActorRef actor_ref, LambdaT &&lambda) {
+ return EventFull(actor_ref, Event::lambda(std::forward<LambdaT>(lambda)));
+ }
+
+ static EventFull yield(ActorRef actor_ref) {
+ return EventFull(actor_ref, Event::yield());
+ }
+ static EventFull raw(ActorRef actor_ref, uint64 data) {
+ return EventFull(actor_ref, Event::raw(data));
+ }
+ static EventFull raw(ActorRef actor_ref, void *ptr) {
+ return EventFull(actor_ref, Event::raw(ptr));
+ }
+
+ static EventFull event_unsafe(ActorId<> actor_id, Event &&event) {
+ return EventFull(actor_id, std::move(event));
+ }
+};
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/EventFull.h b/libs/tdlib/td/tdactor/td/actor/impl/EventFull.h
new file mode 100644
index 0000000000..1e997ee4b3
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/EventFull.h
@@ -0,0 +1,38 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/EventFull-decl.h"
+#include "td/actor/impl/Scheduler-decl.h"
+
+#include "td/utils/logging.h"
+
+#include <utility>
+
+namespace td {
+
+inline void EventFull::try_emit_later() {
+ if (empty()) {
+ return;
+ }
+ auto link_token = data_.link_token;
+ send_event_later(ActorShared<>(actor_id_, link_token), std::move(data_));
+ data_.clear();
+ CHECK(empty());
+}
+
+inline void EventFull::try_emit() {
+ if (empty()) {
+ return;
+ }
+ auto link_token = data_.link_token;
+ send_event(ActorShared<>(actor_id_, link_token), std::move(data_));
+ data_.clear();
+ CHECK(empty());
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
new file mode 100644
index 0000000000..4b51c102a5
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h
@@ -0,0 +1,296 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/Actor-decl.h"
+#include "td/actor/impl/ActorId-decl.h"
+#include "td/actor/impl/EventFull-decl.h"
+
+#include "td/utils/Closure.h"
+#include "td/utils/Heap.h"
+#include "td/utils/List.h"
+#include "td/utils/MovableValue.h"
+#include "td/utils/MpscPollableQueue.h"
+#include "td/utils/ObjectPool.h"
+#include "td/utils/port/EventFd.h"
+#include "td/utils/port/Fd.h"
+#include "td/utils/port/Poll.h"
+#include "td/utils/port/thread_local.h"
+#include "td/utils/Slice.h"
+#include "td/utils/type_traits.h"
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <type_traits>
+#include <utility>
+
+namespace td {
+class ActorInfo;
+struct Send {
+ using Flags = uint32;
+ static const Flags immediate = 0x001;
+ static const Flags later = 0x002;
+ static const Flags later_weak = 0x004;
+};
+
+class Scheduler;
+class SchedulerGuard {
+ public:
+ explicit SchedulerGuard(Scheduler *scheduler);
+ ~SchedulerGuard();
+ SchedulerGuard(const SchedulerGuard &other) = delete;
+ SchedulerGuard &operator=(const SchedulerGuard &other) = delete;
+ SchedulerGuard(SchedulerGuard &&other) = default;
+ SchedulerGuard &operator=(SchedulerGuard &&other) = delete;
+
+ private:
+ MovableValue<bool> is_valid_ = true;
+ Scheduler *scheduler_;
+ ActorContext *save_context_;
+ Scheduler *save_scheduler_;
+ const char *save_tag_;
+};
+
+class Scheduler {
+ public:
+ class Callback {
+ public:
+ Callback() = default;
+ Callback(const Callback &) = delete;
+ Callback &operator=(const Callback &) = delete;
+ virtual ~Callback() = default;
+ virtual void on_finish() = 0;
+ virtual void register_at_finish(std::function<void()>) = 0;
+ };
+ Scheduler() = default;
+ Scheduler(const Scheduler &) = delete;
+ Scheduler &operator=(const Scheduler &) = delete;
+ Scheduler(Scheduler &&) = delete;
+ Scheduler &operator=(Scheduler &&) = delete;
+ ~Scheduler();
+
+ void init();
+ void init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound, Callback *callback);
+ void clear();
+
+ int32 sched_id() const;
+ int32 sched_count() const;
+
+ template <class ActorT, class... Args>
+ TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
+ template <class ActorT, class... Args>
+ TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args);
+ template <class ActorT>
+ TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1);
+ template <class ActorT>
+ TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id = -1);
+
+ template <class ActorT>
+ TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr);
+
+ void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
+ void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event);
+
+ template <class EventT>
+ void send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags = 0);
+
+ template <class EventT>
+ void send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags = 0);
+
+ void send(ActorRef actor_ref, Event &&event, Send::Flags flags = 0);
+
+ void hack(const ActorId<> &actor_id, Event &&event) {
+ actor_id.get_actor_unsafe()->raw_event(event.data);
+ }
+ void before_tail_send(const ActorId<> &actor_id);
+
+ void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
+ void unsubscribe(const Fd &fd);
+ void unsubscribe_before_close(const Fd &fd);
+
+ void yield_actor(Actor *actor);
+ void stop_actor(Actor *actor);
+ void do_stop_actor(Actor *actor);
+ uint64 get_link_token(Actor *actor);
+ void migrate_actor(Actor *actor, int32 dest_sched_id);
+ void do_migrate_actor(Actor *actor, int32 dest_sched_id);
+ void start_migrate_actor(Actor *actor, int32 dest_sched_id);
+ void finish_migrate_actor(Actor *actor);
+
+ bool has_actor_timeout(const Actor *actor) const;
+ void set_actor_timeout_in(Actor *actor, double timeout);
+ void set_actor_timeout_at(Actor *actor, double timeout_at);
+ void cancel_actor_timeout(Actor *actor);
+
+ void finish();
+ void yield();
+ void run(double timeout);
+ void run_no_guard(double timeout);
+
+ void wakeup();
+
+ static Scheduler *instance();
+ static ActorContext *&context();
+ static void on_context_updated();
+
+ SchedulerGuard get_guard();
+
+ private:
+ static void set_scheduler(Scheduler *scheduler);
+ /*** ServiceActor ***/
+ class ServiceActor final : public Actor {
+ public:
+ void set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues);
+ void start_up() override;
+
+ private:
+ std::shared_ptr<MpscPollableQueue<EventFull>> inbound_;
+ void loop() override;
+ };
+ friend class ServiceActor;
+
+ void do_custom_event(ActorInfo *actor, CustomEvent &event);
+ void do_event(ActorInfo *actor, Event &&event);
+
+ void enter_actor(ActorInfo *actor_info);
+ void exit_actor(ActorInfo *actor_info);
+
+ void yield_actor(ActorInfo *actor_info);
+ void stop_actor(ActorInfo *actor_info);
+ void do_stop_actor(ActorInfo *actor_info);
+ uint64 get_link_token(ActorInfo *actor_info);
+ void migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
+ void do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
+ void start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id);
+
+ bool has_actor_timeout(const ActorInfo *actor_info) const;
+ void set_actor_timeout_in(ActorInfo *actor_info, double timeout);
+ void set_actor_timeout_at(ActorInfo *actor_info, double timeout_at);
+ void cancel_actor_timeout(ActorInfo *actor_info);
+
+ void register_migrated_actor(ActorInfo *actor_info);
+ void add_to_mailbox(ActorInfo *actor_info, Event &&event);
+ void clear_mailbox(ActorInfo *actor_info);
+
+ template <class RunFuncT, class EventFuncT>
+ void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func);
+
+ template <class RunFuncT, class EventFuncT>
+ void send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, const EventFuncT &event_func);
+
+ void inc_wait_generation();
+
+ double run_timeout();
+ void run_mailbox();
+ double run_events();
+ void run_poll(double timeout);
+
+ template <class ActorT>
+ ActorOwn<ActorT> register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id);
+ void destroy_actor(ActorInfo *actor_info);
+
+ static TD_THREAD_LOCAL Scheduler *scheduler_;
+ static TD_THREAD_LOCAL ActorContext *context_;
+
+ Callback *callback_ = nullptr;
+ std::unique_ptr<ObjectPool<ActorInfo>> actor_info_pool_;
+
+ int32 actor_count_;
+ ListNode pending_actors_list_;
+ ListNode ready_actors_list_;
+ KHeap<double> timeout_queue_;
+
+ std::map<ActorInfo *, std::vector<Event>> pending_events_;
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ EventFd event_fd_;
+#endif
+ ServiceActor service_actor_;
+ Poll poll_;
+
+ bool yield_flag_;
+ bool has_guard_ = false;
+ bool close_flag_ = false;
+
+ uint32 wait_generation_ = 0;
+ int32 sched_id_;
+ int32 sched_n_;
+ std::shared_ptr<MpscPollableQueue<EventFull>> inbound_queue_;
+ std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound_queues_;
+
+ std::shared_ptr<ActorContext> save_context_;
+
+ struct EventContext {
+ int32 dest_sched_id;
+ enum Flags { Stop = 1, Migrate = 2 };
+ int32 flags{0};
+ uint64 link_token;
+
+ ActorInfo *actor_info;
+ };
+ EventContext *event_context_ptr_;
+
+ friend class GlobalScheduler;
+ friend class SchedulerGuard;
+ friend class EventGuard;
+};
+
+/*** Interface to current scheduler ***/
+void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read);
+void unsubscribe(const Fd &fd);
+void unsubscribe_before_close(const Fd &fd);
+
+template <class ActorT, class... Args>
+TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args);
+template <class ActorT, class... Args>
+TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args);
+template <class ActorT>
+TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1);
+template <class ActorT>
+TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id = -1);
+
+template <class ActorT>
+TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr);
+
+template <class ActorIdT, class FunctionT, class... ArgsT>
+void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+ using ActorT = typename std::decay_t<ActorIdT>::ActorT;
+ using FunctionClassT = member_function_class_t<FunctionT>;
+ static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
+
+ Scheduler::instance()->send_closure(std::forward<ActorIdT>(actor_id),
+ create_immediate_closure(function, std::forward<ArgsT>(args)...));
+}
+
+template <class ActorIdT, class FunctionT, class... ArgsT>
+void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+ using ActorT = typename std::decay_t<ActorIdT>::ActorT;
+ using FunctionClassT = member_function_class_t<FunctionT>;
+ static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
+
+ Scheduler::instance()->send(std::forward<ActorIdT>(actor_id),
+ Event::delayed_closure(function, std::forward<ArgsT>(args)...), Send::later);
+}
+
+template <class... ArgsT>
+void send_lambda(ActorRef actor_ref, ArgsT &&... args) {
+ Scheduler::instance()->send_lambda(actor_ref, std::forward<ArgsT>(args)...);
+}
+
+template <class... ArgsT>
+void send_event(ActorRef actor_ref, ArgsT &&... args) {
+ Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)...);
+}
+
+template <class... ArgsT>
+void send_event_later(ActorRef actor_ref, ArgsT &&... args) {
+ Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., Send::later);
+}
+
+void yield_scheduler();
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
new file mode 100644
index 0000000000..479e419d62
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp
@@ -0,0 +1,496 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/impl/Scheduler.h"
+
+#include "td/actor/impl/Actor.h"
+#include "td/actor/impl/ActorId.h"
+#include "td/actor/impl/ActorInfo.h"
+#include "td/actor/impl/Event.h"
+#include "td/actor/impl/EventFull.h"
+
+#include "td/utils/common.h"
+#include "td/utils/format.h"
+#include "td/utils/List.h"
+#include "td/utils/logging.h"
+#include "td/utils/ObjectPool.h"
+#include "td/utils/port/thread_local.h"
+#include "td/utils/ScopeGuard.h"
+#include "td/utils/Time.h"
+
+#include <functional>
+#include <utility>
+
+namespace td {
+
+TD_THREAD_LOCAL Scheduler *Scheduler::scheduler_; // static zero-initialized
+TD_THREAD_LOCAL ActorContext *Scheduler::context_; // static zero-initialized
+
+Scheduler::~Scheduler() {
+ clear();
+}
+
+Scheduler *Scheduler::instance() {
+ return scheduler_;
+}
+
+ActorContext *&Scheduler::context() {
+ return context_;
+}
+
+void Scheduler::on_context_updated() {
+ LOG_TAG = context_->tag_;
+}
+
+void Scheduler::set_scheduler(Scheduler *scheduler) {
+ scheduler_ = scheduler;
+}
+
+void Scheduler::ServiceActor::start_up() {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ CHECK(!inbound_);
+#else
+ if (!inbound_) {
+ return;
+ }
+ auto &fd = inbound_->reader_get_event_fd();
+
+ fd.get_fd().set_observer(this);
+ ::td::subscribe(fd.get_fd(), Fd::Read);
+ yield();
+#endif
+}
+
+void Scheduler::ServiceActor::loop() {
+ auto &queue = inbound_;
+ int ready_n = queue->reader_wait_nonblock();
+ if (ready_n == 0) {
+ return;
+ }
+ while (ready_n-- > 0) {
+ EventFull event = queue->reader_get_unsafe();
+ if (event.actor_id().empty()) {
+ Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr));
+ } else {
+ VLOG(actor) << "Receive " << event.data();
+ finish_migrate(event.data());
+ event.try_emit();
+ }
+ }
+ queue->reader_flush();
+ yield();
+}
+
+/*** SchedlerGuard ***/
+SchedulerGuard::SchedulerGuard(Scheduler *scheduler) : scheduler_(scheduler) {
+ CHECK(!scheduler_->has_guard_);
+ scheduler_->has_guard_ = true;
+ save_scheduler_ = Scheduler::instance();
+ Scheduler::set_scheduler(scheduler_);
+
+ // Scheduler::context() must be not null
+ save_context_ = scheduler_->save_context_.get();
+ save_tag_ = LOG_TAG;
+ LOG_TAG = save_context_->tag_;
+ std::swap(save_context_, Scheduler::context());
+}
+
+SchedulerGuard::~SchedulerGuard() {
+ if (is_valid_.get()) {
+ std::swap(save_context_, scheduler_->context());
+ Scheduler::set_scheduler(save_scheduler_);
+ CHECK(scheduler_->has_guard_);
+ scheduler_->has_guard_ = false;
+ LOG_TAG = save_tag_;
+ }
+}
+
+/*** EventGuard ***/
+EventGuard::EventGuard(Scheduler *scheduler, ActorInfo *actor_info) : scheduler_(scheduler) {
+ actor_info->start_run();
+ event_context_.actor_info = actor_info;
+ event_context_ptr_ = &event_context_;
+
+ save_context_ = actor_info->get_context();
+#ifdef TD_DEBUG
+ save_log_tag2_ = actor_info->get_name().c_str();
+#endif
+ swap_context(actor_info);
+}
+
+EventGuard::~EventGuard() {
+ auto info = event_context_.actor_info;
+ auto node = info->get_list_node();
+ node->remove();
+ if (info->mailbox_.empty()) {
+ scheduler_->pending_actors_list_.put(node);
+ } else {
+ scheduler_->ready_actors_list_.put(node);
+ }
+ info->finish_run();
+ swap_context(info);
+ CHECK(info->is_lite() || save_context_ == info->get_context());
+#ifdef TD_DEBUG
+ CHECK(info->is_lite() || save_log_tag2_ == info->get_name().c_str());
+#endif
+ if (event_context_.flags & Scheduler::EventContext::Stop) {
+ scheduler_->do_stop_actor(info);
+ return;
+ }
+ if (event_context_.flags & Scheduler::EventContext::Migrate) {
+ scheduler_->do_migrate_actor(info, event_context_.dest_sched_id);
+ }
+}
+
+void EventGuard::swap_context(ActorInfo *info) {
+ std::swap(scheduler_->event_context_ptr_, event_context_ptr_);
+
+ if (info->is_lite()) {
+ return;
+ }
+
+#ifdef TD_DEBUG
+ std::swap(LOG_TAG2, save_log_tag2_);
+#endif
+
+ auto *current_context_ptr = &Scheduler::context();
+ if (save_context_ != *current_context_ptr) {
+ std::swap(save_context_, *current_context_ptr);
+ Scheduler::on_context_updated();
+ }
+}
+
+void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound,
+ Callback *callback) {
+ save_context_ = std::make_shared<ActorContext>();
+ save_context_->this_ptr_ = save_context_;
+ save_context_->tag_ = LOG_TAG;
+
+ auto guard = get_guard();
+
+ callback_ = callback;
+ actor_info_pool_ = make_unique<ObjectPool<ActorInfo>>();
+
+ yield_flag_ = false;
+ actor_count_ = 0;
+ sched_id_ = 0;
+
+ poll_.init();
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ event_fd_.init();
+ subscribe(event_fd_.get_fd(), Fd::Read);
+#endif
+
+ if (!outbound.empty()) {
+ inbound_queue_ = std::move(outbound[id]);
+ }
+ outbound_queues_ = std::move(outbound);
+ sched_id_ = id;
+ sched_n_ = static_cast<int32>(outbound_queues_.size());
+ service_actor_.set_queue(inbound_queue_);
+ register_actor("ServiceActor", &service_actor_).release();
+}
+
+void Scheduler::clear() {
+ if (service_actor_.empty()) {
+ return;
+ }
+ close_flag_ = true;
+ auto guard = get_guard();
+
+ // Stop all actors
+ if (!service_actor_.empty()) {
+ service_actor_.do_stop();
+ }
+ while (!pending_actors_list_.empty()) {
+ auto actor_info = ActorInfo::from_list_node(pending_actors_list_.get());
+ do_stop_actor(actor_info);
+ }
+ while (!ready_actors_list_.empty()) {
+ auto actor_info = ActorInfo::from_list_node(ready_actors_list_.get());
+ do_stop_actor(actor_info);
+ }
+ LOG_IF(FATAL, !ready_actors_list_.empty()) << ActorInfo::from_list_node(ready_actors_list_.next)->get_name();
+ CHECK(ready_actors_list_.empty());
+ poll_.clear();
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ if (!event_fd_.empty()) {
+ event_fd_.close();
+ }
+#endif
+
+ if (callback_) {
+ // can't move lambda with unique_ptr inside into std::function
+ auto ptr = actor_info_pool_.release();
+ callback_->register_at_finish([=]() { delete ptr; });
+ } else {
+ actor_info_pool_.reset();
+ }
+}
+
+void Scheduler::do_event(ActorInfo *actor_info, Event &&event) {
+ event_context_ptr_->link_token = event.link_token;
+ auto actor = actor_info->get_actor_unsafe();
+ switch (event.type) {
+ case Event::Type::Start: {
+ VLOG(actor) << *actor_info << " Event::Start";
+ actor->start_up();
+ break;
+ }
+ case Event::Type::Stop: {
+ VLOG(actor) << *actor_info << " Event::Stop";
+ actor->tear_down();
+ break;
+ }
+ case Event::Type::Yield: {
+ VLOG(actor) << *actor_info << " Event::Yield";
+ actor->wakeup();
+ break;
+ }
+ case Event::Type::Hangup: {
+ auto token = get_link_token(actor);
+ VLOG(actor) << *actor_info << " Event::Hangup " << tag("token", format::as_hex(token));
+ if (token != 0) {
+ actor->hangup_shared();
+ } else {
+ actor->hangup();
+ }
+ break;
+ }
+ case Event::Type::Timeout: {
+ VLOG(actor) << *actor_info << " Event::Timeout";
+ actor->timeout_expired();
+ break;
+ }
+ case Event::Type::Raw: {
+ VLOG(actor) << *actor_info << " Event::Raw";
+ actor->raw_event(event.data);
+ break;
+ }
+ case Event::Type::Custom: {
+ do_custom_event(actor_info, *event.data.custom_event);
+ break;
+ }
+ case Event::Type::NoType: {
+ UNREACHABLE();
+ break;
+ }
+ }
+ // can't clear event here. It may be already destroyed during destory_actor
+}
+
+void Scheduler::register_migrated_actor(ActorInfo *actor_info) {
+ VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
+ << tag("actor_count", actor_count_);
+ actor_count_++;
+ CHECK(actor_info->is_migrating());
+ CHECK(sched_id_ == actor_info->migrate_dest());
+ // CHECK(!actor_info->is_running());
+ actor_info->finish_migrate();
+ for (auto &event : actor_info->mailbox_) {
+ finish_migrate(event);
+ }
+ auto it = pending_events_.find(actor_info);
+ if (it != pending_events_.end()) {
+ actor_info->mailbox_.insert(actor_info->mailbox_.end(), make_move_iterator(begin(it->second)),
+ make_move_iterator(end(it->second)));
+ pending_events_.erase(it);
+ }
+ if (actor_info->mailbox_.empty()) {
+ pending_actors_list_.put(actor_info->get_list_node());
+ } else {
+ ready_actors_list_.put(actor_info->get_list_node());
+ }
+ actor_info->get_actor_unsafe()->on_finish_migrate();
+}
+
+void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) {
+ if (sched_id < sched_count()) {
+ auto actor_info = actor_id.get_actor_info();
+ if (actor_info) {
+ VLOG(actor) << "Send to " << *actor_info << " on scheduler " << sched_id << ": " << event;
+ } else {
+ VLOG(actor) << "Send to scheduler " << sched_id << ": " << event;
+ }
+ start_migrate(event, sched_id);
+ outbound_queues_[sched_id]->writer_put(EventCreator::event_unsafe(actor_id, std::move(event)));
+ outbound_queues_[sched_id]->writer_flush();
+ }
+}
+
+void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) {
+ if (!actor_info->is_running()) {
+ auto node = actor_info->get_list_node();
+ node->remove();
+ ready_actors_list_.put(node);
+ }
+ VLOG(actor) << "Add to mailbox: " << *actor_info << " " << event;
+ actor_info->mailbox_.push_back(std::move(event));
+}
+
+void Scheduler::do_stop_actor(Actor *actor) {
+ return do_stop_actor(actor->get_info());
+}
+void Scheduler::do_stop_actor(ActorInfo *actor_info) {
+ CHECK(!actor_info->is_migrating());
+ CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
+ ObjectPool<ActorInfo>::OwnerPtr owner_ptr;
+ if (!actor_info->is_lite()) {
+ EventGuard guard(this, actor_info);
+ do_event(actor_info, Event::stop());
+ owner_ptr = actor_info->get_actor_unsafe()->clear();
+ // Actor context is visible in destructor
+ actor_info->destroy_actor();
+ event_context_ptr_->flags = 0;
+ } else {
+ owner_ptr = actor_info->get_actor_unsafe()->clear();
+ }
+ destroy_actor(actor_info);
+}
+
+void Scheduler::migrate_actor(Actor *actor, int32 dest_sched_id) {
+ migrate_actor(actor->get_info(), dest_sched_id);
+}
+void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
+ CHECK(event_context_ptr_->actor_info == actor_info);
+ if (sched_id_ == dest_sched_id) {
+ return;
+ }
+ event_context_ptr_->flags |= EventContext::Migrate;
+ event_context_ptr_->dest_sched_id = dest_sched_id;
+}
+
+void Scheduler::do_migrate_actor(Actor *actor, int32 dest_sched_id) {
+ do_migrate_actor(actor->get_info(), dest_sched_id);
+}
+void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ dest_sched_id = 0;
+#endif
+ if (sched_id_ == dest_sched_id) {
+ return;
+ }
+ start_migrate_actor(actor_info, dest_sched_id);
+ send_to_other_scheduler(dest_sched_id, ActorId<>(), Event::raw(actor_info));
+}
+
+void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) {
+ start_migrate_actor(actor->get_info(), dest_sched_id);
+}
+void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) {
+ VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info)
+ << tag("actor_count", actor_count_);
+ actor_count_--;
+ CHECK(actor_count_ >= 0);
+ actor_info->get_actor_unsafe()->on_start_migrate(dest_sched_id);
+ for (auto &event : actor_info->mailbox_) {
+ start_migrate(event, dest_sched_id);
+ }
+ actor_info->start_migrate(dest_sched_id);
+ actor_info->get_list_node()->remove();
+ cancel_actor_timeout(actor_info);
+}
+
+void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) {
+ if (timeout > 1e10) {
+ timeout = 1e10;
+ }
+ if (timeout < 0) {
+ timeout = 0;
+ }
+ double expire_at = Time::now() + timeout;
+ set_actor_timeout_at(actor_info, expire_at);
+}
+
+void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) {
+ HeapNode *heap_node = actor_info->get_heap_node();
+ VLOG(actor) << "set actor " << *actor_info << " " << tag("timeout", timeout_at) << timeout_at - Time::now_cached();
+ if (heap_node->in_heap()) {
+ timeout_queue_.fix(timeout_at, heap_node);
+ } else {
+ timeout_queue_.insert(timeout_at, heap_node);
+ }
+}
+
+void Scheduler::run_poll(double timeout) {
+ // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]";
+ // we can't wait for less than 1ms
+ poll_.run(static_cast<int32>(timeout * 1000 + 1));
+
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ if (can_read(event_fd_.get_fd())) {
+ std::atomic_thread_fence(std::memory_order_acquire);
+ event_fd_.acquire();
+ }
+#endif
+}
+
+void Scheduler::run_mailbox() {
+ VLOG(actor) << "run mailbox : begin";
+ ListNode actors_list = std::move(ready_actors_list_);
+ while (!actors_list.empty()) {
+ ListNode *node = actors_list.get();
+ CHECK(node);
+ auto actor_info = ActorInfo::from_list_node(node);
+ inc_wait_generation();
+ flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr));
+ }
+ VLOG(actor) << "run mailbox : finish " << actor_count_;
+
+ //Useful for debug, but O(ActorsCount) check
+
+ //int cnt = 0;
+ //for (ListNode *end = &pending_actors_list_, *it = pending_actors_list_.next; it != end; it = it->next) {
+ //cnt++;
+ //auto actor_info = ActorInfo::from_list_node(it);
+ //LOG(ERROR) << *actor_info;
+ //CHECK(actor_info->mailbox_.empty());
+ //CHECK(!actor_info->is_running());
+ //}
+ //for (ListNode *end = &ready_actors_list_, *it = ready_actors_list_.next; it != end; it = it->next) {
+ //auto actor_info = ActorInfo::from_list_node(it);
+ //LOG(ERROR) << *actor_info;
+ //cnt++;
+ //}
+ //CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_;
+}
+
+double Scheduler::run_timeout() {
+ double now = Time::now();
+ while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
+ HeapNode *node = timeout_queue_.pop();
+ ActorInfo *actor_info = ActorInfo::from_heap_node(node);
+ inc_wait_generation();
+ send(actor_info->actor_id(), Event::timeout(), Send::immediate);
+ }
+ if (timeout_queue_.empty()) {
+ return 10000;
+ }
+ double timeout = timeout_queue_.top_key() - now;
+ // LOG(DEBUG) << "Timeout [cnt:" << timeout_queue_.size() << "] in " << format::as_time(timeout);
+ return timeout;
+}
+
+void Scheduler::run_no_guard(double timeout) {
+ CHECK(has_guard_);
+ SCOPE_EXIT {
+ yield_flag_ = false;
+ };
+
+ double next_timeout = run_events();
+ if (next_timeout < timeout) {
+ timeout = next_timeout;
+ }
+ if (yield_flag_) {
+ return;
+ }
+ run_poll(timeout);
+ run_events();
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.h b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.h
new file mode 100644
index 0000000000..7edf3f1d2d
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.h
@@ -0,0 +1,397 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl/ActorInfo-decl.h"
+#include "td/actor/impl/Scheduler-decl.h"
+
+#include "td/utils/format.h"
+#include "td/utils/Heap.h"
+#include "td/utils/logging.h"
+#include "td/utils/MpscPollableQueue.h"
+#include "td/utils/ObjectPool.h"
+#include "td/utils/port/Fd.h"
+#include "td/utils/Slice.h"
+
+#include <atomic>
+#include <memory>
+#include <tuple>
+#include <utility>
+
+namespace td {
+
+/*** ServiceActor ***/
+inline void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) {
+ inbound_ = std::move(queues);
+}
+
+/*** EventGuard ***/
+class EventGuard {
+ public:
+ EventGuard(Scheduler *scheduler, ActorInfo *actor_info);
+
+ bool can_run() const {
+ return event_context_.flags == 0;
+ }
+
+ EventGuard(const EventGuard &other) = delete;
+ EventGuard &operator=(const EventGuard &other) = delete;
+ EventGuard(EventGuard &&other) = delete;
+ EventGuard &operator=(EventGuard &&other) = delete;
+ ~EventGuard();
+
+ private:
+ Scheduler::EventContext event_context_;
+ Scheduler::EventContext *event_context_ptr_;
+ Scheduler *scheduler_;
+ ActorContext *save_context_;
+ const char *save_log_tag2_;
+
+ void swap_context(ActorInfo *info);
+};
+
+/*** Scheduler ***/
+inline SchedulerGuard Scheduler::get_guard() {
+ return SchedulerGuard(this);
+}
+
+inline void Scheduler::init() {
+ init(0, {}, nullptr);
+}
+
+inline int32 Scheduler::sched_id() const {
+ return sched_id_;
+}
+inline int32 Scheduler::sched_count() const {
+ return sched_n_;
+}
+
+template <class ActorT, class... Args>
+ActorOwn<ActorT> Scheduler::create_actor(Slice name, Args &&... args) {
+ return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id_);
+}
+
+template <class ActorT, class... Args>
+ActorOwn<ActorT> Scheduler::create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) {
+ return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id);
+}
+
+template <class ActorT>
+ActorOwn<ActorT> Scheduler::register_actor(Slice name, ActorT *actor_ptr, int32 sched_id) {
+ return register_actor_impl(name, actor_ptr, Actor::Deleter::None, sched_id);
+}
+
+template <class ActorT>
+ActorOwn<ActorT> Scheduler::register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id) {
+ return register_actor_impl(name, actor_ptr.release(), Actor::Deleter::Destroy, sched_id);
+}
+
+template <class ActorT>
+ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id) {
+ if (sched_id == -1) {
+ sched_id = sched_id_;
+ }
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ sched_id = 0;
+#endif
+ CHECK(sched_id == sched_id_ || (0 <= sched_id && sched_id < static_cast<int32>(outbound_queues_.size()))) << sched_id;
+ auto info = actor_info_pool_->create_empty();
+ VLOG(actor) << "Create actor: " << tag("name", name) << tag("ptr", *info) << tag("context", context())
+ << tag("this", this) << tag("actor_count", actor_count_);
+ actor_count_++;
+ auto weak_info = info.get_weak();
+ auto actor_info = info.get();
+ info->init(sched_id_, name, std::move(info), static_cast<Actor *>(actor_ptr), deleter, ActorTraits<ActorT>::is_lite);
+
+ ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr);
+ if (sched_id != sched_id_) {
+ send(actor_id, Event::start(), Send::later_weak);
+ do_migrate_actor(actor_info, sched_id);
+ } else {
+ pending_actors_list_.put(weak_info->get_list_node());
+ if (!ActorTraits<ActorT>::is_lite) {
+ send(actor_id, Event::start(), Send::later_weak);
+ }
+ }
+
+ return ActorOwn<ActorT>(actor_id);
+}
+
+template <class ActorT>
+ActorOwn<ActorT> Scheduler::register_existing_actor(std::unique_ptr<ActorT> actor_ptr) {
+ CHECK(!actor_ptr->empty());
+ auto actor_info = actor_ptr->get_info();
+ CHECK(actor_info->migrate_dest_flag_atomic().first == sched_id_);
+ return actor_info->transfer_ownership_to_scheduler(std::move(actor_ptr));
+}
+
+inline void Scheduler::destroy_actor(ActorInfo *actor_info) {
+ VLOG(actor) << "Destroy actor: " << tag("name", *actor_info) << tag("ptr", actor_info)
+ << tag("actor_count", actor_count_);
+
+ CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_;
+ cancel_actor_timeout(actor_info);
+ actor_info->get_list_node()->remove();
+ // called by ObjectPool
+ // actor_info->clear();
+ actor_count_--;
+ CHECK(actor_count_ >= 0);
+}
+
+inline void Scheduler::do_custom_event(ActorInfo *actor_info, CustomEvent &event) {
+ VLOG(actor) << *actor_info << " Event::Custom";
+ event.run(actor_info->get_actor_unsafe());
+}
+
+template <class RunFuncT, class EventFuncT>
+void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func) {
+ auto &mailbox = actor_info->mailbox_;
+ size_t mailbox_size = mailbox.size();
+ CHECK(mailbox_size != 0);
+ EventGuard guard(this, actor_info);
+ size_t i = 0;
+ for (; i < mailbox_size && guard.can_run(); i++) {
+ do_event(actor_info, std::move(mailbox[i]));
+ }
+ if (run_func) {
+ if (guard.can_run()) {
+ (*run_func)(actor_info);
+ } else {
+ mailbox.insert(begin(mailbox) + i, (*event_func)());
+ }
+ }
+ mailbox.erase(begin(mailbox), begin(mailbox) + i);
+}
+
+inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) {
+ if (sched_id == sched_id_) {
+ ActorInfo *actor_info = actor_id.get_actor_info();
+ pending_events_[actor_info].push_back(std::move(event));
+ } else {
+ send_to_other_scheduler(sched_id, actor_id, std::move(event));
+ }
+}
+
+inline void Scheduler::before_tail_send(const ActorId<> &actor_id) {
+ // TODO
+}
+
+inline void Scheduler::inc_wait_generation() {
+ wait_generation_++;
+}
+
+template <class RunFuncT, class EventFuncT>
+void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func,
+ const EventFuncT &event_func) {
+ CHECK(has_guard_);
+ ActorInfo *actor_info = actor_id.get_actor_info();
+ if (unlikely(actor_info == nullptr || close_flag_)) {
+ // LOG(ERROR) << "Invalid actor id";
+ return;
+ }
+
+ CHECK(actor_info != nullptr);
+ int32 actor_sched_id;
+ bool is_migrating;
+ std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic();
+ bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id;
+
+ if (likely(!(flags & Send::later) && !(flags & Send::later_weak) && on_current_sched && !actor_info->is_running() &&
+ !actor_info->must_wait(wait_generation_))) { // run immediately
+ if (likely(actor_info->mailbox_.empty())) {
+ EventGuard guard(this, actor_info);
+ run_func(actor_info);
+ } else {
+ flush_mailbox(actor_info, &run_func, &event_func);
+ }
+ } else {
+ if (on_current_sched) {
+ add_to_mailbox(actor_info, event_func());
+ if (flags & Send::later) {
+ actor_info->set_wait_generation(wait_generation_);
+ }
+ } else {
+ send_to_scheduler(actor_sched_id, actor_id, event_func());
+ }
+ }
+}
+
+template <class EventT>
+void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags) {
+ return send_impl(actor_ref.get(), flags,
+ [&](ActorInfo *actor_info) {
+ event_context_ptr_->link_token = actor_ref.token();
+ lambda();
+ },
+ [&]() {
+ auto event = Event::lambda(std::forward<EventT>(lambda));
+ event.set_link_token(actor_ref.token());
+ return std::move(event);
+ });
+}
+
+template <class EventT>
+void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags) {
+ return send_impl(actor_ref.get(), flags,
+ [&](ActorInfo *actor_info) {
+ event_context_ptr_->link_token = actor_ref.token();
+ closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe()));
+ },
+ [&]() {
+ auto event = Event::immediate_closure(std::forward<EventT>(closure));
+ event.set_link_token(actor_ref.token());
+ return std::move(event);
+ });
+}
+
+inline void Scheduler::send(ActorRef actor_ref, Event &&event, Send::Flags flags) {
+ event.set_link_token(actor_ref.token());
+ return send_impl(actor_ref.get(), flags, [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); },
+ [&]() { return std::move(event); });
+}
+
+inline void Scheduler::subscribe(const Fd &fd, Fd::Flags flags) {
+ poll_.subscribe(fd, flags);
+}
+
+inline void Scheduler::unsubscribe(const Fd &fd) {
+ poll_.unsubscribe(fd);
+}
+
+inline void Scheduler::unsubscribe_before_close(const Fd &fd) {
+ poll_.unsubscribe_before_close(fd);
+}
+
+inline void Scheduler::yield_actor(Actor *actor) {
+ yield_actor(actor->get_info());
+}
+inline void Scheduler::yield_actor(ActorInfo *actor_info) {
+ send(actor_info->actor_id(), Event::yield(), Send::later_weak);
+}
+
+inline void Scheduler::stop_actor(Actor *actor) {
+ stop_actor(actor->get_info());
+}
+inline void Scheduler::stop_actor(ActorInfo *actor_info) {
+ CHECK(event_context_ptr_->actor_info == actor_info);
+ event_context_ptr_->flags |= EventContext::Stop;
+}
+
+inline uint64 Scheduler::get_link_token(Actor *actor) {
+ return get_link_token(actor->get_info());
+}
+inline uint64 Scheduler::get_link_token(ActorInfo *actor_info) {
+ CHECK(event_context_ptr_->actor_info == actor_info);
+ return event_context_ptr_->link_token;
+}
+
+inline void Scheduler::finish_migrate_actor(Actor *actor) {
+ register_migrated_actor(actor->get_info());
+}
+
+inline bool Scheduler::has_actor_timeout(const Actor *actor) const {
+ return has_actor_timeout(actor->get_info());
+}
+inline void Scheduler::set_actor_timeout_in(Actor *actor, double timeout) {
+ set_actor_timeout_in(actor->get_info(), timeout);
+}
+inline void Scheduler::set_actor_timeout_at(Actor *actor, double timeout_at) {
+ set_actor_timeout_at(actor->get_info(), timeout_at);
+}
+inline void Scheduler::cancel_actor_timeout(Actor *actor) {
+ cancel_actor_timeout(actor->get_info());
+}
+
+inline bool Scheduler::has_actor_timeout(const ActorInfo *actor_info) const {
+ const HeapNode *heap_node = actor_info->get_heap_node();
+ return heap_node->in_heap();
+}
+
+inline void Scheduler::cancel_actor_timeout(ActorInfo *actor_info) {
+ HeapNode *heap_node = actor_info->get_heap_node();
+ if (heap_node->in_heap()) {
+ timeout_queue_.erase(heap_node);
+ }
+}
+
+inline void Scheduler::finish() {
+ if (callback_) {
+ callback_->on_finish();
+ }
+ yield();
+}
+
+inline void Scheduler::yield() {
+ yield_flag_ = true;
+}
+
+inline void Scheduler::wakeup() {
+ std::atomic_thread_fence(std::memory_order_release);
+#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
+ event_fd_.release();
+#endif
+}
+
+inline double Scheduler::run_events() {
+ double res;
+ VLOG(actor) << "run events " << sched_id_ << " " << tag("pending", pending_events_.size())
+ << tag("actors", actor_count_);
+ do {
+ run_mailbox();
+ res = run_timeout();
+ } while (!ready_actors_list_.empty());
+ return res;
+}
+
+inline void Scheduler::run(double timeout) {
+ auto guard = get_guard();
+ run_no_guard(timeout);
+}
+
+/*** Interface to current scheduler ***/
+inline void subscribe(const Fd &fd, Fd::Flags flags) {
+ Scheduler::instance()->subscribe(fd, flags);
+}
+
+inline void unsubscribe(const Fd &fd) {
+ Scheduler::instance()->unsubscribe(fd);
+}
+
+inline void unsubscribe_before_close(const Fd &fd) {
+ Scheduler::instance()->unsubscribe_before_close(fd);
+}
+
+template <class ActorT, class... Args>
+ActorOwn<ActorT> create_actor(Slice name, Args &&... args) {
+ return Scheduler::instance()->create_actor<ActorT>(name, std::forward<Args>(args)...);
+}
+
+template <class ActorT, class... Args>
+ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) {
+ return Scheduler::instance()->create_actor_on_scheduler<ActorT>(name, sched_id, std::forward<Args>(args)...);
+}
+
+template <class ActorT>
+ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id) {
+ return Scheduler::instance()->register_actor<ActorT>(name, actor_ptr, sched_id);
+}
+
+template <class ActorT>
+ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id) {
+ return Scheduler::instance()->register_actor<ActorT>(name, std::move(actor_ptr), sched_id);
+}
+
+template <class ActorT>
+ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr) {
+ return Scheduler::instance()->register_existing_actor(std::move(actor_ptr));
+}
+
+inline void yield_scheduler() {
+ Scheduler::instance()->yield();
+}
+
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h b/libs/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h
new file mode 100644
index 0000000000..2cb5cb2127
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h
@@ -0,0 +1,117 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl2/ActorSignals.h"
+#include "td/actor/impl2/ActorState.h"
+
+#include "td/utils/logging.h"
+
+#include <atomic>
+
+namespace td {
+namespace actor2 {
+class ActorLocker {
+ public:
+ struct Options {
+ Options() {
+ }
+ bool can_execute_paused = false;
+ bool is_shared = true;
+ Options &with_can_execute_paused(bool new_can_execute_paused) {
+ can_execute_paused = new_can_execute_paused;
+ return *this;
+ }
+ Options &with_is_shared(bool new_is_shared) {
+ is_shared = new_is_shared;
+ return *this;
+ }
+ };
+ explicit ActorLocker(ActorState *state, Options options = {})
+ : state_(state), flags_(state->get_flags_unsafe()), new_flags_{}, options_{options} {
+ }
+ bool try_lock() {
+ CHECK(!own_lock());
+ while (!can_try_add_signals()) {
+ new_flags_ = flags_;
+ new_flags_.set_locked(true);
+ new_flags_.clear_signals();
+ if (state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel)) {
+ own_lock_ = true;
+ return true;
+ }
+ }
+ return false;
+ }
+ bool try_unlock(ActorState::Flags flags) {
+ CHECK(!flags.is_locked());
+ CHECK(own_lock());
+ // can't unlock with signals set
+ //CHECK(!flags.has_signals());
+
+ flags_ = flags;
+ //try unlock
+ if (state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel)) {
+ own_lock_ = false;
+ return true;
+ }
+
+ // read all signals
+ flags.set_locked(true);
+ flags.clear_signals();
+ do {
+ flags_.add_signals(new_flags_.get_signals());
+ } while (!state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel));
+ new_flags_ = flags;
+ return false;
+ }
+
+ bool try_add_signals(ActorSignals signals) {
+ CHECK(!own_lock());
+ CHECK(can_try_add_signals());
+ new_flags_ = flags_;
+ new_flags_.add_signals(signals);
+ return state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel);
+ }
+ bool add_signals(ActorSignals signals) {
+ CHECK(!own_lock());
+ while (true) {
+ if (can_try_add_signals()) {
+ if (try_add_signals(signals)) {
+ return false;
+ }
+ } else {
+ if (try_lock()) {
+ flags_.add_signals(signals);
+ return true;
+ }
+ }
+ }
+ }
+ bool own_lock() const {
+ return own_lock_;
+ }
+ ActorState::Flags flags() const {
+ return flags_;
+ }
+ bool can_execute() const {
+ return flags_.is_shared() == options_.is_shared && (options_.can_execute_paused || !flags_.is_pause());
+ }
+
+ private:
+ ActorState *state_{nullptr};
+ ActorState::Flags flags_;
+ ActorState::Flags new_flags_;
+ bool own_lock_{false};
+ Options options_;
+
+ bool can_try_add_signals() const {
+ return flags_.is_locked() || (flags_.is_in_queue() && !can_execute());
+ }
+};
+} // namespace actor2
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h b/libs/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h
new file mode 100644
index 0000000000..b7a7483022
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h
@@ -0,0 +1,84 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/utils/common.h"
+
+namespace td {
+namespace actor2 {
+class ActorSignals {
+ public:
+ ActorSignals() = default;
+ uint32 raw() const {
+ return raw_;
+ }
+ bool empty() const {
+ return raw_ == 0;
+ }
+ bool has_signal(uint32 signal) const {
+ return (raw_ & (1u << signal)) != 0;
+ }
+ void add_signal(uint32 signal) {
+ raw_ |= (1u << signal);
+ }
+ void add_signals(ActorSignals signals) {
+ raw_ |= signals.raw();
+ }
+ void clear_signal(uint32 signal) {
+ raw_ &= ~(1u << signal);
+ }
+ uint32 first_signal() {
+ if (!raw_) {
+ return 0;
+ }
+#if TD_MSVC
+ int res = 0;
+ int bit = 1;
+ while ((raw_ & bit) == 0) {
+ res++;
+ bit <<= 1;
+ }
+ return res;
+#else
+ return __builtin_ctz(raw_);
+#endif
+ }
+ enum Signal : uint32 {
+ // Signals in order of priority
+ Wakeup = 1,
+ Alarm = 2,
+ Kill = 3, // immediate kill
+ Io = 4, // move to io thread
+ Cpu = 5, // move to cpu thread
+ StartUp = 6,
+ TearDown = 7,
+ // Two signals for mpmc queue logic
+ //
+ // PopSignal is set after actor is popped from queue
+ // When processed it should set InQueue and Pause flags to false.
+ //
+ // MessagesSignal is set after new messages was added to actor
+ // If owner of actor wish to delay message handling, she should set InQueue flag to true and
+ // add actor into mpmc queue.
+ Pop = 8, // got popped from queue
+ Message = 9, // got new message
+ };
+
+ static ActorSignals one(uint32 signal) {
+ ActorSignals res;
+ res.add_signal(signal);
+ return res;
+ }
+
+ private:
+ uint32 raw_{0};
+ friend class ActorState;
+ explicit ActorSignals(uint32 raw) : raw_(raw) {
+ }
+};
+} // namespace actor2
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/ActorState.h b/libs/tdlib/td/tdactor/td/actor/impl2/ActorState.h
new file mode 100644
index 0000000000..02ead6bcf6
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl2/ActorState.h
@@ -0,0 +1,166 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl2/ActorSignals.h"
+#include "td/actor/impl2/SchedulerId.h"
+
+#include "td/utils/common.h"
+
+#include <atomic>
+
+namespace td {
+namespace actor2 {
+class ActorState {
+ public:
+ class Flags {
+ public:
+ Flags() = default;
+ uint32 raw() const {
+ return raw_;
+ }
+ uint32 &raw_ref() {
+ return raw_;
+ }
+ SchedulerId get_scheduler_id() const {
+ return SchedulerId{static_cast<uint8>(raw_ & SchedulerMask)};
+ }
+ void set_scheduler_id(SchedulerId id) {
+ raw_ = (raw_ & ~SchedulerMask) | id.value();
+ }
+
+ bool is_shared() const {
+ return check_flag(SharedFlag);
+ }
+ void set_shared(bool shared) {
+ set_flag(SharedFlag, shared);
+ }
+
+ bool is_locked() const {
+ return check_flag(LockFlag);
+ }
+ void set_locked(bool locked) {
+ set_flag(LockFlag, locked);
+ }
+
+ bool is_migrate() const {
+ return check_flag(MigrateFlag);
+ }
+ void set_migrate(bool migrate) {
+ set_flag(MigrateFlag, migrate);
+ }
+
+ bool is_pause() const {
+ return check_flag(PauseFlag);
+ }
+ void set_pause(bool pause) {
+ set_flag(PauseFlag, pause);
+ }
+
+ bool is_closed() const {
+ return check_flag(ClosedFlag);
+ }
+ void set_closed(bool closed) {
+ set_flag(ClosedFlag, closed);
+ }
+
+ bool is_in_queue() const {
+ return check_flag(InQueueFlag);
+ }
+ void set_in_queue(bool in_queue) {
+ set_flag(InQueueFlag, in_queue);
+ }
+
+ bool has_signals() const {
+ return check_flag(SignalMask);
+ }
+ void clear_signals() {
+ set_flag(SignalMask, false);
+ }
+ void set_signals(ActorSignals signals) {
+ raw_ = (raw_ & ~SignalMask) | (signals.raw() << SignalOffset);
+ }
+ void add_signals(ActorSignals signals) {
+ raw_ = raw_ | (signals.raw() << SignalOffset);
+ }
+ ActorSignals get_signals() const {
+ return ActorSignals{(raw_ & SignalMask) >> SignalOffset};
+ }
+
+ private:
+ uint32 raw_{0};
+
+ friend class ActorState;
+ Flags(uint32 raw) : raw_(raw) {
+ }
+
+ bool check_flag(uint32 mask) const {
+ return (raw_ & mask) != 0;
+ }
+ void set_flag(uint32 mask, bool flag) {
+ raw_ = (raw_ & ~mask) | (flag * mask);
+ }
+ };
+
+ Flags get_flags_unsafe() {
+ return Flags(state_.load(std::memory_order_relaxed));
+ }
+ void set_flags_unsafe(Flags flags) {
+ state_.store(flags.raw(), std::memory_order_relaxed);
+ }
+
+ private:
+ friend class ActorLocker;
+ std::atomic<uint32> state_{0};
+ enum : uint32 {
+ SchedulerMask = 255,
+
+ // Actors can be shared or not.
+ // If actor is shared, than any thread may try to lock it
+ // If actor is not shared, than it is owned by its scheduler, and only
+ // its scheduler is allowed to access it
+ // This flag may NOT change during the lifetime of an actor
+ SharedFlag = 1 << 9,
+
+ // Only shared actors need lock
+ // Lock if somebody is going to unlock it eventually.
+ // For example actor is locked, when some scheduler is executing its mailbox
+ // Or it is locked when it is in Mpmc queue, so someone will pop it eventually.
+ LockFlag = 1 << 10,
+
+ // While actor is migrating from one scheduler to another no one is allowed to change it
+ // Could not be set for shared actors.
+ MigrateFlag = 1 << 11,
+
+ // While set all messages are delayed
+ // Dropped from flush_maibox
+ // PauseFlag => InQueueFlag
+ PauseFlag = 1 << 12,
+
+ ClosedFlag = 1 << 13,
+
+ InQueueFlag = 1 << 14,
+
+ // Signals
+ SignalOffset = 15,
+ Signal = 1 << SignalOffset,
+ WakeupSignalFlag = Signal << ActorSignals::Wakeup,
+ AlarmSignalFlag = Signal << ActorSignals::Alarm,
+ KillSignalFlag = Signal << ActorSignals::Kill, // immediate kill
+ IoSignalFlag = Signal << ActorSignals::Io, // move to io thread
+ CpuSignalFlag = Signal << ActorSignals::Cpu, // move to cpu thread
+ StartUpSignalFlag = Signal << ActorSignals::StartUp,
+ TearDownSignalFlag = Signal << ActorSignals::TearDown,
+ MessageSignalFlag = Signal << ActorSignals::Message,
+ PopSignalFlag = Signal << ActorSignals::Pop,
+
+ SignalMask = WakeupSignalFlag | AlarmSignalFlag | KillSignalFlag | IoSignalFlag | CpuSignalFlag |
+ StartUpSignalFlag | TearDownSignalFlag | MessageSignalFlag | PopSignalFlag
+ };
+};
+} // namespace actor2
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp
new file mode 100644
index 0000000000..720bf6bc4f
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp
@@ -0,0 +1,11 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/impl2/Scheduler.h"
+
+namespace td {
+namespace actor2 {}
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.h b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.h
new file mode 100644
index 0000000000..9d5783b165
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.h
@@ -0,0 +1,1508 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/actor/impl2/ActorLocker.h"
+#include "td/actor/impl2/SchedulerId.h"
+
+#include "td/utils/Closure.h"
+#include "td/utils/common.h"
+#include "td/utils/format.h"
+#include "td/utils/Heap.h"
+#include "td/utils/List.h"
+#include "td/utils/logging.h"
+#include "td/utils/MpmcQueue.h"
+#include "td/utils/MpmcWaiter.h"
+#include "td/utils/MpscLinkQueue.h"
+#include "td/utils/MpscPollableQueue.h"
+#include "td/utils/port/Fd.h"
+#include "td/utils/port/Poll.h"
+#include "td/utils/port/thread.h"
+#include "td/utils/port/thread_local.h"
+#include "td/utils/ScopeGuard.h"
+#include "td/utils/SharedObjectPool.h"
+#include "td/utils/Slice.h"
+#include "td/utils/Time.h"
+#include "td/utils/type_traits.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <type_traits>
+#include <utility>
+
+namespace td {
+namespace actor2 {
+class Actor;
+
+template <class Impl>
+class Context {
+ public:
+ static Impl *get() {
+ return context_;
+ }
+ class Guard {
+ public:
+ explicit Guard(Impl *new_context) {
+ old_context_ = context_;
+ context_ = new_context;
+ }
+ ~Guard() {
+ context_ = old_context_;
+ }
+ Guard(const Guard &) = delete;
+ Guard &operator=(const Guard &) = delete;
+ Guard(Guard &&) = delete;
+ Guard &operator=(Guard &&) = delete;
+
+ private:
+ Impl *old_context_;
+ };
+
+ private:
+ static TD_THREAD_LOCAL Impl *context_;
+};
+
+template <class Impl>
+TD_THREAD_LOCAL Impl *Context<Impl>::context_;
+
+enum : uint64 { EmptyLinkToken = std::numeric_limits<uint64>::max() };
+
+class ActorExecuteContext : public Context<ActorExecuteContext> {
+ public:
+ explicit ActorExecuteContext(Actor *actor, Timestamp alarm_timestamp = Timestamp::never())
+ : actor_(actor), alarm_timestamp_(alarm_timestamp) {
+ }
+ Actor &actor() const {
+ CHECK(actor_);
+ return *actor_;
+ }
+ bool has_flags() const {
+ return flags_ != 0;
+ }
+ void set_stop() {
+ flags_ |= 1 << Stop;
+ }
+ bool get_stop() const {
+ return (flags_ & (1 << Stop)) != 0;
+ }
+ void set_pause() {
+ flags_ |= 1 << Pause;
+ }
+ bool get_pause() const {
+ return (flags_ & (1 << Pause)) != 0;
+ }
+ void clear_actor() {
+ actor_ = nullptr;
+ }
+ void set_link_token(uint64 link_token) {
+ link_token_ = link_token;
+ }
+ uint64 get_link_token() const {
+ return link_token_;
+ }
+ Timestamp &alarm_timestamp() {
+ flags_ |= 1 << Alarm;
+ return alarm_timestamp_;
+ }
+ bool get_alarm_flag() const {
+ return (flags_ & (1 << Alarm)) != 0;
+ }
+ Timestamp get_alarm_timestamp() const {
+ return alarm_timestamp_;
+ }
+
+ private:
+ Actor *actor_;
+ uint32 flags_{0};
+ uint64 link_token_{EmptyLinkToken};
+ Timestamp alarm_timestamp_;
+ enum { Stop, Pause, Alarm };
+};
+
+class ActorMessageImpl : private MpscLinkQueueImpl::Node {
+ public:
+ ActorMessageImpl() = default;
+ ActorMessageImpl(const ActorMessageImpl &) = delete;
+ ActorMessageImpl &operator=(const ActorMessageImpl &) = delete;
+ ActorMessageImpl(ActorMessageImpl &&other) = delete;
+ ActorMessageImpl &operator=(ActorMessageImpl &&other) = delete;
+ virtual ~ActorMessageImpl() = default;
+ virtual void run() = 0;
+ //virtual void run_anonymous() = 0;
+
+ // ActorMessage <--> MpscLintQueue::Node
+ // Each actor's mailbox will be a queue
+ static ActorMessageImpl *from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) {
+ return static_cast<ActorMessageImpl *>(node);
+ }
+ MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() {
+ return static_cast<MpscLinkQueueImpl::Node *>(this);
+ }
+
+ uint64 link_token_{EmptyLinkToken};
+ bool is_big_{false};
+};
+
+class ActorMessage {
+ public:
+ ActorMessage() = default;
+ explicit ActorMessage(std::unique_ptr<ActorMessageImpl> impl) : impl_(std::move(impl)) {
+ }
+ void run() {
+ CHECK(impl_);
+ impl_->run();
+ }
+ explicit operator bool() {
+ return bool(impl_);
+ }
+ friend class ActorMailbox;
+
+ void set_link_token(uint64 link_token) {
+ impl_->link_token_ = link_token;
+ }
+ uint64 get_link_token() const {
+ return impl_->link_token_;
+ }
+ bool is_big() const {
+ return impl_->is_big_;
+ }
+ void set_big() {
+ impl_->is_big_ = true;
+ }
+
+ private:
+ std::unique_ptr<ActorMessageImpl> impl_;
+
+ template <class T>
+ friend class td::MpscLinkQueue;
+
+ static ActorMessage from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) {
+ return ActorMessage(std::unique_ptr<ActorMessageImpl>(ActorMessageImpl::from_mpsc_link_queue_node(node)));
+ }
+ MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() {
+ return impl_.release()->to_mpsc_link_queue_node();
+ }
+};
+
+class ActorMailbox {
+ public:
+ ActorMailbox() = default;
+ ActorMailbox(const ActorMailbox &) = delete;
+ ActorMailbox &operator=(const ActorMailbox &) = delete;
+ ActorMailbox(ActorMailbox &&other) = delete;
+ ActorMailbox &operator=(ActorMailbox &&other) = delete;
+ ~ActorMailbox() {
+ pop_all();
+ while (reader_.read()) {
+ // skip
+ }
+ }
+ class Reader;
+ void push(ActorMessage message) {
+ queue_.push(std::move(message));
+ }
+ void push_unsafe(ActorMessage message) {
+ queue_.push_unsafe(std::move(message));
+ }
+
+ td::MpscLinkQueue<ActorMessage>::Reader &reader() {
+ return reader_;
+ }
+
+ void pop_all() {
+ queue_.pop_all(reader_);
+ }
+ void pop_all_unsafe() {
+ queue_.pop_all_unsafe(reader_);
+ }
+
+ private:
+ td::MpscLinkQueue<ActorMessage> queue_;
+ td::MpscLinkQueue<ActorMessage>::Reader reader_;
+};
+
+class ActorInfo
+ : private HeapNode
+ , private ListNode {
+ public:
+ ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name)
+ : actor_(std::move(actor)), name_(name.begin(), name.size()) {
+ state_.set_flags_unsafe(state_flags);
+ }
+
+ bool has_actor() const {
+ return bool(actor_);
+ }
+ Actor &actor() {
+ CHECK(has_actor());
+ return *actor_;
+ }
+ Actor *actor_ptr() const {
+ return actor_.get();
+ }
+ void destroy_actor() {
+ actor_.reset();
+ }
+ ActorState &state() {
+ return state_;
+ }
+ ActorMailbox &mailbox() {
+ return mailbox_;
+ }
+ CSlice get_name() const {
+ return name_;
+ }
+
+ HeapNode *as_heap_node() {
+ return this;
+ }
+ static ActorInfo *from_heap_node(HeapNode *node) {
+ return static_cast<ActorInfo *>(node);
+ }
+
+ Timestamp &alarm_timestamp() {
+ return alarm_timestamp_;
+ }
+
+ private:
+ std::unique_ptr<Actor> actor_;
+ ActorState state_;
+ ActorMailbox mailbox_;
+ std::string name_;
+ Timestamp alarm_timestamp_;
+};
+
+using ActorInfoPtr = SharedObjectPool<ActorInfo>::Ptr;
+
+class Actor {
+ public:
+ Actor() = default;
+ Actor(const Actor &) = delete;
+ Actor &operator=(const Actor &) = delete;
+ Actor(Actor &&other) = delete;
+ Actor &operator=(Actor &&other) = delete;
+ virtual ~Actor() = default;
+
+ void set_actor_info_ptr(ActorInfoPtr actor_info_ptr) {
+ actor_info_ptr_ = std::move(actor_info_ptr);
+ }
+ ActorInfoPtr get_actor_info_ptr() {
+ return actor_info_ptr_;
+ }
+
+ protected:
+ // Signal handlers
+ virtual void start_up(); // StartUp signal handler
+ virtual void tear_down(); // TearDown signal handler (or Kill)
+ virtual void hang_up(); // HangUp signal handler
+ virtual void wake_up(); // WakeUp signal handler
+ virtual void alarm(); // Alarm signal handler
+
+ friend class ActorMessageHangup;
+
+ // Event handlers
+ //virtual void hangup_shared();
+ // TODO: raw event?
+
+ virtual void loop(); // default handler
+
+ // Useful functions
+ void yield(); // send wakeup signal to itself
+ void stop(); // send Kill signal to itself
+ Timestamp &alarm_timestamp() {
+ return ActorExecuteContext::get()->alarm_timestamp();
+ }
+ Timestamp get_alarm_timestamp() {
+ return ActorExecuteContext::get()->get_alarm_timestamp();
+ }
+
+ CSlice get_name() {
+ return actor_info_ptr_->get_name();
+ }
+
+ // Inteface to scheduler
+ // Query will be just passed to current scheduler
+ // Timeout functions
+ //bool has_timeout() const;
+ //void set_timeout_in(double timeout_in);
+ //void set_timeout_at(double timeout_at);
+ //void cancel_timeout();
+ //uint64 get_link_token(); // get current request's link_token
+ //set context that will be inherited by all childrens
+ //void set_context(std::shared_ptr<ActorContext> context);
+
+ //ActorShared<> actor_shared(); // ActorShared to itself
+ //template <class SelfT>
+ //ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1)); // ActorShared with type
+
+ // Create EventFull to itself
+ //template <class FuncT, class... ArgsT>
+ //auto self_closure(FuncT &&func, ArgsT &&... args);
+ //template <class SelfT, class FuncT, class... ArgsT>
+ //auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args);
+ //template <class LambdaT>
+ //auto self_lambda(LambdaT &&lambda);
+
+ //void do_stop(); // process Kill signal immediately
+
+ private:
+ friend class ActorExecutor;
+ ActorInfoPtr actor_info_ptr_;
+};
+// Signal handlers
+inline void Actor::start_up() {
+ yield();
+}
+inline void Actor::tear_down() {
+ // noop
+}
+inline void Actor::hang_up() {
+ stop();
+}
+inline void Actor::wake_up() {
+ loop();
+}
+inline void Actor::alarm() {
+ loop();
+}
+
+inline void Actor::loop() {
+ // noop
+}
+
+// Useful functions
+inline void Actor::yield() {
+ // TODO
+}
+inline void Actor::stop() {
+ ActorExecuteContext::get()->set_stop();
+}
+
+class ActorInfoCreator {
+ public:
+ class Options {
+ public:
+ Options() = default;
+
+ Options &with_name(Slice new_name) {
+ name = new_name;
+ return *this;
+ }
+
+ Options &on_scheduler(SchedulerId new_scheduler_id) {
+ scheduler_id = new_scheduler_id;
+ return *this;
+ }
+ bool has_scheduler() const {
+ return scheduler_id.is_valid();
+ }
+ Options &with_poll() {
+ is_shared = false;
+ return *this;
+ }
+
+ private:
+ friend class ActorInfoCreator;
+ Slice name;
+ SchedulerId scheduler_id;
+ bool is_shared{true};
+ bool in_queue{true};
+ //TODO: rename
+ };
+
+ //Create unlocked actor. One must send StartUp signal immediately.
+ ActorInfoPtr create(std::unique_ptr<Actor> actor, const Options &args) {
+ ActorState::Flags flags;
+ flags.set_scheduler_id(args.scheduler_id);
+ flags.set_shared(args.is_shared);
+ flags.set_in_queue(args.in_queue);
+ flags.set_signals(ActorSignals::one(ActorSignals::StartUp));
+
+ auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name);
+ actor_info_ptr->actor().set_actor_info_ptr(actor_info_ptr);
+ return actor_info_ptr;
+ }
+
+ ActorInfoCreator() = default;
+ ActorInfoCreator(const ActorInfoCreator &) = delete;
+ ActorInfoCreator &operator=(const ActorInfoCreator &) = delete;
+ ActorInfoCreator(ActorInfoCreator &&other) = delete;
+ ActorInfoCreator &operator=(ActorInfoCreator &&other) = delete;
+ ~ActorInfoCreator() {
+ pool_.for_each([](auto &actor_info) { actor_info.destroy_actor(); });
+ }
+
+ private:
+ SharedObjectPool<ActorInfo> pool_;
+};
+
+using ActorOptions = ActorInfoCreator::Options;
+
+class SchedulerDispatcher {
+ public:
+ virtual SchedulerId get_scheduler_id() const = 0;
+ virtual void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) = 0;
+ virtual void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) = 0;
+
+ SchedulerDispatcher() = default;
+ SchedulerDispatcher(const SchedulerDispatcher &) = delete;
+ SchedulerDispatcher &operator=(const SchedulerDispatcher &) = delete;
+ SchedulerDispatcher(SchedulerDispatcher &&other) = delete;
+ SchedulerDispatcher &operator=(SchedulerDispatcher &&other) = delete;
+ virtual ~SchedulerDispatcher() = default;
+};
+
+class ActorExecutor {
+ public:
+ struct Options {
+ Options &with_from_queue() {
+ from_queue = true;
+ return *this;
+ }
+ Options &with_has_poll(bool new_has_poll) {
+ this->has_poll = new_has_poll;
+ return *this;
+ }
+ bool from_queue{false};
+ bool has_poll{false};
+ };
+ ActorExecutor(ActorInfo &actor_info, SchedulerDispatcher &dispatcher, Options options)
+ : actor_info_(actor_info), dispatcher_(dispatcher), options_(options) {
+ //LOG(ERROR) << "START " << actor_info_.get_name() << " " << tag("from_queue", from_queue);
+ start();
+ }
+ ActorExecutor(const ActorExecutor &) = delete;
+ ActorExecutor &operator=(const ActorExecutor &) = delete;
+ ActorExecutor(ActorExecutor &&other) = delete;
+ ActorExecutor &operator=(ActorExecutor &&other) = delete;
+ ~ActorExecutor() {
+ //LOG(ERROR) << "FINISH " << actor_info_.get_name() << " " << tag("own_lock", actor_locker_.own_lock());
+ finish();
+ }
+
+ // our best guess if actor is closed or not
+ bool can_send() {
+ return !flags().is_closed();
+ }
+
+ bool can_send_immediate() {
+ return actor_locker_.own_lock() && !actor_execute_context_.has_flags() && actor_locker_.can_execute();
+ }
+
+ template <class F>
+ void send_immediate(F &&f, uint64 link_token) {
+ CHECK(can_send_immediate());
+ if (!can_send()) {
+ return;
+ }
+ actor_execute_context_.set_link_token(link_token);
+ f();
+ }
+ void send_immediate(ActorMessage message) {
+ CHECK(can_send_immediate());
+ if (message.is_big()) {
+ actor_info_.mailbox().reader().delay(std::move(message));
+ pending_signals_.add_signal(ActorSignals::Message);
+ actor_execute_context_.set_pause();
+ return;
+ }
+ actor_execute_context_.set_link_token(message.get_link_token());
+ message.run();
+ }
+ void send_immediate(ActorSignals signals) {
+ CHECK(can_send_immediate());
+ while (flush_one_signal(signals) && can_send_immediate()) {
+ }
+ pending_signals_.add_signals(signals);
+ }
+
+ void send(ActorMessage message) {
+ if (!can_send()) {
+ return;
+ }
+ if (can_send_immediate()) {
+ return send_immediate(std::move(message));
+ }
+ actor_info_.mailbox().push(std::move(message));
+ pending_signals_.add_signal(ActorSignals::Message);
+ }
+
+ void send(ActorSignals signals) {
+ if (!can_send()) {
+ return;
+ }
+
+ pending_signals_.add_signals(signals);
+ }
+
+ private:
+ ActorInfo &actor_info_;
+ SchedulerDispatcher &dispatcher_;
+ Options options_;
+ ActorLocker actor_locker_{
+ &actor_info_.state(),
+ ActorLocker::Options().with_can_execute_paused(options_.from_queue).with_is_shared(!options_.has_poll)};
+
+ ActorExecuteContext actor_execute_context_{actor_info_.actor_ptr(), actor_info_.alarm_timestamp()};
+ ActorExecuteContext::Guard guard{&actor_execute_context_};
+
+ ActorState::Flags flags_;
+ ActorSignals pending_signals_;
+
+ ActorState::Flags &flags() {
+ return flags_;
+ }
+
+ void start() {
+ if (!can_send()) {
+ return;
+ }
+
+ ActorSignals signals;
+ SCOPE_EXIT {
+ pending_signals_.add_signals(signals);
+ };
+
+ if (options_.from_queue) {
+ signals.add_signal(ActorSignals::Pop);
+ }
+
+ actor_locker_.try_lock();
+ flags_ = actor_locker_.flags();
+
+ if (!actor_locker_.own_lock()) {
+ return;
+ }
+
+ if (options_.from_queue) {
+ flags().set_pause(false);
+ }
+ if (!actor_locker_.can_execute()) {
+ CHECK(!options_.from_queue);
+ return;
+ }
+
+ signals.add_signals(flags().get_signals());
+ actor_info_.mailbox().pop_all();
+
+ while (!actor_execute_context_.has_flags() && flush_one(signals)) {
+ }
+ }
+
+ void finish() {
+ if (!actor_locker_.own_lock()) {
+ if (!pending_signals_.empty() && actor_locker_.add_signals(pending_signals_)) {
+ flags_ = actor_locker_.flags();
+ } else {
+ return;
+ }
+ }
+ CHECK(actor_locker_.own_lock());
+
+ if (actor_execute_context_.has_flags()) {
+ if (actor_execute_context_.get_stop()) {
+ if (actor_info_.alarm_timestamp()) {
+ dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), Timestamp::never());
+ }
+ flags_.set_closed(true);
+ actor_info_.actor().tear_down();
+ actor_info_.destroy_actor();
+ return;
+ }
+ if (actor_execute_context_.get_pause()) {
+ flags_.set_pause(true);
+ }
+ if (actor_execute_context_.get_alarm_flag()) {
+ auto old_timestamp = actor_info_.alarm_timestamp();
+ auto new_timestamp = actor_execute_context_.get_alarm_timestamp();
+ if (!(old_timestamp == new_timestamp)) {
+ actor_info_.alarm_timestamp() = new_timestamp;
+ dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), new_timestamp);
+ }
+ }
+ }
+ flags_.set_signals(pending_signals_);
+
+ bool add_to_queue = false;
+ while (true) {
+ // Drop InQueue flag if has pop signal
+ // Can't delay this signal
+ auto signals = flags().get_signals();
+ if (signals.has_signal(ActorSignals::Pop)) {
+ signals.clear_signal(ActorSignals::Pop);
+ flags().set_signals(signals);
+ flags().set_in_queue(false);
+ }
+
+ if (flags().has_signals() && !flags().is_in_queue()) {
+ add_to_queue = true;
+ flags().set_in_queue(true);
+ }
+ if (actor_locker_.try_unlock(flags())) {
+ if (add_to_queue) {
+ dispatcher_.add_to_queue(actor_info_.actor().get_actor_info_ptr(), flags().get_scheduler_id(),
+ !flags().is_shared());
+ }
+ break;
+ }
+ flags_ = actor_locker_.flags();
+ }
+ }
+
+ bool flush_one(ActorSignals &signals) {
+ return flush_one_signal(signals) || flush_one_message();
+ }
+
+ bool flush_one_signal(ActorSignals &signals) {
+ auto signal = signals.first_signal();
+ if (!signal) {
+ return false;
+ }
+ switch (signal) {
+ case ActorSignals::Wakeup:
+ actor_info_.actor().wake_up();
+ break;
+ case ActorSignals::Alarm:
+ if (actor_execute_context_.get_alarm_timestamp().is_in_past()) {
+ actor_execute_context_.alarm_timestamp() = Timestamp::never();
+ actor_info_.actor().alarm();
+ }
+ break;
+ case ActorSignals::Kill:
+ actor_execute_context_.set_stop();
+ break;
+ case ActorSignals::StartUp:
+ actor_info_.actor().start_up();
+ break;
+ case ActorSignals::TearDown:
+ actor_info_.actor().tear_down();
+ break;
+ case ActorSignals::Pop:
+ flags().set_in_queue(false);
+ break;
+
+ case ActorSignals::Message:
+ break;
+ case ActorSignals::Io:
+ case ActorSignals::Cpu:
+ LOG(FATAL) << "TODO";
+ default:
+ UNREACHABLE();
+ }
+ signals.clear_signal(signal);
+ return true;
+ }
+
+ bool flush_one_message() {
+ auto message = actor_info_.mailbox().reader().read();
+ if (!message) {
+ return false;
+ }
+ if (message.is_big() && !options_.from_queue) {
+ actor_info_.mailbox().reader().delay(std::move(message));
+ pending_signals_.add_signal(ActorSignals::Message);
+ actor_execute_context_.set_pause();
+ return false;
+ }
+
+ actor_execute_context_.set_link_token(message.get_link_token());
+ message.run();
+ return true;
+ }
+};
+
+using SchedulerMessage = ActorInfoPtr;
+
+struct WorkerInfo {
+ enum class Type { Io, Cpu } type{Type::Io};
+ WorkerInfo() = default;
+ explicit WorkerInfo(Type type) : type(type) {
+ }
+ ActorInfoCreator actor_info_creator;
+};
+
+struct SchedulerInfo {
+ SchedulerId id;
+ // will be read by all workers is any thread
+ std::unique_ptr<MpmcQueue<SchedulerMessage>> cpu_queue;
+ std::unique_ptr<MpmcWaiter> cpu_queue_waiter;
+ // only scheduler itself may read from io_queue_
+ std::unique_ptr<MpscPollableQueue<SchedulerMessage>> io_queue;
+ size_t cpu_threads_count{0};
+
+ std::unique_ptr<WorkerInfo> io_worker;
+ std::vector<std::unique_ptr<WorkerInfo>> cpu_workers;
+};
+
+struct SchedulerGroupInfo {
+ explicit SchedulerGroupInfo(size_t n) : schedulers(n) {
+ }
+ std::atomic<bool> is_stop_requested{false};
+
+ int active_scheduler_count{0};
+ std::mutex active_scheduler_count_mutex;
+ std::condition_variable active_scheduler_count_condition_variable;
+
+ std::vector<SchedulerInfo> schedulers;
+};
+
+class SchedulerContext
+ : public Context<SchedulerContext>
+ , public SchedulerDispatcher {
+ public:
+ // DispatcherInterface
+ SchedulerDispatcher &dispatcher() {
+ return *this;
+ }
+
+ // ActorCreator Interface
+ virtual ActorInfoCreator &get_actor_info_creator() = 0;
+
+ // Poll interface
+ virtual bool has_poll() = 0;
+ virtual Poll &get_poll() = 0;
+
+ // Timeout interface
+ virtual bool has_heap() = 0;
+ virtual KHeap<double> &get_heap() = 0;
+
+ // Stop all schedulers
+ virtual bool is_stop_requested() = 0;
+ virtual void stop() = 0;
+};
+
+#if !TD_THREAD_UNSUPPORTED
+class Scheduler {
+ public:
+ Scheduler(std::shared_ptr<SchedulerGroupInfo> scheduler_group_info, SchedulerId id, size_t cpu_threads_count)
+ : scheduler_group_info_(std::move(scheduler_group_info)), cpu_threads_(cpu_threads_count) {
+ scheduler_group_info_->active_scheduler_count++;
+ info_ = &scheduler_group_info_->schedulers.at(id.value());
+ info_->id = id;
+ if (cpu_threads_count != 0) {
+ info_->cpu_threads_count = cpu_threads_count;
+ info_->cpu_queue = std::make_unique<MpmcQueue<SchedulerMessage>>(1024, max_thread_count());
+ info_->cpu_queue_waiter = std::make_unique<MpmcWaiter>();
+ }
+ info_->io_queue = std::make_unique<MpscPollableQueue<SchedulerMessage>>();
+ info_->io_queue->init();
+
+ info_->cpu_workers.resize(cpu_threads_count);
+ for (auto &worker : info_->cpu_workers) {
+ worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Cpu);
+ }
+ info_->io_worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Io);
+
+ poll_.init();
+ io_worker_ = std::make_unique<IoWorker>(*info_->io_queue);
+ }
+
+ Scheduler(const Scheduler &) = delete;
+ Scheduler &operator=(const Scheduler &) = delete;
+ Scheduler(Scheduler &&other) = delete;
+ Scheduler &operator=(Scheduler &&other) = delete;
+ ~Scheduler() {
+ // should stop
+ stop();
+ do_stop();
+ }
+
+ void start() {
+ for (size_t i = 0; i < cpu_threads_.size(); i++) {
+ cpu_threads_[i] = td::thread([this, i] {
+ this->run_in_context_impl(*this->info_->cpu_workers[i],
+ [this] { CpuWorker(*info_->cpu_queue, *info_->cpu_queue_waiter).run(); });
+ });
+ }
+ this->run_in_context([this] { this->io_worker_->start_up(); });
+ }
+
+ template <class F>
+ void run_in_context(F &&f) {
+ run_in_context_impl(*info_->io_worker, std::forward<F>(f));
+ }
+
+ bool run(double timeout) {
+ bool res;
+ run_in_context_impl(*info_->io_worker, [this, timeout, &res] {
+ if (SchedulerContext::get()->is_stop_requested()) {
+ res = false;
+ } else {
+ res = io_worker_->run_once(timeout);
+ }
+ if (!res) {
+ io_worker_->tear_down();
+ }
+ });
+ if (!res) {
+ do_stop();
+ }
+ return res;
+ }
+
+ // Just syntactic sugar
+ void stop() {
+ run_in_context([] { SchedulerContext::get()->stop(); });
+ }
+
+ SchedulerId get_scheduler_id() const {
+ return info_->id;
+ }
+
+ private:
+ std::shared_ptr<SchedulerGroupInfo> scheduler_group_info_;
+ SchedulerInfo *info_;
+ std::vector<td::thread> cpu_threads_;
+ bool is_stopped_{false};
+ Poll poll_;
+ KHeap<double> heap_;
+ class IoWorker;
+ std::unique_ptr<IoWorker> io_worker_;
+
+ class SchedulerContextImpl : public SchedulerContext {
+ public:
+ SchedulerContextImpl(WorkerInfo *worker, SchedulerInfo *scheduler, SchedulerGroupInfo *scheduler_group, Poll *poll,
+ KHeap<double> *heap)
+ : worker_(worker), scheduler_(scheduler), scheduler_group_(scheduler_group), poll_(poll), heap_(heap) {
+ }
+
+ SchedulerId get_scheduler_id() const override {
+ return scheduler()->id;
+ }
+ void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) override {
+ if (!scheduler_id.is_valid()) {
+ scheduler_id = scheduler()->id;
+ }
+ auto &info = scheduler_group()->schedulers.at(scheduler_id.value());
+ if (need_poll) {
+ info.io_queue->writer_put(std::move(actor_info_ptr));
+ } else {
+ info.cpu_queue->push(std::move(actor_info_ptr), get_thread_id());
+ info.cpu_queue_waiter->notify();
+ }
+ }
+
+ ActorInfoCreator &get_actor_info_creator() override {
+ return worker()->actor_info_creator;
+ }
+
+ bool has_poll() override {
+ return poll_ != nullptr;
+ }
+ Poll &get_poll() override {
+ CHECK(has_poll());
+ return *poll_;
+ }
+
+ bool has_heap() override {
+ return heap_ != nullptr;
+ }
+ KHeap<double> &get_heap() override {
+ CHECK(has_heap());
+ return *heap_;
+ }
+
+ void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override {
+ // we are in PollWorker
+ CHECK(has_heap());
+ auto &heap = get_heap();
+ auto *heap_node = actor_info_ptr->as_heap_node();
+ if (timestamp) {
+ if (heap_node->in_heap()) {
+ heap.fix(timestamp.at(), heap_node);
+ } else {
+ heap.insert(timestamp.at(), heap_node);
+ }
+ } else {
+ if (heap_node->in_heap()) {
+ heap.erase(heap_node);
+ }
+ }
+
+ // TODO: do something in plain worker
+ }
+
+ bool is_stop_requested() override {
+ return scheduler_group()->is_stop_requested;
+ }
+
+ void stop() override {
+ bool expect_false = false;
+ // Trying to set close_flag_ to true with CAS
+ auto &group = *scheduler_group();
+ if (!group.is_stop_requested.compare_exchange_strong(expect_false, true)) {
+ return;
+ }
+
+ // Notify all workers of all schedulers
+ for (auto &scheduler_info : group.schedulers) {
+ scheduler_info.io_queue->writer_put({});
+ for (size_t i = 0; i < scheduler_info.cpu_threads_count; i++) {
+ scheduler_info.cpu_queue->push({}, get_thread_id());
+ scheduler_info.cpu_queue_waiter->notify();
+ }
+ }
+ }
+
+ private:
+ WorkerInfo *worker() const {
+ return worker_;
+ }
+ SchedulerInfo *scheduler() const {
+ return scheduler_;
+ }
+ SchedulerGroupInfo *scheduler_group() const {
+ return scheduler_group_;
+ }
+
+ WorkerInfo *worker_;
+ SchedulerInfo *scheduler_;
+ SchedulerGroupInfo *scheduler_group_;
+ Poll *poll_;
+
+ KHeap<double> *heap_;
+ };
+
+ template <class F>
+ void run_in_context_impl(WorkerInfo &worker_info, F &&f) {
+ bool is_io_worker = worker_info.type == WorkerInfo::Type::Io;
+ SchedulerContextImpl context(&worker_info, info_, scheduler_group_info_.get(), is_io_worker ? &poll_ : nullptr,
+ is_io_worker ? &heap_ : nullptr);
+ SchedulerContext::Guard guard(&context);
+ f();
+ }
+
+ class CpuWorker {
+ public:
+ CpuWorker(MpmcQueue<SchedulerMessage> &queue, MpmcWaiter &waiter) : queue_(queue), waiter_(waiter) {
+ }
+ void run() {
+ auto thread_id = get_thread_id();
+ auto &dispatcher = SchedulerContext::get()->dispatcher();
+
+ int yields = 0;
+ while (true) {
+ SchedulerMessage message;
+ if (queue_.try_pop(message, thread_id)) {
+ if (!message) {
+ return;
+ }
+ ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue());
+ yields = waiter_.stop_wait(yields, thread_id);
+ } else {
+ yields = waiter_.wait(yields, thread_id);
+ }
+ }
+ }
+
+ private:
+ MpmcQueue<SchedulerMessage> &queue_;
+ MpmcWaiter &waiter_;
+ };
+
+ class IoWorker {
+ public:
+ explicit IoWorker(MpscPollableQueue<SchedulerMessage> &queue) : queue_(queue) {
+ }
+
+ void start_up() {
+ auto &poll = SchedulerContext::get()->get_poll();
+ poll.subscribe(queue_.reader_get_event_fd().get_fd(), Fd::Flag::Read);
+ }
+ void tear_down() {
+ auto &poll = SchedulerContext::get()->get_poll();
+ poll.unsubscribe(queue_.reader_get_event_fd().get_fd());
+ }
+
+ bool run_once(double timeout) {
+ auto &dispatcher = SchedulerContext::get()->dispatcher();
+ auto &poll = SchedulerContext::get()->get_poll();
+ auto &heap = SchedulerContext::get()->get_heap();
+
+ auto now = Time::now(); // update Time::now_cached()
+ while (!heap.empty() && heap.top_key() <= now) {
+ auto *heap_node = heap.pop();
+ auto *actor_info = ActorInfo::from_heap_node(heap_node);
+
+ ActorExecutor executor(*actor_info, dispatcher, ActorExecutor::Options().with_has_poll(true));
+ if (executor.can_send_immediate()) {
+ executor.send_immediate(ActorSignals::one(ActorSignals::Alarm));
+ } else {
+ executor.send(ActorSignals::one(ActorSignals::Alarm));
+ }
+ }
+
+ const int size = queue_.reader_wait_nonblock();
+ for (int i = 0; i < size; i++) {
+ auto message = queue_.reader_get_unsafe();
+ if (!message) {
+ return false;
+ }
+ ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue().with_has_poll(true));
+ }
+ queue_.reader_flush();
+
+ bool can_sleep = size == 0 && timeout != 0;
+ int32 timeout_ms = 0;
+ if (can_sleep) {
+ auto wakeup_timestamp = Timestamp::in(timeout);
+ if (!heap.empty()) {
+ wakeup_timestamp.relax(Timestamp::at(heap.top_key()));
+ }
+ timeout_ms = static_cast<int>(wakeup_timestamp.in() * 1000) + 1;
+ if (timeout_ms < 0) {
+ timeout_ms = 0;
+ }
+ //const int thirty_seconds = 30 * 1000;
+ //if (timeout_ms > thirty_seconds) {
+ //timeout_ms = thirty_seconds;
+ //}
+ }
+ poll.run(timeout_ms);
+ return true;
+ }
+
+ private:
+ MpscPollableQueue<SchedulerMessage> &queue_;
+ };
+
+ void do_stop() {
+ if (is_stopped_) {
+ return;
+ }
+ // wait other threads to finish
+ for (auto &thread : cpu_threads_) {
+ thread.join();
+ }
+ // Can't do anything else, other schedulers may send queries to this one.
+ // Must wait till every scheduler is stopped first..
+ is_stopped_ = true;
+
+ io_worker_.reset();
+ poll_.clear();
+
+ std::unique_lock<std::mutex> lock(scheduler_group_info_->active_scheduler_count_mutex);
+ scheduler_group_info_->active_scheduler_count--;
+ scheduler_group_info_->active_scheduler_count_condition_variable.notify_all();
+ }
+
+ public:
+ static void close_scheduler_group(SchedulerGroupInfo &group_info) {
+ LOG(ERROR) << "close scheduler group";
+ // Cannot close scheduler group before somebody asked to stop them
+ CHECK(group_info.is_stop_requested);
+ {
+ std::unique_lock<std::mutex> lock(group_info.active_scheduler_count_mutex);
+ group_info.active_scheduler_count_condition_variable.wait(lock,
+ [&] { return group_info.active_scheduler_count == 0; });
+ }
+
+ // Drain all queues
+ // Just to destroy all elements should be ok.
+ for (auto &scheduler_info : group_info.schedulers) {
+ // Drain io queue
+ auto &io_queue = *scheduler_info.io_queue;
+ while (true) {
+ int n = io_queue.reader_wait_nonblock();
+ if (n == 0) {
+ break;
+ }
+ while (n-- > 0) {
+ auto message = io_queue.reader_get_unsafe();
+ // message's destructor is called
+ }
+ }
+ scheduler_info.io_queue.reset();
+
+ // Drain cpu queue
+ auto &cpu_queue = *scheduler_info.cpu_queue;
+ while (true) {
+ SchedulerMessage message;
+ if (!cpu_queue.try_pop(message, get_thread_id())) {
+ break;
+ }
+ // message's destructor is called
+ }
+ scheduler_info.cpu_queue.reset();
+
+ // Do not destroy worker infos. run_in_context will crash if they are empty
+ }
+ }
+};
+
+// Actor messages
+template <class LambdaT>
+class ActorMessageLambda : public ActorMessageImpl {
+ public:
+ template <class FromLambdaT>
+ explicit ActorMessageLambda(FromLambdaT &&lambda) : lambda_(std::forward<FromLambdaT>(lambda)) {
+ }
+ void run() override {
+ lambda_();
+ }
+
+ private:
+ LambdaT lambda_;
+};
+
+class ActorMessageHangup : public ActorMessageImpl {
+ public:
+ void run() override {
+ ActorExecuteContext::get()->actor().hang_up();
+ }
+};
+
+class ActorMessageCreator {
+ public:
+ template <class F>
+ static ActorMessage lambda(F &&f) {
+ return ActorMessage(std::make_unique<ActorMessageLambda<F>>(std::forward<F>(f)));
+ }
+
+ static ActorMessage hangup() {
+ return ActorMessage(std::make_unique<ActorMessageHangup>());
+ }
+
+ // Use faster allocation?
+};
+
+// SYNTAX SHUGAR
+namespace detail {
+struct ActorRef {
+ ActorRef(ActorInfo &actor_info, uint64 link_token = EmptyLinkToken) : actor_info(actor_info), link_token(link_token) {
+ }
+
+ ActorInfo &actor_info;
+ uint64 link_token;
+};
+
+template <class T>
+T &current_actor() {
+ return static_cast<T &>(ActorExecuteContext::get()->actor());
+}
+
+void send_message(ActorInfo &actor_info, ActorMessage message) {
+ ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options());
+ executor.send(std::move(message));
+}
+
+void send_message(ActorRef actor_ref, ActorMessage message) {
+ message.set_link_token(actor_ref.link_token);
+ send_message(actor_ref.actor_info, std::move(message));
+}
+void send_message_later(ActorInfo &actor_info, ActorMessage message) {
+ ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options());
+ executor.send(std::move(message));
+}
+
+void send_message_later(ActorRef actor_ref, ActorMessage message) {
+ message.set_link_token(actor_ref.link_token);
+ send_message_later(actor_ref.actor_info, std::move(message));
+}
+
+template <class ExecuteF, class ToMessageF>
+void send_immediate(ActorRef actor_ref, ExecuteF &&execute, ToMessageF &&to_message) {
+ auto &scheduler_context = *SchedulerContext::get();
+ ActorExecutor executor(actor_ref.actor_info, scheduler_context.dispatcher(),
+ ActorExecutor::Options().with_has_poll(scheduler_context.has_poll()));
+ if (executor.can_send_immediate()) {
+ return executor.send_immediate(execute, actor_ref.link_token);
+ }
+ auto message = to_message();
+ message.set_link_token(actor_ref.link_token);
+ executor.send(std::move(message));
+}
+
+template <class F>
+void send_lambda(ActorRef actor_ref, F &&lambda) {
+ send_immediate(actor_ref, lambda, [&lambda]() mutable { return ActorMessageCreator::lambda(std::move(lambda)); });
+}
+template <class F>
+void send_lambda_later(ActorRef actor_ref, F &&lambda) {
+ send_message_later(actor_ref, ActorMessageCreator::lambda(std::move(lambda)));
+}
+
+template <class ClosureT>
+void send_closure_impl(ActorRef actor_ref, ClosureT &&closure) {
+ using ActorType = typename ClosureT::ActorType;
+ send_immediate(actor_ref, [&closure]() mutable { closure.run(&current_actor<ActorType>()); },
+ [&closure]() mutable {
+ return ActorMessageCreator::lambda([closure = to_delayed_closure(std::move(closure))]() mutable {
+ closure.run(&current_actor<ActorType>());
+ });
+ });
+}
+
+template <class... ArgsT>
+void send_closure(ActorRef actor_ref, ArgsT &&... args) {
+ send_closure_impl(actor_ref, create_immediate_closure(std::forward<ArgsT>(args)...));
+}
+
+template <class ClosureT>
+void send_closure_later_impl(ActorRef actor_ref, ClosureT &&closure) {
+ using ActorType = typename ClosureT::ActorType;
+ send_message_later(actor_ref, ActorMessageCreator::lambda([closure = std::move(closure)]() mutable {
+ closure.run(&current_actor<ActorType>());
+ }));
+}
+
+template <class... ArgsT>
+void send_closure_later(ActorRef actor_ref, ArgsT &&... args) {
+ send_closure_later_impl(actor_ref, create_delayed_closure(std::forward<ArgsT>(args)...));
+}
+
+void register_actor_info_ptr(ActorInfoPtr actor_info_ptr) {
+ auto state = actor_info_ptr->state().get_flags_unsafe();
+ SchedulerContext::get()->add_to_queue(std::move(actor_info_ptr), state.get_scheduler_id(), !state.is_shared());
+}
+
+template <class T, class... ArgsT>
+ActorInfoPtr create_actor(ActorOptions &options, ArgsT &&... args) {
+ auto *scheduler_context = SchedulerContext::get();
+ if (!options.has_scheduler()) {
+ options.on_scheduler(scheduler_context->get_scheduler_id());
+ }
+ auto res =
+ scheduler_context->get_actor_info_creator().create(std::make_unique<T>(std::forward<ArgsT>(args)...), options);
+ register_actor_info_ptr(res);
+ return res;
+}
+} // namespace detail
+
+// Essentially ActorInfoWeakPtr with Type
+template <class ActorType = Actor>
+class ActorId {
+ public:
+ using ActorT = ActorType;
+ ActorId() = default;
+ ActorId(const ActorId &) = default;
+ ActorId &operator=(const ActorId &) = default;
+ ActorId(ActorId &&other) = default;
+ ActorId &operator=(ActorId &&other) = default;
+
+ // allow only conversion from child to parent
+ template <class ToActorType, class = std::enable_if_t<std::is_base_of<ToActorType, ActorType>::value>>
+ explicit operator ActorId<ToActorType>() const {
+ return ActorId<ToActorType>(ptr_);
+ }
+
+ const ActorInfoPtr &actor_info_ptr() const {
+ return ptr_;
+ }
+
+ ActorInfo &actor_info() const {
+ CHECK(ptr_);
+ return *ptr_;
+ }
+ bool empty() const {
+ return !ptr_;
+ }
+
+ template <class... ArgsT>
+ static ActorId<ActorType> create(ActorOptions &options, ArgsT &&... args) {
+ return ActorId<ActorType>(detail::create_actor<ActorType>(options, std::forward<ArgsT>(args)...));
+ }
+
+ detail::ActorRef as_actor_ref() const {
+ CHECK(!empty());
+ return detail::ActorRef(*actor_info_ptr());
+ }
+
+ private:
+ ActorInfoPtr ptr_;
+
+ explicit ActorId(ActorInfoPtr ptr) : ptr_(std::move(ptr)) {
+ }
+
+ template <class SelfT>
+ friend ActorId<SelfT> actor_id(SelfT *self);
+};
+
+template <class ActorType = Actor>
+class ActorOwn {
+ public:
+ using ActorT = ActorType;
+ ActorOwn() = default;
+ explicit ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) {
+ }
+ template <class OtherActorType>
+ explicit ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) {
+ }
+ template <class OtherActorType>
+ explicit ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) {
+ }
+ template <class OtherActorType>
+ ActorOwn &operator=(ActorOwn<OtherActorType> &&other) {
+ reset(other.release());
+ }
+ ActorOwn(ActorOwn &&other) : id_(other.release()) {
+ }
+ ActorOwn &operator=(ActorOwn &&other) {
+ reset(other.release());
+ }
+ ActorOwn(const ActorOwn &) = delete;
+ ActorOwn &operator=(const ActorOwn &) = delete;
+ ~ActorOwn() {
+ reset();
+ }
+
+ bool empty() const {
+ return id_.empty();
+ }
+ bool is_alive() const {
+ return id_.is_alive();
+ }
+ ActorId<ActorType> get() const {
+ return id_;
+ }
+ ActorId<ActorType> release() {
+ return std::move(id_);
+ }
+ void reset(ActorId<ActorType> other = ActorId<ActorType>()) {
+ static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type");
+ hangup();
+ id_ = std::move(other);
+ }
+ const ActorId<ActorType> *operator->() const {
+ return &id_;
+ }
+
+ detail::ActorRef as_actor_ref() const {
+ CHECK(!empty());
+ return detail::ActorRef(*id_.actor_info_ptr(), 0);
+ }
+
+ private:
+ ActorId<ActorType> id_;
+ void hangup() const {
+ if (empty()) {
+ return;
+ }
+ detail::send_message(as_actor_ref(), ActorMessageCreator::hangup());
+ }
+};
+
+template <class ActorType = Actor>
+class ActorShared {
+ public:
+ using ActorT = ActorType;
+ ActorShared() = default;
+ template <class OtherActorType>
+ ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) {
+ CHECK(token_ != 0);
+ }
+ template <class OtherActorType>
+ ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) {
+ }
+ template <class OtherActorType>
+ ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(other.token()) {
+ }
+ template <class OtherActorType>
+ ActorShared &operator=(ActorShared<OtherActorType> &&other) {
+ reset(other.release(), other.token());
+ }
+ ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token()) {
+ }
+ ActorShared &operator=(ActorShared &&other) {
+ reset(other.release(), other.token());
+ }
+ ActorShared(const ActorShared &) = delete;
+ ActorShared &operator=(const ActorShared &) = delete;
+ ~ActorShared() {
+ reset();
+ }
+
+ uint64 token() const {
+ return token_;
+ }
+ bool empty() const {
+ return id_.empty();
+ }
+ bool is_alive() const {
+ return id_.is_alive();
+ }
+ ActorId<ActorType> get() const {
+ return id_;
+ }
+ ActorId<ActorType> release();
+ void reset(ActorId<ActorType> other = ActorId<ActorType>(), uint64 link_token = EmptyLinkToken) {
+ static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type");
+ hangup();
+ id_ = other;
+ token_ = link_token;
+ }
+ const ActorId<ActorType> *operator->() const {
+ return &id_;
+ }
+
+ detail::ActorRef as_actor_ref() const {
+ CHECK(!empty());
+ return detail::ActorRef(*id_.actor_info_ptr(), token_);
+ }
+
+ private:
+ ActorId<ActorType> id_;
+ uint64 token_;
+
+ void hangup() const {
+ }
+};
+
+// common interface
+template <class SelfT>
+ActorId<SelfT> actor_id(SelfT *self) {
+ CHECK(self);
+ CHECK(static_cast<Actor *>(self) == &ActorExecuteContext::get()->actor());
+ return ActorId<SelfT>(ActorExecuteContext::get()->actor().get_actor_info_ptr());
+}
+
+inline ActorId<> actor_id() {
+ return actor_id(&ActorExecuteContext::get()->actor());
+}
+
+template <class T, class... ArgsT>
+ActorOwn<T> create_actor(ActorOptions options, ArgsT &&... args) {
+ return ActorOwn<T>(ActorId<T>::create(options, std::forward<ArgsT>(args)...));
+}
+
+template <class T, class... ArgsT>
+ActorOwn<T> create_actor(Slice name, ArgsT &&... args) {
+ return ActorOwn<T>(ActorId<T>::create(ActorOptions().with_name(name), std::forward<ArgsT>(args)...));
+}
+
+template <class ActorIdT, class FunctionT, class... ArgsT>
+void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+ using ActorT = typename std::decay_t<ActorIdT>::ActorT;
+ using FunctionClassT = member_function_class_t<FunctionT>;
+ static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
+
+ ActorIdT id = std::forward<ActorIdT>(actor_id);
+ detail::send_closure(id.as_actor_ref(), function, std::forward<ArgsT>(args)...);
+}
+
+template <class ActorIdT, class FunctionT, class... ArgsT>
+void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) {
+ using ActorT = typename std::decay_t<ActorIdT>::ActorT;
+ using FunctionClassT = member_function_class_t<FunctionT>;
+ static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure");
+
+ ActorIdT id = std::forward<ActorIdT>(actor_id);
+ detail::send_closure_later(id.as_actor_ref(), function, std::forward<ArgsT>(args)...);
+}
+
+template <class ActorIdT, class... ArgsT>
+void send_lambda(ActorIdT &&actor_id, ArgsT &&... args) {
+ ActorIdT id = std::forward<ActorIdT>(actor_id);
+ detail::send_lambda(id.as_actor_ref(), std::forward<ArgsT>(args)...);
+}
+
+#endif //!TD_THREAD_UNSUPPORTED
+} // namespace actor2
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h b/libs/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h
new file mode 100644
index 0000000000..5850f1a94c
--- /dev/null
+++ b/libs/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h
@@ -0,0 +1,32 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/utils/common.h"
+#include "td/utils/logging.h"
+
+namespace td {
+namespace actor2 {
+class SchedulerId {
+ public:
+ SchedulerId() : id_(-1) {
+ }
+ explicit SchedulerId(uint8 id) : id_(id) {
+ }
+ bool is_valid() const {
+ return id_ >= 0;
+ }
+ uint8 value() const {
+ CHECK(is_valid());
+ return static_cast<uint8>(id_);
+ }
+
+ private:
+ int32 id_{0};
+};
+} // namespace actor2
+} // namespace td
diff --git a/libs/tdlib/td/tdactor/test/actors_bugs.cpp b/libs/tdlib/td/tdactor/test/actors_bugs.cpp
new file mode 100644
index 0000000000..f4267f2818
--- /dev/null
+++ b/libs/tdlib/td/tdactor/test/actors_bugs.cpp
@@ -0,0 +1,47 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/utils/tests.h"
+
+#include "td/actor/Timeout.h"
+
+using namespace td;
+
+TEST(MultiTimeout, bug) {
+ ConcurrentScheduler sched;
+ int threads_n = 0;
+ sched.init(threads_n);
+
+ sched.start();
+ std::unique_ptr<MultiTimeout> multi_timeout;
+ struct Data {
+ MultiTimeout *multi_timeout;
+ };
+ Data data;
+
+ {
+ auto guard = sched.get_current_guard();
+ multi_timeout = std::make_unique<MultiTimeout>();
+ data.multi_timeout = multi_timeout.get();
+ multi_timeout->set_callback([](void *void_data, int64 key) {
+ auto &data = *static_cast<Data *>(void_data);
+ if (key == 1) {
+ data.multi_timeout->cancel_timeout(key + 1);
+ data.multi_timeout->set_timeout_in(key + 2, 1);
+ } else {
+ Scheduler::instance()->finish();
+ }
+ });
+ multi_timeout->set_callback_data(&data);
+ multi_timeout->set_timeout_in(1, 1);
+ multi_timeout->set_timeout_in(2, 2);
+ }
+
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
diff --git a/libs/tdlib/td/tdactor/test/actors_impl2.cpp b/libs/tdlib/td/tdactor/test/actors_impl2.cpp
new file mode 100644
index 0000000000..9185fe8858
--- /dev/null
+++ b/libs/tdlib/td/tdactor/test/actors_impl2.cpp
@@ -0,0 +1,535 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/actor/impl2/ActorLocker.h"
+#include "td/actor/impl2/Scheduler.h"
+
+#include "td/utils/format.h"
+#include "td/utils/logging.h"
+#include "td/utils/port/thread.h"
+#include "td/utils/Slice.h"
+#include "td/utils/StringBuilder.h"
+#include "td/utils/tests.h"
+#include "td/utils/Time.h"
+
+#include <array>
+#include <atomic>
+#include <deque>
+#include <memory>
+
+using td::actor2::ActorLocker;
+using td::actor2::ActorSignals;
+using td::actor2::ActorState;
+using td::actor2::SchedulerId;
+
+TEST(Actor2, signals) {
+ ActorSignals signals;
+ signals.add_signal(ActorSignals::Wakeup);
+ signals.add_signal(ActorSignals::Cpu);
+ signals.add_signal(ActorSignals::Kill);
+ signals.clear_signal(ActorSignals::Cpu);
+
+ bool was_kill = false;
+ bool was_wakeup = false;
+ while (!signals.empty()) {
+ auto s = signals.first_signal();
+ if (s == ActorSignals::Kill) {
+ was_kill = true;
+ } else if (s == ActorSignals::Wakeup) {
+ was_wakeup = true;
+ } else {
+ UNREACHABLE();
+ }
+ signals.clear_signal(s);
+ }
+ CHECK(was_kill && was_wakeup);
+}
+
+TEST(Actors2, flags) {
+ ActorState::Flags flags;
+ CHECK(!flags.is_locked());
+ flags.set_locked(true);
+ CHECK(flags.is_locked());
+ flags.set_locked(false);
+ CHECK(!flags.is_locked());
+ flags.set_pause(true);
+
+ flags.set_scheduler_id(SchedulerId{123});
+
+ auto signals = flags.get_signals();
+ CHECK(signals.empty());
+ signals.add_signal(ActorSignals::Cpu);
+ signals.add_signal(ActorSignals::Kill);
+ CHECK(signals.has_signal(ActorSignals::Cpu));
+ CHECK(signals.has_signal(ActorSignals::Kill));
+ flags.set_signals(signals);
+ CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw();
+
+ auto wakeup = ActorSignals{};
+ wakeup.add_signal(ActorSignals::Wakeup);
+
+ flags.add_signals(wakeup);
+ signals.add_signal(ActorSignals::Wakeup);
+ CHECK(flags.get_signals().raw() == signals.raw());
+
+ flags.clear_signals();
+ CHECK(flags.get_signals().empty());
+
+ CHECK(flags.get_scheduler_id().value() == 123);
+ CHECK(flags.is_pause());
+}
+
+TEST(Actor2, locker) {
+ ActorState state;
+
+ ActorSignals kill_signal;
+ kill_signal.add_signal(ActorSignals::Kill);
+
+ ActorSignals wakeup_signal;
+ kill_signal.add_signal(ActorSignals::Wakeup);
+
+ ActorSignals cpu_signal;
+ kill_signal.add_signal(ActorSignals::Cpu);
+
+ {
+ ActorLocker lockerA(&state);
+ ActorLocker lockerB(&state);
+ ActorLocker lockerC(&state);
+
+ CHECK(lockerA.try_lock());
+ CHECK(lockerA.own_lock());
+ auto flagsA = lockerA.flags();
+ CHECK(lockerA.try_unlock(flagsA));
+ CHECK(!lockerA.own_lock());
+
+ CHECK(lockerA.try_lock());
+ CHECK(!lockerB.try_lock());
+ CHECK(!lockerC.try_lock());
+
+ CHECK(lockerB.try_add_signals(kill_signal));
+ CHECK(!lockerC.try_add_signals(wakeup_signal));
+ CHECK(lockerC.try_add_signals(wakeup_signal));
+ CHECK(!lockerC.add_signals(cpu_signal));
+ CHECK(!lockerA.flags().has_signals());
+ CHECK(!lockerA.try_unlock(lockerA.flags()));
+ {
+ auto flags = lockerA.flags();
+ auto signals = flags.get_signals();
+ bool was_kill = false;
+ bool was_wakeup = false;
+ bool was_cpu = false;
+ while (!signals.empty()) {
+ auto s = signals.first_signal();
+ if (s == ActorSignals::Kill) {
+ was_kill = true;
+ } else if (s == ActorSignals::Wakeup) {
+ was_wakeup = true;
+ } else if (s == ActorSignals::Cpu) {
+ was_cpu = true;
+ } else {
+ UNREACHABLE();
+ }
+ signals.clear_signal(s);
+ }
+ CHECK(was_kill && was_wakeup && was_cpu);
+ flags.clear_signals();
+ CHECK(lockerA.try_unlock(flags));
+ }
+ }
+
+ {
+ ActorLocker lockerB(&state);
+ CHECK(lockerB.try_lock());
+ CHECK(lockerB.try_unlock(lockerB.flags()));
+ CHECK(lockerB.add_signals(kill_signal));
+ CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
+ auto flags = lockerB.flags();
+ flags.clear_signals();
+ ActorLocker lockerA(&state);
+ CHECK(!lockerA.add_signals(kill_signal));
+ CHECK(!lockerB.try_unlock(flags));
+ CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal!
+ CHECK(!lockerB.try_unlock(flags));
+ CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill));
+ CHECK(lockerB.try_unlock(flags));
+ }
+
+ {
+ ActorLocker lockerA(&state);
+ CHECK(lockerA.try_lock());
+ auto flags = lockerA.flags();
+ flags.set_pause(true);
+ CHECK(lockerA.try_unlock(flags));
+ //We have to lock, though we can't execute.
+ CHECK(lockerA.add_signals(wakeup_signal));
+ }
+}
+
+#if !TD_THREAD_UNSUPPORTED
+TEST(Actor2, locker_stress) {
+ ActorState state;
+
+ constexpr size_t threads_n = 5;
+ auto stage = [&](std::atomic<int> &value, int need) {
+ value.fetch_add(1, std::memory_order_release);
+ while (value.load(std::memory_order_acquire) < need) {
+ td::this_thread::yield();
+ }
+ };
+
+ struct Node {
+ std::atomic<td::uint32> request{0};
+ td::uint32 response = 0;
+ char pad[64];
+ };
+ std::array<Node, threads_n> nodes;
+ auto do_work = [&]() {
+ for (auto &node : nodes) {
+ auto query = node.request.load(std::memory_order_acquire);
+ if (query) {
+ node.response = query * query;
+ node.request.store(0, std::memory_order_relaxed);
+ }
+ }
+ };
+
+ std::atomic<int> begin{0};
+ std::atomic<int> ready{0};
+ std::atomic<int> check{0};
+ std::atomic<int> finish{0};
+ std::vector<td::thread> threads;
+ for (size_t i = 0; i < threads_n; i++) {
+ threads.push_back(td::thread([&, id = i] {
+ for (size_t i = 1; i < 1000000; i++) {
+ ActorLocker locker(&state);
+ auto need = static_cast<int>(threads_n * i);
+ auto query = static_cast<td::uint32>(id + need);
+ stage(begin, need);
+ nodes[id].request = 0;
+ nodes[id].response = 0;
+ stage(ready, need);
+ if (locker.try_lock()) {
+ nodes[id].response = query * query;
+ } else {
+ auto cpu = ActorSignals::one(ActorSignals::Cpu);
+ nodes[id].request.store(query, std::memory_order_release);
+ locker.add_signals(cpu);
+ }
+ while (locker.own_lock()) {
+ auto flags = locker.flags();
+ auto signals = flags.get_signals();
+ if (!signals.empty()) {
+ do_work();
+ }
+ flags.clear_signals();
+ locker.try_unlock(flags);
+ }
+
+ stage(check, need);
+ if (id == 0) {
+ CHECK(locker.add_signals(ActorSignals{}));
+ CHECK(!locker.flags().has_signals());
+ CHECK(locker.try_unlock(locker.flags()));
+ for (size_t thread_id = 0; thread_id < threads_n; thread_id++) {
+ CHECK(nodes[thread_id].response ==
+ static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need))
+ << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " "
+ << nodes[thread_id].request.load();
+ }
+ }
+ }
+ }));
+ }
+ for (auto &thread : threads) {
+ thread.join();
+ }
+}
+
+namespace {
+const size_t BUF_SIZE = 1024 * 1024;
+char buf[BUF_SIZE];
+td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1));
+} // namespace
+
+TEST(Actor2, executor_simple) {
+ using namespace td;
+ using namespace td::actor2;
+ struct Dispatcher : public SchedulerDispatcher {
+ void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override {
+ queue.push_back(std::move(ptr));
+ }
+ void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override {
+ UNREACHABLE();
+ }
+ SchedulerId get_scheduler_id() const override {
+ return SchedulerId{0};
+ }
+ std::deque<ActorInfoPtr> queue;
+ };
+ Dispatcher dispatcher;
+
+ class TestActor : public Actor {
+ public:
+ void close() {
+ stop();
+ }
+
+ private:
+ void start_up() override {
+ sb << "StartUp";
+ }
+ void tear_down() override {
+ sb << "TearDown";
+ }
+ };
+ ActorInfoCreator actor_info_creator;
+ auto actor = actor_info_creator.create(
+ std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor"));
+ dispatcher.add_to_queue(actor, SchedulerId{0}, false);
+
+ {
+ ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
+ CHECK(executor.can_send());
+ CHECK(executor.can_send_immediate());
+ CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice();
+ sb.clear();
+ executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
+ CHECK(sb.as_cslice() == "A") << sb.as_cslice();
+ sb.clear();
+ auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; });
+ big_message.set_big();
+ executor.send(std::move(big_message));
+ CHECK(sb.as_cslice() == "") << sb.as_cslice();
+ executor.send(ActorMessageCreator::lambda([&] { sb << "A"; }));
+ CHECK(sb.as_cslice() == "") << sb.as_cslice();
+ }
+ CHECK(dispatcher.queue.size() == 1);
+ { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); }
+ CHECK(dispatcher.queue.size() == 1);
+ dispatcher.queue.clear();
+ CHECK(sb.as_cslice() == "bigA") << sb.as_cslice();
+ sb.clear();
+ {
+ ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
+ executor.send(
+ ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
+ }
+ CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice();
+ sb.clear();
+ CHECK(!actor->has_actor());
+ {
+ ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options());
+ executor.send(
+ ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); }));
+ }
+ CHECK(dispatcher.queue.empty());
+ CHECK(sb.as_cslice() == "");
+}
+
+using namespace td::actor2;
+using td::uint32;
+static std::atomic<int> cnt;
+class Worker : public Actor {
+ public:
+ void query(uint32 x, ActorInfoPtr master);
+ void close() {
+ stop();
+ }
+};
+class Master : public Actor {
+ public:
+ void on_result(uint32 x, uint32 y) {
+ loop();
+ }
+
+ private:
+ uint32 l = 0;
+ uint32 r = 100000;
+ ActorInfoPtr worker;
+ void start_up() override {
+ worker = detail::create_actor<Worker>(ActorOptions().with_name("Master"));
+ loop();
+ }
+ void loop() override {
+ l++;
+ if (l == r) {
+ if (!--cnt) {
+ SchedulerContext::get()->stop();
+ }
+ detail::send_closure(*worker, &Worker::close);
+ stop();
+ return;
+ }
+ detail::send_lambda(*worker,
+ [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); });
+ }
+};
+
+void Worker::query(uint32 x, ActorInfoPtr master) {
+ auto y = x;
+ for (int i = 0; i < 100; i++) {
+ y = y * y;
+ }
+ detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); });
+}
+
+TEST(Actor2, scheduler_simple) {
+ auto group_info = std::make_shared<SchedulerGroupInfo>(1);
+ Scheduler scheduler{group_info, SchedulerId{0}, 2};
+ scheduler.start();
+ scheduler.run_in_context([] {
+ cnt = 10;
+ for (int i = 0; i < 10; i++) {
+ detail::create_actor<Master>(ActorOptions().with_name("Master"));
+ }
+ });
+ while (scheduler.run(1000)) {
+ }
+ Scheduler::close_scheduler_group(*group_info);
+}
+
+TEST(Actor2, actor_id_simple) {
+ auto group_info = std::make_shared<SchedulerGroupInfo>(1);
+ Scheduler scheduler{group_info, SchedulerId{0}, 2};
+ sb.clear();
+ scheduler.start();
+
+ scheduler.run_in_context([] {
+ class A : public Actor {
+ public:
+ A(int value) : value_(value) {
+ sb << "A" << value_;
+ }
+ void hello() {
+ sb << "hello";
+ }
+ ~A() {
+ sb << "~A";
+ if (--cnt <= 0) {
+ SchedulerContext::get()->stop();
+ }
+ }
+
+ private:
+ int value_;
+ };
+ cnt = 1;
+ auto id = create_actor<A>("A", 123);
+ CHECK(sb.as_cslice() == "A123");
+ sb.clear();
+ send_closure(id, &A::hello);
+ });
+ while (scheduler.run(1000)) {
+ }
+ CHECK(sb.as_cslice() == "hello~A");
+ Scheduler::close_scheduler_group(*group_info);
+ sb.clear();
+}
+
+TEST(Actor2, actor_creation) {
+ auto group_info = std::make_shared<SchedulerGroupInfo>(1);
+ Scheduler scheduler{group_info, SchedulerId{0}, 1};
+ scheduler.start();
+
+ scheduler.run_in_context([]() mutable {
+ class B;
+ class A : public Actor {
+ public:
+ void f() {
+ check();
+ stop();
+ }
+
+ private:
+ void start_up() override {
+ check();
+ create_actor<B>("Simple", actor_id(this)).release();
+ }
+
+ void check() {
+ auto &context = *SchedulerContext::get();
+ CHECK(context.has_poll());
+ context.get_poll();
+ }
+
+ void tear_down() override {
+ if (--cnt <= 0) {
+ SchedulerContext::get()->stop();
+ }
+ }
+ };
+
+ class B : public Actor {
+ public:
+ B(ActorId<A> a) : a_(a) {
+ }
+
+ private:
+ void start_up() override {
+ auto &context = *SchedulerContext::get();
+ CHECK(!context.has_poll());
+ send_closure(a_, &A::f);
+ stop();
+ }
+ void tear_down() override {
+ if (--cnt <= 0) {
+ SchedulerContext::get()->stop();
+ }
+ }
+ ActorId<A> a_;
+ };
+ cnt = 2;
+ create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release();
+ });
+ while (scheduler.run(1000)) {
+ }
+ scheduler.stop();
+ Scheduler::close_scheduler_group(*group_info);
+}
+
+TEST(Actor2, actor_timeout_simple) {
+ auto group_info = std::make_shared<SchedulerGroupInfo>(1);
+ Scheduler scheduler{group_info, SchedulerId{0}, 2};
+ sb.clear();
+ scheduler.start();
+
+ scheduler.run_in_context([] {
+ class A : public Actor {
+ public:
+ void start_up() override {
+ set_timeout();
+ }
+ void alarm() override {
+ double diff = td::Time::now() - expected_timeout_;
+ CHECK(-0.001 < diff && diff < 0.1) << diff;
+ if (cnt_-- > 0) {
+ set_timeout();
+ } else {
+ stop();
+ }
+ }
+
+ void tear_down() override {
+ SchedulerContext::get()->stop();
+ }
+
+ private:
+ double expected_timeout_;
+ int cnt_ = 5;
+ void set_timeout() {
+ auto wakeup_timestamp = td::Timestamp::in(0.1);
+ expected_timeout_ = wakeup_timestamp.at();
+ alarm_timestamp() = wakeup_timestamp;
+ }
+ };
+ create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release();
+ });
+ while (scheduler.run(1000)) {
+ }
+ Scheduler::close_scheduler_group(*group_info);
+ sb.clear();
+}
+#endif //!TD_THREAD_UNSUPPORTED
diff --git a/libs/tdlib/td/tdactor/test/actors_main.cpp b/libs/tdlib/td/tdactor/test/actors_main.cpp
new file mode 100644
index 0000000000..ffceacc595
--- /dev/null
+++ b/libs/tdlib/td/tdactor/test/actors_main.cpp
@@ -0,0 +1,463 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/utils/tests.h"
+
+#include "td/actor/actor.h"
+#include "td/actor/PromiseFuture.h"
+
+#include "td/utils/logging.h"
+#include "td/utils/Random.h"
+
+#include <limits>
+#include <map>
+#include <utility>
+
+using namespace td;
+
+REGISTER_TESTS(actors_main);
+
+namespace {
+
+template <class ContainerT>
+static typename ContainerT::value_type &rand_elem(ContainerT &cont) {
+ CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max()));
+ return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)];
+}
+
+static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
+ uint32 res = 1;
+ while (p) {
+ if (p & 1) {
+ res *= x;
+ }
+ x *= x;
+ p >>= 1;
+ }
+ return res;
+}
+
+static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) {
+ uint32 res = 1;
+ for (uint32 i = 0; i < p; i++) {
+ res *= x;
+ }
+ return res;
+}
+
+struct Query {
+ uint32 query_id;
+ uint32 result;
+ std::vector<int> todo;
+ Query() = default;
+ Query(const Query &) = delete;
+ Query &operator=(const Query &) = delete;
+ Query(Query &&) = default;
+ Query &operator=(Query &&) = default;
+ ~Query() {
+ CHECK(todo.empty()) << "Query lost";
+ }
+ int next_pow() {
+ CHECK(!todo.empty());
+ int res = todo.back();
+ todo.pop_back();
+ return res;
+ }
+ bool ready() {
+ return todo.empty();
+ }
+};
+
+static uint32 fast_calc(Query &q) {
+ uint32 result = q.result;
+ for (auto x : q.todo) {
+ result = fast_pow_mod_uint32(result, x);
+ }
+ return result;
+}
+
+class Worker final : public Actor {
+ public:
+ explicit Worker(int threads_n) : threads_n_(threads_n) {
+ }
+ void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) {
+ uint32 result = slow_pow_mod_uint32(x, p);
+ promise.set_value(std::move(result));
+
+ (void)threads_n_;
+ // if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
+ // migrate(Random::fast(2, threads_n));
+ //}
+ }
+
+ private:
+ int threads_n_;
+};
+
+class QueryActor final : public Actor {
+ public:
+ class Callback {
+ public:
+ Callback() = default;
+ Callback(const Callback &) = delete;
+ Callback &operator=(const Callback &) = delete;
+ Callback(Callback &&) = delete;
+ Callback &operator=(Callback &&) = delete;
+ virtual ~Callback() = default;
+ virtual void on_result(Query &&query) = 0;
+ virtual void on_closed() = 0;
+ };
+
+ explicit QueryActor(int threads_n) : threads_n_(threads_n) {
+ }
+
+ void set_callback(std::unique_ptr<Callback> callback) {
+ callback_ = std::move(callback);
+ }
+ void set_workers(std::vector<ActorId<Worker>> workers) {
+ workers_ = std::move(workers);
+ }
+
+ void query(Query &&query) {
+ uint32 x = query.result;
+ uint32 p = query.next_pow();
+ if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
+ query.result = slow_pow_mod_uint32(x, p);
+ callback_->on_result(std::move(query));
+ } else {
+ auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p);
+ if (future.is_ready()) {
+ query.result = future.move_as_ok();
+ callback_->on_result(std::move(query));
+ } else {
+ future.set_event(EventCreator::raw(actor_id(), query.query_id));
+ auto query_id = query.query_id;
+ pending_.insert(std::make_pair(query_id, std::make_pair(std::move(future), std::move(query))));
+ }
+ }
+ if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
+ migrate(Random::fast(2, threads_n_));
+ }
+ }
+
+ void raw_event(const Event::Raw &event) override {
+ uint32 id = event.u32;
+ auto it = pending_.find(id);
+ auto future = std::move(it->second.first);
+ auto query = std::move(it->second.second);
+ pending_.erase(it);
+ CHECK(future.is_ready());
+ query.result = future.move_as_ok();
+ callback_->on_result(std::move(query));
+ }
+
+ void close() {
+ callback_->on_closed();
+ stop();
+ }
+
+ void on_start_migrate(int32 sched_id) override {
+ for (auto &it : pending_) {
+ start_migrate(it.second.first, sched_id);
+ }
+ }
+ void on_finish_migrate() override {
+ for (auto &it : pending_) {
+ finish_migrate(it.second.first);
+ }
+ }
+
+ private:
+ unique_ptr<Callback> callback_;
+ std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_;
+ std::vector<ActorId<Worker>> workers_;
+ int threads_n_;
+};
+
+class MainQueryActor final : public Actor {
+ class QueryActorCallback : public QueryActor::Callback {
+ public:
+ void on_result(Query &&query) override {
+ if (query.ready()) {
+ send_closure(parent_id_, &MainQueryActor::on_result, std::move(query));
+ } else {
+ send_closure(next_solver_, &QueryActor::query, std::move(query));
+ }
+ }
+ void on_closed() override {
+ send_closure(parent_id_, &MainQueryActor::on_closed);
+ }
+ QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver)
+ : parent_id_(parent_id), next_solver_(next_solver) {
+ }
+
+ private:
+ ActorId<MainQueryActor> parent_id_;
+ ActorId<QueryActor> next_solver_;
+ };
+
+ const int ACTORS_CNT = 10;
+ const int WORKERS_CNT = 4;
+
+ public:
+ explicit MainQueryActor(int threads_n) : threads_n_(threads_n) {
+ }
+
+ void start_up() override {
+ actors_.resize(ACTORS_CNT);
+ for (auto &actor : actors_) {
+ auto actor_ptr = make_unique<QueryActor>(threads_n_);
+ actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0)
+ .release();
+ }
+
+ workers_.resize(WORKERS_CNT);
+ for (auto &worker : workers_) {
+ auto actor_ptr = make_unique<Worker>(threads_n_);
+ worker =
+ register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ }
+
+ for (int i = 0; i < ACTORS_CNT; i++) {
+ ref_cnt_++;
+ send_closure(actors_[i], &QueryActor::set_callback,
+ make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
+ send_closure(actors_[i], &QueryActor::set_workers, workers_);
+ }
+ yield();
+ }
+
+ void on_result(Query &&query) {
+ CHECK(query.ready());
+ CHECK(query.result == expected_[query.query_id]);
+ in_cnt_++;
+ wakeup();
+ }
+
+ Query create_query() {
+ Query q;
+ q.query_id = (query_id_ += 2);
+ q.result = q.query_id;
+ q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000};
+ expected_[q.query_id] = fast_calc(q);
+ return q;
+ }
+
+ void on_closed() {
+ ref_cnt_--;
+ if (ref_cnt_ == 0) {
+ Scheduler::instance()->finish();
+ }
+ }
+
+ void wakeup() override {
+ int cnt = 100000;
+ while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) {
+ if (Random::fast(0, 1)) {
+ send_closure(rand_elem(actors_), &QueryActor::query, create_query());
+ } else {
+ send_closure_later(rand_elem(actors_), &QueryActor::query, create_query());
+ }
+ out_cnt_++;
+ }
+ if (in_cnt_ == cnt) {
+ in_cnt_++;
+ ref_cnt_--;
+ for (auto &actor : actors_) {
+ send_closure(actor, &QueryActor::close);
+ }
+ }
+ }
+
+ private:
+ std::map<uint32, uint32> expected_;
+ std::vector<ActorId<QueryActor>> actors_;
+ std::vector<ActorId<Worker>> workers_;
+ int out_cnt_ = 0;
+ int in_cnt_ = 0;
+ int query_id_ = 1;
+ int ref_cnt_ = 1;
+ int threads_n_;
+};
+
+class SimpleActor final : public Actor {
+ public:
+ explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) {
+ }
+ void start_up() override {
+ auto actor_ptr = make_unique<Worker>(threads_n_);
+ worker_ =
+ register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ yield();
+ }
+
+ void wakeup() override {
+ if (q_ == 100000) {
+ Scheduler::instance()->finish();
+ stop();
+ return;
+ }
+ q_++;
+ p_ = Random::fast(0, 1) ? 1 : 10000;
+ auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_);
+ if (future.is_ready()) {
+ auto result = future.move_as_ok();
+ CHECK(result == fast_pow_mod_uint32(q_, p_));
+ yield();
+ } else {
+ future.set_event(EventCreator::raw(actor_id(), nullptr));
+ future_ = std::move(future);
+ }
+ // if (threads_n_ > 1 && Random::fast(0, 2) == 0) {
+ // migrate(Random::fast(1, threads_n));
+ //}
+ }
+ void raw_event(const Event::Raw &event) override {
+ auto result = future_.move_as_ok();
+ CHECK(result == fast_pow_mod_uint32(q_, p_));
+ yield();
+ }
+
+ void on_start_migrate(int32 sched_id) override {
+ start_migrate(future_, sched_id);
+ }
+ void on_finish_migrate() override {
+ finish_migrate(future_);
+ }
+
+ private:
+ int32 threads_n_;
+ ActorId<Worker> worker_;
+ FutureActor<uint32> future_;
+ uint32 q_ = 1;
+ uint32 p_;
+};
+} // namespace
+
+class SendToDead : public Actor {
+ public:
+ class Parent : public Actor {
+ public:
+ explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
+ }
+ void start_up() override {
+ set_timeout_in(Random::fast_uint32() % 3 * 0.001);
+ if (ttl_ != 0) {
+ child_ = create_actor_on_scheduler<Parent>(
+ "Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(), ttl_ - 1);
+ }
+ }
+ void timeout_expired() override {
+ stop();
+ }
+
+ private:
+ ActorOwn<Parent> child_;
+ ActorShared<> parent_;
+ int ttl_;
+ };
+
+ void start_up() override {
+ for (int i = 0; i < 2000; i++) {
+ create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(),
+ create_reference(), 4)
+ .release();
+ }
+ }
+
+ ActorShared<> create_reference() {
+ ref_cnt_++;
+ return actor_shared();
+ }
+ void hangup_shared() override {
+ ref_cnt_--;
+ if (ref_cnt_ == 0) {
+ ttl_--;
+ if (ttl_ <= 0) {
+ Scheduler::instance()->finish();
+ stop();
+ } else {
+ start_up();
+ }
+ }
+ }
+
+ uint32 ttl_{50};
+ uint32 ref_cnt_{0};
+};
+
+TEST(Actors, send_to_dead) {
+ //TODO: fix CHECK(storage_count_.load() == 0)
+ return;
+ ConcurrentScheduler sched;
+ int threads_n = 5;
+ sched.init(threads_n);
+
+ sched.create_actor_unsafe<SendToDead>(0, "manager").release();
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
+
+TEST(Actors, main_simple) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+
+ ConcurrentScheduler sched;
+ int threads_n = 3;
+ sched.init(threads_n);
+
+ sched.create_actor_unsafe<SimpleActor>(threads_n > 1 ? 1 : 0, "simple", threads_n).release();
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
+
+TEST(Actors, main) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+
+ ConcurrentScheduler sched;
+ int threads_n = 9;
+ sched.init(threads_n);
+
+ sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "manager", threads_n).release();
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
+
+class DoAfterStop : public Actor {
+ public:
+ void loop() override {
+ ptr = std::make_unique<int>(10);
+ stop();
+ CHECK(*ptr == 10);
+ Scheduler::instance()->finish();
+ }
+
+ private:
+ std::unique_ptr<int> ptr;
+};
+
+TEST(Actors, do_after_stop) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+
+ ConcurrentScheduler sched;
+ int threads_n = 0;
+ sched.init(threads_n);
+
+ sched.create_actor_unsafe<DoAfterStop>(0, "manager").release();
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
diff --git a/libs/tdlib/td/tdactor/test/actors_simple.cpp b/libs/tdlib/td/tdactor/test/actors_simple.cpp
new file mode 100644
index 0000000000..c0a6c32b61
--- /dev/null
+++ b/libs/tdlib/td/tdactor/test/actors_simple.cpp
@@ -0,0 +1,622 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/utils/tests.h"
+
+#include "td/actor/actor.h"
+#include "td/actor/MultiPromise.h"
+#include "td/actor/PromiseFuture.h"
+#include "td/actor/SleepActor.h"
+#include "td/actor/Timeout.h"
+
+#include "td/utils/logging.h"
+#include "td/utils/Observer.h"
+#include "td/utils/port/FileFd.h"
+#include "td/utils/Slice.h"
+#include "td/utils/Status.h"
+#include "td/utils/StringBuilder.h"
+
+#include <tuple>
+
+REGISTER_TESTS(actors_simple)
+
+namespace {
+using namespace td;
+
+static const size_t BUF_SIZE = 1024 * 1024;
+static char buf[BUF_SIZE];
+static char buf2[BUF_SIZE];
+static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1));
+static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1));
+
+TEST(Actors, SendLater) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ sb.clear();
+ Scheduler scheduler;
+ scheduler.init();
+
+ auto guard = scheduler.get_guard();
+ class Worker : public Actor {
+ public:
+ void f() {
+ sb << "A";
+ }
+ };
+ auto id = create_actor<Worker>("Worker");
+ scheduler.run_no_guard(0);
+ send_closure(id, &Worker::f);
+ send_closure_later(id, &Worker::f);
+ send_closure(id, &Worker::f);
+ ASSERT_STREQ("A", sb.as_cslice().c_str());
+ scheduler.run_no_guard(0);
+ ASSERT_STREQ("AAA", sb.as_cslice().c_str());
+}
+
+class X {
+ public:
+ X() {
+ sb << "[cnstr_default]";
+ }
+ X(const X &) {
+ sb << "[cnstr_copy]";
+ }
+ X(X &&) {
+ sb << "[cnstr_move]";
+ }
+ X &operator=(const X &) {
+ sb << "[set_copy]";
+ return *this;
+ }
+ X &operator=(X &&) {
+ sb << "[set_move]";
+ return *this;
+ }
+ ~X() = default;
+};
+
+class XReceiver final : public Actor {
+ public:
+ void by_const_ref(const X &) {
+ sb << "[by_const_ref]";
+ }
+ void by_lvalue_ref(const X &) {
+ sb << "[by_lvalue_ref]";
+ }
+ void by_value(X) {
+ sb << "[by_value]";
+ }
+};
+
+TEST(Actors, simple_pass_event_arguments) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+ Scheduler scheduler;
+ scheduler.init();
+
+ auto guard = scheduler.get_guard();
+ auto id = create_actor<XReceiver>("XR").release();
+ scheduler.run_no_guard(0);
+
+ X x;
+
+ // check tuple
+ // std::tuple<X> tx;
+ // sb.clear();
+ // std::tuple<X> ty(std::move(tx));
+ // tx = std::move(ty);
+ // ASSERT_STREQ("[cnstr_move]", sb.as_cslice().c_str());
+
+ // Send temporary object
+
+ // Tmp-->ConstRef
+ sb.clear();
+ send_closure(id, &XReceiver::by_const_ref, X());
+ ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str());
+
+ // Tmp-->ConstRef (Delayed)
+ sb.clear();
+ send_closure_later(id, &XReceiver::by_const_ref, X());
+ scheduler.run_no_guard(0);
+ // LOG(ERROR) << sb.as_cslice();
+ ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str());
+
+ // Tmp-->LvalueRef
+ sb.clear();
+ send_closure(id, &XReceiver::by_lvalue_ref, X());
+ ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str());
+
+ // Tmp-->LvalueRef (Delayed)
+ sb.clear();
+ send_closure_later(id, &XReceiver::by_lvalue_ref, X());
+ scheduler.run_no_guard(0);
+ ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str());
+
+ // Tmp-->Value
+ sb.clear();
+ send_closure(id, &XReceiver::by_value, X());
+ ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str());
+
+ // Tmp-->Value (Delayed)
+ sb.clear();
+ send_closure_later(id, &XReceiver::by_value, X());
+ scheduler.run_no_guard(0);
+ ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str());
+
+ // Var-->ConstRef
+ sb.clear();
+ send_closure(id, &XReceiver::by_const_ref, x);
+ ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str());
+
+ // Var-->ConstRef (Delayed)
+ sb.clear();
+ send_closure_later(id, &XReceiver::by_const_ref, x);
+ scheduler.run_no_guard(0);
+ ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str());
+
+ // Var-->LvalueRef
+ // Var-->LvalueRef (Delayed)
+ // CE or stange behaviour
+
+ // Var-->Value
+ sb.clear();
+ send_closure(id, &XReceiver::by_value, x);
+ ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str());
+
+ // Var-->Value (Delayed)
+ sb.clear();
+ send_closure_later(id, &XReceiver::by_value, x);
+ scheduler.run_no_guard(0);
+ ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str());
+}
+
+class PrintChar final : public Actor {
+ public:
+ PrintChar(char c, int cnt) : char_(c), cnt_(cnt) {
+ }
+ void start_up() override {
+ yield();
+ }
+ void wakeup() override {
+ if (cnt_ == 0) {
+ stop();
+ } else {
+ sb << char_;
+ cnt_--;
+ yield();
+ }
+ }
+
+ private:
+ char char_;
+ int cnt_;
+};
+} // namespace
+
+//
+// Yield must add actor to the end of queue
+//
+TEST(Actors, simple_hand_yield) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+ Scheduler scheduler;
+ scheduler.init();
+ sb.clear();
+ int cnt = 1000;
+ {
+ auto guard = scheduler.get_guard();
+ create_actor<PrintChar>("PrintA", 'A', cnt).release();
+ create_actor<PrintChar>("PrintB", 'B', cnt).release();
+ create_actor<PrintChar>("PrintC", 'C', cnt).release();
+ }
+ scheduler.run(0);
+ std::string expected;
+ for (int i = 0; i < cnt; i++) {
+ expected += "ABC";
+ }
+ ASSERT_STREQ(expected.c_str(), sb.as_cslice().c_str());
+}
+
+class Ball {
+ public:
+ friend void start_migrate(Ball &ball, int32 sched_id) {
+ sb << "start";
+ }
+ friend void finish_migrate(Ball &ball) {
+ sb2 << "finish";
+ }
+};
+
+class Pong final : public Actor {
+ public:
+ void pong(Ball ball) {
+ Scheduler::instance()->finish();
+ }
+};
+
+class Ping final : public Actor {
+ public:
+ explicit Ping(ActorId<Pong> pong) : pong_(pong) {
+ }
+ void start_up() override {
+ send_closure(pong_, &Pong::pong, Ball());
+ }
+
+ private:
+ ActorId<Pong> pong_;
+};
+
+TEST(Actors, simple_migrate) {
+ sb.clear();
+ sb2.clear();
+
+ ConcurrentScheduler scheduler;
+ scheduler.init(2);
+ auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release();
+ scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
+ ASSERT_STREQ("", sb.as_cslice().c_str());
+ ASSERT_STREQ("", sb2.as_cslice().c_str());
+#else
+ ASSERT_STREQ("start", sb.as_cslice().c_str());
+ ASSERT_STREQ("finish", sb2.as_cslice().c_str());
+#endif
+}
+
+class OpenClose final : public Actor {
+ public:
+ explicit OpenClose(int cnt) : cnt_(cnt) {
+ }
+ void start_up() override {
+ yield();
+ }
+ void wakeup() override {
+ ObserverBase *observer = reinterpret_cast<ObserverBase *>(123);
+ if (cnt_ > 0) {
+ auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create);
+ CHECK(r_file_fd.is_ok()) << r_file_fd.error();
+ auto file_fd = r_file_fd.move_as_ok();
+ // LOG(ERROR) << file_fd.get_native_fd();
+ file_fd.get_fd().set_observer(observer);
+ file_fd.close();
+ cnt_--;
+ yield();
+ } else {
+ Scheduler::instance()->finish();
+ }
+ }
+
+ private:
+ int cnt_;
+};
+
+TEST(Actors, open_close) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ ConcurrentScheduler scheduler;
+ scheduler.init(2);
+ int cnt = 1000000;
+#if TD_WINDOWS || TD_ANDROID
+ // TODO(perf) optimize
+ cnt = 100;
+#endif
+ scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release();
+ scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+}
+
+namespace {
+class MsgActor : public Actor {
+ public:
+ virtual void msg() = 0;
+};
+
+class Slave : public Actor {
+ public:
+ ActorId<MsgActor> msg;
+ explicit Slave(ActorId<MsgActor> msg) : msg(msg) {
+ }
+ void hangup() override {
+ send_closure(msg, &MsgActor::msg);
+ }
+};
+
+class MasterActor : public MsgActor {
+ public:
+ void loop() override {
+ alive_ = true;
+ slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this)));
+ stop();
+ }
+ ActorOwn<Slave> slave;
+
+ MasterActor() = default;
+ MasterActor(const MasterActor &) = delete;
+ MasterActor &operator=(const MasterActor &) = delete;
+ MasterActor(MasterActor &&) = delete;
+ MasterActor &operator=(MasterActor &&) = delete;
+ ~MasterActor() override {
+ alive_ = 987654321;
+ }
+ void msg() override {
+ CHECK(alive_ == 123456789);
+ }
+ uint64 alive_ = 123456789;
+};
+} // namespace
+
+TEST(Actors, call_after_destruct) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+ Scheduler scheduler;
+ scheduler.init();
+ {
+ auto guard = scheduler.get_guard();
+ create_actor<MasterActor>("Master").release();
+ }
+ scheduler.run(0);
+}
+
+class LinkTokenSlave : public Actor {
+ public:
+ explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) {
+ }
+ void add(uint64 link_token) {
+ CHECK(link_token == get_link_token());
+ }
+ void close() {
+ stop();
+ }
+
+ private:
+ ActorShared<> parent_;
+};
+
+class LinkTokenMasterActor : public Actor {
+ public:
+ explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) {
+ }
+ void start_up() override {
+ child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release();
+ yield();
+ }
+ void loop() override {
+ for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) {
+ switch (i % 4) {
+ case 0: {
+ send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
+ break;
+ }
+ case 1: {
+ send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1);
+ break;
+ }
+ case 2: {
+ EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
+ .try_emit();
+ break;
+ }
+ case 3: {
+ EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1)
+ .try_emit_later();
+ break;
+ }
+ }
+ }
+ if (cnt_ == 0) {
+ send_closure(child_, &LinkTokenSlave::close);
+ } else {
+ yield();
+ }
+ }
+
+ void hangup_shared() override {
+ CHECK(get_link_token() == 123);
+ Scheduler::instance()->finish();
+ stop();
+ }
+
+ private:
+ int cnt_;
+ ActorId<LinkTokenSlave> child_;
+};
+
+TEST(Actors, link_token) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ ConcurrentScheduler scheduler;
+ scheduler.init(0);
+ auto cnt = 100000;
+ scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+}
+
+TEST(Actors, promise) {
+ int value = -1;
+ Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; });
+ p1.set_error(Status::Error("Test error"));
+ ASSERT_EQ(0, value);
+ Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; });
+ p2.set_error(Status::Error("Test error"));
+ ASSERT_EQ(1, value);
+}
+
+class LaterSlave : public Actor {
+ public:
+ explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) {
+ }
+
+ private:
+ ActorShared<> parent_;
+
+ void hangup() override {
+ sb << "A";
+ send_closure(actor_id(this), &LaterSlave::finish);
+ }
+ void finish() {
+ sb << "B";
+ stop();
+ }
+};
+
+class LaterMasterActor : public Actor {
+ int cnt_ = 3;
+ std::vector<ActorOwn<LaterSlave>> children_;
+ void start_up() override {
+ for (int i = 0; i < cnt_; i++) {
+ children_.push_back(create_actor<LaterSlave>("B", actor_shared()));
+ }
+ yield();
+ }
+ void loop() override {
+ children_.clear();
+ }
+ void hangup_shared() override {
+ if (!--cnt_) {
+ Scheduler::instance()->finish();
+ stop();
+ }
+ }
+};
+
+TEST(Actors, later) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ sb.clear();
+ ConcurrentScheduler scheduler;
+ scheduler.init(0);
+ scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+ ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB");
+}
+
+class MultiPromise2 : public Actor {
+ public:
+ void start_up() override {
+ auto promise = PromiseCreator::lambda([](Result<Unit> result) {
+ result.ensure();
+ Scheduler::instance()->finish();
+ });
+
+ MultiPromiseActorSafe multi_promise;
+ multi_promise.add_promise(std::move(promise));
+ for (int i = 0; i < 10; i++) {
+ create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release();
+ }
+ }
+};
+
+class MultiPromise1 : public Actor {
+ public:
+ void start_up() override {
+ auto promise = PromiseCreator::lambda([](Result<Unit> result) {
+ CHECK(result.is_error());
+ create_actor<MultiPromise2>("B").release();
+ });
+ MultiPromiseActorSafe multi_promise;
+ multi_promise.add_promise(std::move(promise));
+ }
+};
+
+TEST(Actors, MultiPromise) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ sb.clear();
+ ConcurrentScheduler scheduler;
+ scheduler.init(0);
+ scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+}
+
+class FastPromise : public Actor {
+ public:
+ void start_up() override {
+ PromiseFuture<int> pf;
+ auto promise = pf.move_promise();
+ auto future = pf.move_future();
+ promise.set_value(123);
+ CHECK(future.move_as_ok() == 123);
+ Scheduler::instance()->finish();
+ }
+};
+
+TEST(Actors, FastPromise) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ sb.clear();
+ ConcurrentScheduler scheduler;
+ scheduler.init(0);
+ scheduler.create_actor_unsafe<FastPromise>(0, "A").release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+}
+
+class StopInTeardown : public Actor {
+ void loop() override {
+ stop();
+ }
+ void tear_down() override {
+ stop();
+ Scheduler::instance()->finish();
+ }
+};
+
+TEST(Actors, stop_in_teardown) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ sb.clear();
+ ConcurrentScheduler scheduler;
+ scheduler.init(0);
+ scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+}
+
+class AlwaysWaitForMailbox : public Actor {
+ public:
+ void start_up() override {
+ always_wait_for_mailbox();
+ create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) {
+ send_closure(actor_id, &AlwaysWaitForMailbox::g);
+ send_closure(actor_id, &AlwaysWaitForMailbox::g);
+ CHECK(!ptr->was_f_);
+ }))
+ .release();
+ }
+
+ void f() {
+ was_f_ = true;
+ Scheduler::instance()->finish();
+ }
+ void g() {
+ send_closure(actor_id(this), &AlwaysWaitForMailbox::f);
+ }
+
+ private:
+ Timeout timeout_;
+ bool was_f_{false};
+};
+
+TEST(Actors, always_wait_for_mailbox) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
+ ConcurrentScheduler scheduler;
+ scheduler.init(0);
+ scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release();
+ scheduler.start();
+ while (scheduler.run_main(10)) {
+ }
+ scheduler.finish();
+}
diff --git a/libs/tdlib/td/tdactor/test/actors_workers.cpp b/libs/tdlib/td/tdactor/test/actors_workers.cpp
new file mode 100644
index 0000000000..b97a258a44
--- /dev/null
+++ b/libs/tdlib/td/tdactor/test/actors_workers.cpp
@@ -0,0 +1,156 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#include "td/utils/tests.h"
+
+#include "td/actor/actor.h"
+
+#include "td/utils/logging.h"
+
+REGISTER_TESTS(actors_workers);
+
+namespace {
+
+using namespace td;
+
+class PowerWorker final : public Actor {
+ public:
+ class Callback {
+ public:
+ Callback() = default;
+ Callback(const Callback &) = delete;
+ Callback &operator=(const Callback &) = delete;
+ Callback(Callback &&) = delete;
+ Callback &operator=(Callback &&) = delete;
+ virtual ~Callback() = default;
+ virtual void on_ready(int query, int res) = 0;
+ virtual void on_closed() = 0;
+ };
+ void set_callback(unique_ptr<Callback> callback) {
+ callback_ = std::move(callback);
+ }
+ void task(uint32 x, uint32 p) {
+ uint32 res = 1;
+ for (uint32 i = 0; i < p; i++) {
+ res *= x;
+ }
+ callback_->on_ready(x, res);
+ }
+ void close() {
+ callback_->on_closed();
+ stop();
+ }
+
+ private:
+ std::unique_ptr<Callback> callback_;
+};
+
+class Manager final : public Actor {
+ public:
+ Manager(int queries_n, int query_size, std::vector<ActorId<PowerWorker>> workers)
+ : workers_(std::move(workers)), left_query_(queries_n), query_size_(query_size) {
+ }
+
+ class Callback : public PowerWorker::Callback {
+ public:
+ Callback(ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) {
+ }
+ void on_ready(int query, int result) override {
+ send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result);
+ }
+ void on_closed() override {
+ send_closure_later(actor_id_, &Manager::on_closed, worker_id_);
+ }
+
+ private:
+ ActorId<Manager> actor_id_;
+ int worker_id_;
+ };
+
+ void start_up() override {
+ ref_cnt_ = static_cast<int>(workers_.size());
+ int i = 0;
+ for (auto &worker : workers_) {
+ ref_cnt_++;
+ send_closure_later(worker, &PowerWorker::set_callback, make_unique<Callback>(actor_id(this), i));
+ i++;
+ send_closure_later(worker, &PowerWorker::task, 3, query_size_);
+ left_query_--;
+ }
+ }
+
+ void on_ready(int worker_id, int query, int res) {
+ ref_cnt_--;
+ if (left_query_ == 0) {
+ send_closure(workers_[worker_id], &PowerWorker::close);
+ } else {
+ ref_cnt_++;
+ send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_);
+ left_query_--;
+ }
+ }
+
+ void on_closed(int worker_id) {
+ ref_cnt_--;
+ if (ref_cnt_ == 0) {
+ Scheduler::instance()->finish();
+ stop();
+ }
+ }
+
+ private:
+ std::vector<ActorId<PowerWorker>> workers_;
+ int left_query_;
+ int ref_cnt_;
+ int query_size_;
+};
+
+void test_workers(int threads_n, int workers_n, int queries_n, int query_size) {
+ SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+
+ ConcurrentScheduler sched;
+ sched.init(threads_n);
+
+ std::vector<ActorId<PowerWorker>> workers;
+ for (int i = 0; i < workers_n; i++) {
+ int thread_id = threads_n ? i % (threads_n - 1) + 2 : 0;
+ workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, "worker" + to_string(i)).release());
+ }
+ sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "manager", queries_n, query_size, std::move(workers)).release();
+
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+
+ // sched.test_one_thread_run();
+}
+} // namespace
+
+TEST(Actors, workers_big_query_one_thread) {
+ test_workers(0, 10, 1000, 300000);
+}
+
+TEST(Actors, workers_big_query_two_threads) {
+ test_workers(2, 10, 1000, 300000);
+}
+
+TEST(Actors, workers_big_query_nine_threads) {
+ test_workers(9, 10, 1000, 300000);
+}
+
+TEST(Actors, workers_small_query_one_thread) {
+ test_workers(0, 10, 1000000, 1);
+}
+
+TEST(Actors, workers_small_query_two_threads) {
+ test_workers(2, 10, 1000000, 1);
+}
+
+TEST(Actors, workers_small_query_nine_threads) {
+ test_workers(9, 10, 1000000, 1);
+}