diff options
Diffstat (limited to 'libs/tdlib/td/tdactor')
37 files changed, 7844 insertions, 0 deletions
diff --git a/libs/tdlib/td/tdactor/CMakeLists.txt b/libs/tdlib/td/tdactor/CMakeLists.txt new file mode 100644 index 0000000000..c0c83025e5 --- /dev/null +++ b/libs/tdlib/td/tdactor/CMakeLists.txt @@ -0,0 +1,65 @@ +cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR) + +#SOURCE SETS +set(TDACTOR_SOURCE + td/actor/impl/ConcurrentScheduler.cpp + td/actor/impl/Scheduler.cpp + td/actor/MultiPromise.cpp + td/actor/Timeout.cpp + + td/actor/impl2/Scheduler.cpp + + td/actor/impl/Actor-decl.h + td/actor/impl/Actor.h + td/actor/impl/ActorId-decl.h + td/actor/impl/ActorId.h + td/actor/impl/ActorInfo-decl.h + td/actor/impl/ActorInfo.h + td/actor/impl/EventFull-decl.h + td/actor/impl/EventFull.h + td/actor/impl/ConcurrentScheduler.h + td/actor/impl/Event.h + td/actor/impl/Scheduler-decl.h + td/actor/impl/Scheduler.h + td/actor/Condition.h + td/actor/MultiPromise.h + td/actor/PromiseFuture.h + td/actor/SchedulerLocalStorage.h + td/actor/SignalSlot.h + td/actor/SleepActor.h + td/actor/Timeout.h + td/actor/actor.h + + td/actor/impl2/ActorLocker.h + td/actor/impl2/ActorSignals.h + td/actor/impl2/ActorState.h + td/actor/impl2/Scheduler.h + td/actor/impl2/SchedulerId.h +) + +set(TDACTOR_TEST_SOURCE + ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_impl2.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_main.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_simple.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_workers.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/test/actors_bugs.cpp + PARENT_SCOPE +) + +#RULES + +#LIBRARIES + +add_library(tdactor STATIC ${TDACTOR_SOURCE}) +target_include_directories(tdactor PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>) +target_link_libraries(tdactor PUBLIC tdutils) + +add_executable(example example/example.cpp) +target_link_libraries(example PRIVATE tdactor) + +install(TARGETS tdactor EXPORT TdTargets + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib + RUNTIME DESTINATION bin + INCLUDES DESTINATION include +) diff --git a/libs/tdlib/td/tdactor/example/example.cpp b/libs/tdlib/td/tdactor/example/example.cpp new file mode 100644 index 0000000000..4c2415c5e2 --- /dev/null +++ b/libs/tdlib/td/tdactor/example/example.cpp @@ -0,0 +1,49 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/actor.h" + +#include "td/utils/logging.h" + +class Worker : public td::Actor { + public: + void ping(int x) { + LOG(ERROR) << "got ping " << x; + } +}; + +class MainActor : public td::Actor { + public: + void start_up() override { + LOG(ERROR) << "start up"; + set_timeout_in(10); + worker_ = td::create_actor_on_scheduler<Worker>("Worker", 1); + send_closure(worker_, &Worker::ping, 123); + } + + void timeout_expired() override { + LOG(ERROR) << "timeout expired"; + td::Scheduler::instance()->finish(); + } + + private: + td::ActorOwn<Worker> worker_; +}; + +int main(void) { + td::ConcurrentScheduler scheduler; + scheduler.init(4 /*threads_count*/); + scheduler.start(); + { + auto guard = scheduler.get_current_guard(); + td::create_actor_on_scheduler<MainActor>("Main actor", 0).release(); + } + while (!scheduler.is_finished()) { + scheduler.run_main(10); + } + scheduler.finish(); + return 0; +} diff --git a/libs/tdlib/td/tdactor/td/actor/Condition.h b/libs/tdlib/td/tdactor/td/actor/Condition.h new file mode 100644 index 0000000000..c3799df487 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/Condition.h @@ -0,0 +1,47 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/utils/logging.h" + +namespace td { +class Condition { + class Helper : public Actor { + public: + void wait(Promise<> promise) { + pending_promises_.push_back(std::move(promise)); + } + + private: + std::vector<Promise<>> pending_promises_; + void tear_down() override { + for (auto &promise : pending_promises_) { + promise.set_value(Unit()); + } + } + }; + + public: + Condition() { + own_actor_ = create_actor<Helper>("helper"); + actor_ = own_actor_.get(); + } + void wait(Promise<> promise) { + send_closure(actor_, &Helper::wait, std::move(promise)); + } + void set_true() { + CHECK(!own_actor_.empty()); + own_actor_.reset(); + } + + private: + ActorId<Helper> actor_; + ActorOwn<Helper> own_actor_; +}; +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/MultiPromise.cpp b/libs/tdlib/td/tdactor/td/actor/MultiPromise.cpp new file mode 100644 index 0000000000..0d98f5cfb4 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/MultiPromise.cpp @@ -0,0 +1,90 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/MultiPromise.h" + +namespace td { +void MultiPromiseActor::add_promise(Promise<Unit> &&promise) { + promises_.emplace_back(std::move(promise)); +} + +Promise<Unit> MultiPromiseActor::get_promise() { + if (empty()) { + register_actor("MultiPromise", this).release(); + } + CHECK(!promises_.empty()); + + PromiseActor<Unit> promise; + FutureActor<Unit> future; + init_promise_future(&promise, &future); + + future.set_event(EventCreator::raw(actor_id(), nullptr)); + futures_.emplace_back(std::move(future)); + return PromiseCreator::from_promise_actor(std::move(promise)); +} + +void MultiPromiseActor::raw_event(const Event::Raw &event) { + received_results_++; + if (received_results_ == futures_.size()) { + if (!ignore_errors_) { + for (auto &future : futures_) { + auto result = future.move_as_result(); + if (result.is_error()) { + return set_result(result.move_as_error()); + } + } + } + return set_result(Unit()); + } +} + +void MultiPromiseActor::set_ignore_errors(bool ignore_errors) { + ignore_errors_ = ignore_errors; +} + +void MultiPromiseActor::set_result(Result<Unit> &&result) { + // MultiPromiseActor should be cleared before he begins to send out result + auto promises_copy = std::move(promises_); + promises_.clear(); + auto futures_copy = std::move(futures_); + futures_.clear(); + received_results_ = 0; + stop(); + + if (!promises_copy.empty()) { + for (size_t i = 0; i + 1 < promises_copy.size(); i++) { + promises_copy[i].set_result(result.clone()); + } + promises_copy.back().set_result(std::move(result)); + } +} + +size_t MultiPromiseActor::promise_count() const { + return promises_.size(); +} + +void MultiPromiseActorSafe::add_promise(Promise<Unit> &&promise) { + multi_promise_->add_promise(std::move(promise)); +} + +Promise<Unit> MultiPromiseActorSafe::get_promise() { + return multi_promise_->get_promise(); +} + +void MultiPromiseActorSafe::set_ignore_errors(bool ignore_errors) { + multi_promise_->set_ignore_errors(ignore_errors); +} + +size_t MultiPromiseActorSafe::promise_count() const { + return multi_promise_->promise_count(); +} + +MultiPromiseActorSafe::~MultiPromiseActorSafe() { + if (!multi_promise_->empty()) { + register_existing_actor(std::move(multi_promise_)).release(); + } +} +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/MultiPromise.h b/libs/tdlib/td/tdactor/td/actor/MultiPromise.h new file mode 100644 index 0000000000..aa28947464 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/MultiPromise.h @@ -0,0 +1,116 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" +#include "td/actor/PromiseFuture.h" + +#include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/Status.h" + +namespace td { + +class MultiPromiseInterface { + public: + virtual void add_promise(Promise<> &&promise) = 0; + virtual Promise<> get_promise() = 0; + + // deprecated? + virtual size_t promise_count() const = 0; + virtual void set_ignore_errors(bool ignore_errors) = 0; + + MultiPromiseInterface() = default; + MultiPromiseInterface(const MultiPromiseInterface &) = delete; + MultiPromiseInterface &operator=(const MultiPromiseInterface &) = delete; + MultiPromiseInterface(MultiPromiseInterface &&) = default; + MultiPromiseInterface &operator=(MultiPromiseInterface &&) = default; + virtual ~MultiPromiseInterface() = default; +}; + +class MultiPromise : public MultiPromiseInterface { + public: + void add_promise(Promise<> &&promise) override { + impl_->add_promise(std::move(promise)); + } + Promise<> get_promise() override { + return impl_->get_promise(); + } + + // deprecated? + size_t promise_count() const override { + return impl_->promise_count(); + } + void set_ignore_errors(bool ignore_errors) override { + impl_->set_ignore_errors(ignore_errors); + } + + MultiPromise() = default; + explicit MultiPromise(std::unique_ptr<MultiPromiseInterface> impl) : impl_(std::move(impl)) { + } + + private: + std::unique_ptr<MultiPromiseInterface> impl_; +}; + +class MultiPromiseActor final + : public Actor + , public MultiPromiseInterface { + public: + MultiPromiseActor() = default; + + void add_promise(Promise<Unit> &&promise) override; + + Promise<Unit> get_promise() override; + + void set_ignore_errors(bool ignore_errors) override; + + size_t promise_count() const override; + + private: + void set_result(Result<Unit> &&result); + + vector<Promise<Unit>> promises_; // promises waiting for result + vector<FutureActor<Unit>> futures_; // futures waiting for result of the queries + size_t received_results_ = 0; + bool ignore_errors_ = false; + + void raw_event(const Event::Raw &event) override; + + void on_start_migrate(int32) override { + UNREACHABLE(); + } + void on_finish_migrate() override { + UNREACHABLE(); + } +}; + +class MultiPromiseActorSafe : public MultiPromiseInterface { + public: + void add_promise(Promise<Unit> &&promise) override; + Promise<Unit> get_promise() override; + void set_ignore_errors(bool ignore_errors) override; + size_t promise_count() const override; + MultiPromiseActorSafe() = default; + MultiPromiseActorSafe(const MultiPromiseActorSafe &other) = delete; + MultiPromiseActorSafe &operator=(const MultiPromiseActorSafe &other) = delete; + MultiPromiseActorSafe(MultiPromiseActorSafe &&other) = delete; + MultiPromiseActorSafe &operator=(MultiPromiseActorSafe &&other) = delete; + ~MultiPromiseActorSafe() override; + + private: + std::unique_ptr<MultiPromiseActor> multi_promise_ = std::make_unique<MultiPromiseActor>(); +}; + +class MultiPromiseCreator { + public: + static MultiPromise create() { + return MultiPromise(std::make_unique<MultiPromiseActor>()); + } +}; + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/PromiseFuture.h b/libs/tdlib/td/tdactor/td/actor/PromiseFuture.h new file mode 100644 index 0000000000..63156c3838 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/PromiseFuture.h @@ -0,0 +1,570 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/utils/Closure.h" +#include "td/utils/common.h" +#include "td/utils/invoke.h" // for tuple_for_each +#include "td/utils/logging.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/Status.h" + +#include <tuple> +#include <type_traits> +#include <utility> + +namespace td { +template <class T = Unit> +class PromiseInterface { + public: + PromiseInterface() = default; + PromiseInterface(const PromiseInterface &) = delete; + PromiseInterface &operator=(const PromiseInterface &) = delete; + PromiseInterface(PromiseInterface &&) = default; + PromiseInterface &operator=(PromiseInterface &&) = default; + virtual ~PromiseInterface() = default; + virtual void set_value(T &&value) { + set_result(std::move(value)); + } + virtual void set_error(Status &&error) { + set_result(std::move(error)); + } + virtual void set_result(Result<T> &&result) { + if (result.is_ok()) { + set_value(result.move_as_ok()); + } else { + set_error(result.move_as_error()); + } + } + virtual void start_migrate(int32 sched_id) { + } + virtual void finish_migrate() { + } +}; + +template <class T = Unit> +class FutureInterface { + public: + FutureInterface() = default; + FutureInterface(const FutureInterface &) = delete; + FutureInterface &operator=(const FutureInterface &) = delete; + FutureInterface(FutureInterface &&) = default; + FutureInterface &operator=(FutureInterface &&) = default; + virtual ~FutureInterface() = default; + virtual bool is_ready() = 0; + virtual bool is_ok() = 0; + virtual bool is_error() = 0; + virtual const T &ok() = 0; + virtual T move_as_ok() = 0; + virtual const Status &error() = 0; + virtual Status move_as_error() TD_WARN_UNUSED_RESULT = 0; + virtual const Result<T> &result() = 0; + virtual Result<T> move_as_result() TD_WARN_UNUSED_RESULT = 0; +}; + +template <class T> +class SafePromise; + +template <class T = Unit> +class Promise { + public: + void set_value(T &&value) { + if (!promise_) { + return; + } + promise_->set_value(std::move(value)); + promise_.reset(); + } + void set_error(Status &&error) { + if (!promise_) { + return; + } + promise_->set_error(std::move(error)); + promise_.reset(); + } + void set_result(Result<T> &&result) { + if (!promise_) { + return; + } + promise_->set_result(std::move(result)); + promise_.reset(); + } + void reset() { + promise_.reset(); + } + void start_migrate(int32 sched_id) { + if (!promise_) { + return; + } + promise_->start_migrate(sched_id); + } + void finish_migrate() { + if (!promise_) { + return; + } + promise_->finish_migrate(); + } + std::unique_ptr<PromiseInterface<T>> release() { + return std::move(promise_); + } + + Promise() = default; + explicit Promise(std::unique_ptr<PromiseInterface<T>> promise) : promise_(std::move(promise)) { + } + Promise(SafePromise<T> &&other); + Promise &operator=(SafePromise<T> &&other); + + explicit operator bool() { + return static_cast<bool>(promise_); + } + + private: + std::unique_ptr<PromiseInterface<T>> promise_; +}; + +template <class T> +void start_migrate(Promise<T> &promise, int32 sched_id) { + // promise.start_migrate(sched_id); +} +template <class T> +void finish_migrate(Promise<T> &promise) { + // promise.finish_migrate(); +} + +template <class T = Unit> +class SafePromise { + public: + SafePromise(Promise<T> promise, Result<T> result) : promise_(std::move(promise)), result_(std::move(result)) { + } + SafePromise(const SafePromise &other) = delete; + SafePromise &operator=(const SafePromise &other) = delete; + SafePromise(SafePromise &&other) = default; + SafePromise &operator=(SafePromise &&other) = default; + ~SafePromise() { + if (promise_) { + promise_.set_result(std::move(result_)); + } + } + Promise<T> release() { + return std::move(promise_); + } + + private: + Promise<T> promise_; + Result<T> result_; +}; + +template <class T> +Promise<T>::Promise(SafePromise<T> &&other) : Promise(other.release()) { +} +template <class T> +Promise<T> &Promise<T>::operator=(SafePromise<T> &&other) { + *this = other.release(); + return *this; +} + +namespace detail { + +class EventPromise : public PromiseInterface<Unit> { + public: + void set_value(Unit &&) override { + ok_.try_emit(); + fail_.clear(); + } + void set_error(Status &&) override { + do_set_error(); + } + + EventPromise(const EventPromise &other) = delete; + EventPromise &operator=(const EventPromise &other) = delete; + EventPromise(EventPromise &&other) = delete; + EventPromise &operator=(EventPromise &&other) = delete; + ~EventPromise() override { + do_set_error(); + } + + EventPromise() = default; + explicit EventPromise(EventFull ok) : ok_(std::move(ok)), use_ok_as_fail_(true) { + } + EventPromise(EventFull ok, EventFull fail) : ok_(std::move(ok)), fail_(std::move(fail)), use_ok_as_fail_(false) { + } + + private: + EventFull ok_; + EventFull fail_; + bool use_ok_as_fail_ = false; + void do_set_error() { + if (use_ok_as_fail_) { + ok_.try_emit(); + } else { + ok_.clear(); + fail_.try_emit(); + } + } +}; + +template <typename T> +struct GetArg : public GetArg<decltype(&T::operator())> {}; + +template <class C, class R, class Arg> +class GetArg<R (C::*)(Arg)> { + public: + using type = Arg; +}; +template <class C, class R, class Arg> +class GetArg<R (C::*)(Arg) const> { + public: + using type = Arg; +}; + +template <class T> +using get_arg_t = std::decay_t<typename GetArg<T>::type>; + +template <class T> +struct DropResult { + using type = T; +}; + +template <class T> +struct DropResult<Result<T>> { + using type = T; +}; + +template <class T> +using drop_result_t = typename DropResult<T>::type; + +template <class ValueT, class FunctionOkT, class FunctionFailT> +class LambdaPromise : public PromiseInterface<ValueT> { + enum OnFail { None, Ok, Fail }; + + public: + void set_value(ValueT &&value) override { + ok_(std::move(value)); + on_fail_ = None; + } + void set_error(Status &&error) override { + do_error(std::move(error)); + } + LambdaPromise(const LambdaPromise &other) = delete; + LambdaPromise &operator=(const LambdaPromise &other) = delete; + LambdaPromise(LambdaPromise &&other) = delete; + LambdaPromise &operator=(LambdaPromise &&other) = delete; + ~LambdaPromise() override { + do_error(Status::Error("Lost promise")); + } + + template <class FromOkT, class FromFailT> + LambdaPromise(FromOkT &&ok, FromFailT &&fail, bool use_ok_as_fail) + : ok_(std::forward<FromOkT>(ok)), fail_(std::forward<FromFailT>(fail)), on_fail_(use_ok_as_fail ? Ok : Fail) { + } + + private: + FunctionOkT ok_; + FunctionFailT fail_; + OnFail on_fail_ = None; + + template <class FuncT, class ArgT = detail::get_arg_t<FuncT>> + std::enable_if_t<std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) { + func(std::move(status)); + } + + template <class FuncT, class ArgT = detail::get_arg_t<FuncT>> + std::enable_if_t<!std::is_assignable<ArgT, Status>::value> do_error_impl(FuncT &func, Status &&status) { + func(Auto()); + } + + void do_error(Status &&error) { + switch (on_fail_) { + case None: + break; + case Ok: + do_error_impl(ok_, std::move(error)); + break; + case Fail: + fail_(std::move(error)); + break; + } + on_fail_ = None; + } +}; + +template <class... ArgsT> +class JoinPromise : public PromiseInterface<Unit> { + public: + explicit JoinPromise(ArgsT &&... arg) : promises_(std::forward<ArgsT>(arg)...) { + } + void set_value(Unit &&) override { + tuple_for_each(promises_, [](auto &promise) { promise.set_value(Unit()); }); + } + void set_error(Status &&error) override { + tuple_for_each(promises_, [&error](auto &promise) { promise.set_error(error.clone()); }); + } + + private: + std::tuple<std::decay_t<ArgsT>...> promises_; +}; +} // namespace detail + +/*** FutureActor and PromiseActor ***/ +template <class T> +class FutureActor; + +template <class T> +class PromiseActor; + +template <class T> +class ActorTraits<FutureActor<T>> { + public: + static constexpr bool is_lite = true; +}; + +template <class T> +class PromiseActor final : public PromiseInterface<T> { + friend class FutureActor<T>; + enum State { Waiting, Hangup }; + + public: + PromiseActor() = default; + PromiseActor(const PromiseActor &other) = delete; + PromiseActor &operator=(const PromiseActor &other) = delete; + PromiseActor(PromiseActor &&) = default; + PromiseActor &operator=(PromiseActor &&) = default; + ~PromiseActor() override { + close(); + } + + void set_value(T &&value) override; + void set_error(Status &&error) override; + + void close() { + future_id_.reset(); + } + + // NB: if true is returned no further events will be sent + bool is_hangup() { + if (state_ == State::Hangup) { + return true; + } + if (!future_id_.is_alive()) { + state_ = State::Hangup; + future_id_.release(); + event_.clear(); + return true; + } + return false; + } + + template <class S> + friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future); + + bool empty_promise() { + return future_id_.empty(); + } + bool empty() { + return future_id_.empty(); + } + + private: + ActorOwn<FutureActor<T>> future_id_; + EventFull event_; + State state_; + + void init() { + state_ = State::Waiting; + event_.clear(); + } +}; + +template <class T> +class FutureActor final : public Actor { + friend class PromiseActor<T>; + enum State { Waiting, Ready }; + + public: + FutureActor() = default; + + FutureActor(const FutureActor &other) = delete; + FutureActor &operator=(const FutureActor &other) = delete; + + FutureActor(FutureActor &&other) = default; + FutureActor &operator=(FutureActor &&other) = default; + + ~FutureActor() override = default; + + bool is_ok() const { + return is_ready() && result_.is_ok(); + } + bool is_error() const { + CHECK(is_ready()); + return is_ready() && result_.is_error(); + } + T move_as_ok() { + return move_as_result().move_as_ok(); + } + Status move_as_error() TD_WARN_UNUSED_RESULT { + return move_as_result().move_as_error(); + } + Result<T> move_as_result() TD_WARN_UNUSED_RESULT { + CHECK(is_ready()); + SCOPE_EXIT { + do_stop(); + }; + return std::move(result_); + } + bool is_ready() const { + return !empty() && state_ == State::Ready; + } + + void close() { + event_.clear(); + result_.clear(); + do_stop(); + } + + void set_event(EventFull &&event) { + CHECK(!empty()); + event_ = std::move(event); + if (state_ != State::Waiting) { + event_.try_emit_later(); + } + } + + template <class S> + friend void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future); + + private: + EventFull event_; + Result<T> result_; + State state_; + + void set_value(T &&value) { + set_result(std::move(value)); + } + + void set_error(Status &&error) { + set_result(std::move(error)); + } + + void set_result(Result<T> &&result) { + CHECK(state_ == State::Waiting); + result_ = std::move(result); + state_ = State::Ready; + + event_.try_emit_later(); + } + + void hangup() override { + set_error(Status::Hangup()); + } + + void start_up() override { + // empty + } + + void init() { + CHECK(empty()); + state_ = State::Waiting; + event_.clear(); + } +}; + +template <class T> +void PromiseActor<T>::set_value(T &&value) { + if (state_ == State::Waiting && !future_id_.empty()) { + send_closure(std::move(future_id_), &FutureActor<T>::set_value, std::move(value)); + } +} +template <class T> +void PromiseActor<T>::set_error(Status &&error) { + if (state_ == State::Waiting && !future_id_.empty()) { + send_closure(std::move(future_id_), &FutureActor<T>::set_error, std::move(error)); + } +} + +template <class S> +void init_promise_future(PromiseActor<S> *promise, FutureActor<S> *future) { + promise->init(); + future->init(); + promise->future_id_ = register_actor("FutureActor", future); + + CHECK(future->get_info() != nullptr); +} + +template <class T> +class PromiseFuture { + public: + PromiseFuture() { + init_promise_future(&promise_, &future_); + } + PromiseActor<T> &promise() { + return promise_; + } + FutureActor<T> &future() { + return future_; + } + PromiseActor<T> &&move_promise() { + return std::move(promise_); + } + FutureActor<T> &&move_future() { + return std::move(future_); + } + + private: + PromiseActor<T> promise_; + FutureActor<T> future_; +}; + +template <class T, class ActorAT, class ActorBT, class ResultT, class... DestArgsT, class... ArgsT> +FutureActor<T> send_promise(ActorId<ActorAT> actor_id, Send::Flags flags, + ResultT (ActorBT::*func)(PromiseActor<T> &&, DestArgsT...), ArgsT &&... args) { + PromiseFuture<T> pf; + ::td::Scheduler::instance()->send_closure( + std::move(actor_id), create_immediate_closure(func, pf.move_promise(), std::forward<ArgsT>(args)...), flags); + return pf.move_future(); +} + +class PromiseCreator { + public: + struct Ignore { + void operator()(Status &&error) { + error.ignore(); + } + }; + + template <class OkT, class ArgT = detail::drop_result_t<detail::get_arg_t<OkT>>> + static Promise<ArgT> lambda(OkT &&ok) { + return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, Ignore>>(std::forward<OkT>(ok), + Ignore(), true)); + } + + template <class OkT, class FailT, class ArgT = detail::get_arg_t<OkT>> + static Promise<ArgT> lambda(OkT &&ok, FailT &&fail) { + return Promise<ArgT>(std::make_unique<detail::LambdaPromise<ArgT, std::decay_t<OkT>, std::decay_t<FailT>>>( + std::forward<OkT>(ok), std::forward<FailT>(fail), false)); + } + + static Promise<> event(EventFull &&ok) { + return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok))); + } + + static Promise<> event(EventFull ok, EventFull fail) { + return Promise<>(std::make_unique<detail::EventPromise>(std::move(ok), std::move(fail))); + } + + template <class... ArgsT> + static Promise<> join(ArgsT &&... args) { + return Promise<>(std::make_unique<detail::JoinPromise<ArgsT...>>(std::forward<ArgsT>(args)...)); + } + + template <class T> + static Promise<T> from_promise_actor(PromiseActor<T> &&from) { + return Promise<T>(std::make_unique<PromiseActor<T>>(std::move(from))); + } +}; +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h b/libs/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h new file mode 100644 index 0000000000..f505836a16 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/SchedulerLocalStorage.h @@ -0,0 +1,70 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/utils/logging.h" +#include "td/utils/optional.h" + +#include <functional> + +namespace td { +template <class T> +class SchedulerLocalStorage { + public: + SchedulerLocalStorage() : data_(Scheduler::instance()->sched_count()) { + } + T &get() { + return data_[Scheduler::instance()->sched_id()]; + } + template <class F> + void for_each(F &&f) { + for (auto &value : data_) { + f(value); + } + } + template <class F> + void for_each(F &&f) const { + for (const auto &value : data_) { + f(value); + } + } + + private: + std::vector<T> data_; +}; + +template <class T> +class LazySchedulerLocalStorage { + public: + LazySchedulerLocalStorage() = default; + explicit LazySchedulerLocalStorage(std::function<T()> create_func) : create_func_(std::move(create_func)) { + } + + T &get() { + auto &optional_value_ = sls_optional_value_.get(); + if (!optional_value_) { + CHECK(create_func_); + optional_value_ = create_func_(); + } + return *optional_value_; + } + void clear_values() { + sls_optional_value_.for_each([&](auto &optional_value) { + if (optional_value) { + optional_value = optional<T>(); + } + }); + } + + private: + std::function<T()> create_func_; + SchedulerLocalStorage<optional<T>> sls_optional_value_; +}; + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/SignalSlot.h b/libs/tdlib/td/tdactor/td/actor/SignalSlot.h new file mode 100644 index 0000000000..73b48f58ed --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/SignalSlot.h @@ -0,0 +1,108 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +namespace td { +class Slot; +class Signal { + public: + void emit(); + + explicit Signal(ActorId<Slot> slot_id) : slot_id_(slot_id) { + } + + private: + ActorId<Slot> slot_id_; +}; +class Slot final : public Actor { + public: + Slot() = default; + Slot(const Slot &other) = delete; + Slot &operator=(const Slot &other) = delete; + Slot(Slot &&) = default; + Slot &operator=(Slot &&) = default; + ~Slot() override { + close(); + } + void set_event(EventFull &&event) { + was_signal_ = false; + event_ = std::move(event); + } + + bool has_event() { + return !event_.empty(); + } + + bool was_signal() { + return was_signal_; + } + + void clear_event() { + event_.clear(); + } + + void close() { + if (!empty()) { + do_stop(); + } + } + + void set_timeout_in(double timeout_in) { + register_if_empty(); + Actor::set_timeout_in(timeout_in); + } + void set_timeout_at(double timeout_at) { + register_if_empty(); + Actor::set_timeout_at(timeout_at); + } + + friend class Signal; + Signal get_signal() { + register_if_empty(); + return Signal(actor_id(this)); + } + ActorShared<> get_signal_new() { + register_if_empty(); + return actor_shared(); + } + + private: + bool was_signal_ = false; + EventFull event_; + + void timeout_expired() override { + signal(); + } + + void start_up() override { + empty(); + } + + void register_if_empty() { + if (empty()) { + register_actor("Slot", this).release(); + } + } + + // send event only once + void signal() { + if (!was_signal_) { + was_signal_ = true; + event_.try_emit_later(); + } + } + void hangup_shared() override { + signal(); + } +}; +inline void Signal::emit() { + send_closure(slot_id_, &Slot::signal); +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/SleepActor.h b/libs/tdlib/td/tdactor/td/actor/SleepActor.h new file mode 100644 index 0000000000..9b9981ec38 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/SleepActor.h @@ -0,0 +1,33 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/actor/PromiseFuture.h" + +namespace td { + +class SleepActor : public Actor { + public: + SleepActor(double timeout, Promise<> promise) : timeout_(timeout), promise_(std::move(promise)) { + } + + private: + double timeout_; + Promise<> promise_; + + void start_up() override { + set_timeout_in(timeout_); + } + void timeout_expired() override { + promise_.set_value(Unit()); + stop(); + } +}; + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/Timeout.cpp b/libs/tdlib/td/tdactor/td/actor/Timeout.cpp new file mode 100644 index 0000000000..fa2e5ffff3 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/Timeout.cpp @@ -0,0 +1,96 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/Timeout.h" + +#include "td/utils/Time.h" + +namespace td { + +bool MultiTimeout::has_timeout(int64 key) const { + return items_.find(Item(key)) != items_.end(); +} + +void MultiTimeout::set_timeout_at(int64 key, double timeout) { + LOG(DEBUG) << "Set timeout for " << key << " in " << timeout - Time::now(); + auto item = items_.emplace(key); + auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first)); + if (heap_node->in_heap()) { + CHECK(!item.second); + bool need_update_timeout = heap_node->is_top(); + timeout_queue_.fix(timeout, heap_node); + if (need_update_timeout || heap_node->is_top()) { + update_timeout(); + } + } else { + CHECK(item.second); + timeout_queue_.insert(timeout, heap_node); + if (heap_node->is_top()) { + update_timeout(); + } + } +} + +void MultiTimeout::add_timeout_at(int64 key, double timeout) { + LOG(DEBUG) << "Add timeout for " << key << " in " << timeout - Time::now(); + auto item = items_.emplace(key); + auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item.first)); + if (heap_node->in_heap()) { + CHECK(!item.second); + } else { + CHECK(item.second); + timeout_queue_.insert(timeout, heap_node); + if (heap_node->is_top()) { + update_timeout(); + } + } +} + +void MultiTimeout::cancel_timeout(int64 key) { + LOG(DEBUG) << "Cancel timeout for " << key; + auto item = items_.find(Item(key)); + if (item != items_.end()) { + auto heap_node = static_cast<HeapNode *>(const_cast<Item *>(&*item)); + CHECK(heap_node->in_heap()); + bool need_update_timeout = heap_node->is_top(); + timeout_queue_.erase(heap_node); + items_.erase(item); + + if (need_update_timeout) { + update_timeout(); + } + } +} + +void MultiTimeout::update_timeout() { + if (items_.empty()) { + LOG(DEBUG) << "Cancel timeout"; + CHECK(timeout_queue_.empty()); + CHECK(Actor::has_timeout()); + Actor::cancel_timeout(); + } else { + LOG(DEBUG) << "Set timeout in " << timeout_queue_.top_key() - Time::now_cached(); + Actor::set_timeout_at(timeout_queue_.top_key()); + } +} + +void MultiTimeout::timeout_expired() { + double now = Time::now_cached(); + while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { + int64 key = static_cast<Item *>(timeout_queue_.pop())->key; + items_.erase(Item(key)); + expired_.push_back(key); + } + if (!items_.empty()) { + update_timeout(); + } + for (auto key : expired_) { + callback_(data_, key); + } + expired_.clear(); +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/Timeout.h b/libs/tdlib/td/tdactor/td/actor/Timeout.h new file mode 100644 index 0000000000..a3a9ba1913 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/Timeout.h @@ -0,0 +1,127 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/utils/Heap.h" +#include "td/utils/logging.h" +#include "td/utils/Time.h" + +#include <set> + +namespace td { +class Timeout final : public Actor { + public: + using Data = void *; + using Callback = void (*)(Data); + Timeout() { + register_actor("Timeout", this).release(); + } + + void set_callback(Callback callback) { + callback_ = callback; + } + void set_callback_data(Data &&data) { + data_ = data; + } + + bool has_timeout() const { + return Actor::has_timeout(); + } + void set_timeout_in(double timeout) { + Actor::set_timeout_in(timeout); + } + void cancel_timeout() { + if (has_timeout()) { + Actor::cancel_timeout(); + callback_ = Callback(); + data_ = Data(); + } + } + + private: + friend class Scheduler; + + Callback callback_; + Data data_; + + void set_timeout_at(double timeout) { + Actor::set_timeout_at(timeout); + } + + void timeout_expired() override { + CHECK(!has_timeout()); + CHECK(callback_ != Callback()); + Callback callback = callback_; + Data data = data_; + callback_ = Callback(); + data_ = Data(); + + callback(data); + } +}; + +// TODO optimize +class MultiTimeout final : public Actor { + struct Item : public HeapNode { + int64 key; + + explicit Item(int64 key) : key(key) { + } + + bool operator<(const Item &other) const { + return key < other.key; + } + }; + + public: + using Data = void *; + using Callback = void (*)(Data, int64); + MultiTimeout() { + register_actor("MultiTimeout", this).release(); + } + + void set_callback(Callback callback) { + callback_ = callback; + } + void set_callback_data(Data data) { + data_ = data; + } + + bool has_timeout(int64 key) const; + + void set_timeout_in(int64 key, double timeout) { + set_timeout_at(key, Time::now() + timeout); + } + + void add_timeout_in(int64 key, double timeout) { + add_timeout_at(key, Time::now() + timeout); + } + + void set_timeout_at(int64 key, double timeout); + + void add_timeout_at(int64 key, double timeout); // memcache semantics, doesn't replace old timeout + + void cancel_timeout(int64 key); + + private: + friend class Scheduler; + + Callback callback_; + Data data_; + + KHeap<double> timeout_queue_; + std::set<Item> items_; + std::vector<int64> expired_; + + void update_timeout(); + + void timeout_expired() override; +}; + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/actor.h b/libs/tdlib/td/tdactor/td/actor/actor.h new file mode 100644 index 0000000000..dadfadc055 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/actor.h @@ -0,0 +1,14 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/Actor.h" +#include "td/actor/impl/ActorId.h" +#include "td/actor/impl/ActorInfo.h" +#include "td/actor/impl/ConcurrentScheduler.h" +#include "td/actor/impl/EventFull.h" +#include "td/actor/impl/Scheduler.h" diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Actor-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/Actor-decl.h new file mode 100644 index 0000000000..4342214800 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/Actor-decl.h @@ -0,0 +1,120 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/ActorId-decl.h" +#include "td/actor/impl/ActorInfo-decl.h" +#include "td/actor/impl/Event.h" + +#include "td/utils/ObjectPool.h" +#include "td/utils/Observer.h" +#include "td/utils/Slice.h" + +#include <memory> + +namespace td { + +class Actor : public ObserverBase { + public: + using Deleter = ActorInfo::Deleter; + Actor() = default; + Actor(const Actor &) = delete; + Actor &operator=(const Actor &) = delete; + Actor(Actor &&other); + Actor &operator=(Actor &&other); + ~Actor() override { + if (!empty()) { + do_stop(); + } + } + + virtual void start_up() { + yield(); + } + virtual void tear_down() { + } + virtual void wakeup() { + loop(); + } + virtual void hangup() { + stop(); + } + virtual void hangup_shared() { + // ignore + } + virtual void timeout_expired() { + loop(); + } + virtual void raw_event(const Event::Raw &event) { + } + virtual void loop() { + } + + // TODO: not called in events. Can't use stop, or migrate inside of them + virtual void on_start_migrate(int32 sched_id) { + } + virtual void on_finish_migrate() { + } + + void notify() override; + + // proxy to scheduler + void yield(); + void stop(); + void do_stop(); + bool has_timeout() const; + void set_timeout_in(double timeout_in); + void set_timeout_at(double timeout_at); + void cancel_timeout(); + void migrate(int32 sched_id); + void do_migrate(int32 sched_id); + + uint64 get_link_token(); + void set_context(std::shared_ptr<ActorContext> context); + void set_tag(CSlice tag); + + void always_wait_for_mailbox(); + + // for ActorInfo mostly + void init(ObjectPool<ActorInfo>::OwnerPtr &&info); + ActorInfo *get_info(); + const ActorInfo *get_info() const; + ObjectPool<ActorInfo>::OwnerPtr clear(); + + bool empty() const; + + template <class FuncT, class... ArgsT> + auto self_closure(FuncT &&func, ArgsT &&... args); + + template <class SelfT, class FuncT, class... ArgsT> + auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args); + + template <class LambdaT> + auto self_lambda(LambdaT &&lambda); + + // proxy to info_ + ActorId<> actor_id(); + template <class SelfT> + ActorId<SelfT> actor_id(SelfT *self); + + ActorShared<> actor_shared(); + template <class SelfT> + ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1)); + + Slice get_name() const; + + private: + ObjectPool<ActorInfo>::OwnerPtr info_; +}; + +template <class ActorT> +class ActorTraits { + public: + static constexpr bool is_lite = false; +}; + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Actor.h b/libs/tdlib/td/tdactor/td/actor/impl/Actor.h new file mode 100644 index 0000000000..3fe5e20abf --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/Actor.h @@ -0,0 +1,153 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/Actor-decl.h" +#include "td/actor/impl/EventFull-decl.h" +#include "td/actor/impl/Scheduler-decl.h" + +#include "td/utils/logging.h" +#include "td/utils/ObjectPool.h" +#include "td/utils/Slice.h" + +#include <memory> +#include <type_traits> +#include <utility> + +namespace td { +inline Actor::Actor(Actor &&other) { + CHECK(info_.empty()); + info_ = std::move(other.info_); + if (!empty()) { + info_->on_actor_moved(this); + } +} +inline Actor &Actor::operator=(Actor &&other) { + CHECK(info_.empty()); + info_ = std::move(other.info_); + if (!empty()) { + info_->on_actor_moved(this); + } + return *this; +} + +inline void Actor::notify() { + yield(); +} + +// proxy to scheduler +inline void Actor::yield() { + Scheduler::instance()->yield_actor(this); +} +inline void Actor::stop() { + Scheduler::instance()->stop_actor(this); +} +inline void Actor::do_stop() { + Scheduler::instance()->do_stop_actor(this); + CHECK(empty()); +} +inline bool Actor::has_timeout() const { + return Scheduler::instance()->has_actor_timeout(this); +} +inline void Actor::set_timeout_in(double timeout_in) { + Scheduler::instance()->set_actor_timeout_in(this, timeout_in); +} +inline void Actor::set_timeout_at(double timeout_at) { + Scheduler::instance()->set_actor_timeout_at(this, timeout_at); +} +inline void Actor::cancel_timeout() { + Scheduler::instance()->cancel_actor_timeout(this); +} +inline void Actor::migrate(int32 sched_id) { + Scheduler::instance()->migrate_actor(this, sched_id); +} +inline void Actor::do_migrate(int32 sched_id) { + Scheduler::instance()->do_migrate_actor(this, sched_id); +} + +template <class ActorType> +std::enable_if_t<std::is_base_of<Actor, ActorType>::value> start_migrate(ActorType &obj, int32 sched_id) { + if (!obj.empty()) { + Scheduler::instance()->start_migrate_actor(&obj, sched_id); + } +} +template <class ActorType> +std::enable_if_t<std::is_base_of<Actor, ActorType>::value> finish_migrate(ActorType &obj) { + if (!obj.empty()) { + Scheduler::instance()->finish_migrate_actor(&obj); + } +} +inline uint64 Actor::get_link_token() { + return Scheduler::instance()->get_link_token(this); +} +inline void Actor::set_context(std::shared_ptr<ActorContext> context) { + info_->set_context(std::move(context)); +} +inline void Actor::set_tag(CSlice tag) { + info_->get_context()->tag_ = tag.c_str(); + Scheduler::on_context_updated(); +} + +inline void Actor::init(ObjectPool<ActorInfo>::OwnerPtr &&info) { + info_ = std::move(info); +} +inline ActorInfo *Actor::get_info() { + return &*info_; +} +inline const ActorInfo *Actor::get_info() const { + return &*info_; +} +inline ObjectPool<ActorInfo>::OwnerPtr Actor::clear() { + return std::move(info_); +} + +inline bool Actor::empty() const { + return info_.empty(); +} + +inline ActorId<> Actor::actor_id() { + return actor_id(this); +} +template <class SelfT> +ActorId<SelfT> Actor::actor_id(SelfT *self) { + CHECK(static_cast<Actor *>(self) == this); + return ActorId<SelfT>(info_.get_weak()); +} + +inline ActorShared<> Actor::actor_shared() { + return actor_shared(this); +} +template <class SelfT> +ActorShared<SelfT> Actor::actor_shared(SelfT *self, uint64 id) { + CHECK(static_cast<Actor *>(self) == this); + return ActorShared<SelfT>(actor_id(self), id); +} + +template <class FuncT, class... ArgsT> +auto Actor::self_closure(FuncT &&func, ArgsT &&... args) { + return self_closure(this, std::forward<FuncT>(func), std::forward<ArgsT>(args)...); +} + +template <class SelfT, class FuncT, class... ArgsT> +auto Actor::self_closure(SelfT *self, FuncT &&func, ArgsT &&... args) { + return EventCreator::closure(actor_id(self), std::forward<FuncT>(func), std::forward<ArgsT>(args)...); +} + +template <class LambdaT> +auto Actor::self_lambda(LambdaT &&lambda) { + return EventCreator::lambda(actor_id(), std::forward<LambdaT>(lambda)); +} + +inline Slice Actor::get_name() const { + return info_->get_name(); +} + +inline void Actor::always_wait_for_mailbox() { + info_->always_wait_for_mailbox(); +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h new file mode 100644 index 0000000000..5e82ed6a05 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorId-decl.h @@ -0,0 +1,169 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/ObjectPool.h" +#include "td/utils/Slice.h" + +#include <type_traits> + +namespace td { +class ActorInfo; +class Actor; +template <class ActorType = Actor> +class ActorId { + public: + using ActorT = ActorType; + explicit ActorId(ObjectPool<ActorInfo>::WeakPtr ptr) : ptr_(ptr) { + } + ActorId() = default; + ActorId(const ActorId &) = default; + ActorId &operator=(const ActorId &) = default; + ActorId(ActorId &&other) : ptr_(other.ptr_) { + other.ptr_.clear(); + } + ActorId &operator=(ActorId &&other) { + if (&other == this) { + return *this; + } + ptr_ = other.ptr_; + other.clear(); + return *this; + } + ~ActorId() = default; + + bool empty() const { + return ptr_.empty(); + } + void clear() { + ptr_.clear(); + } + + bool is_alive() const { + return ptr_.is_alive_unsafe(); + } + + ActorInfo *get_actor_info() const; + ActorType *get_actor_unsafe() const; + + // returns pointer to actor if it is on current thread. nullptr otherwise + ActorType *try_get_actor() const; + + Slice get_name() const; + + template <class ToActorType, class = std::enable_if_t<std::is_base_of<ToActorType, ActorType>::value>> + explicit operator ActorId<ToActorType>() const { + return ActorId<ToActorType>(ptr_); + } + + template <class AsActorType> + ActorId<AsActorType> as() const { + return ActorId<AsActorType>(ptr_); + } + + private: + ObjectPool<ActorInfo>::WeakPtr ptr_; +}; + +// threat ActorId as pointer and ActorOwn as +// unique_ptr<ActorId> +template <class ActorType = Actor> +class ActorOwn { + public: + using ActorT = ActorType; + ActorOwn() = default; + explicit ActorOwn(ActorId<ActorType>); + template <class OtherActorType> + explicit ActorOwn(ActorId<OtherActorType> id); + template <class OtherActorType> + explicit ActorOwn(ActorOwn<OtherActorType> &&); + template <class OtherActorType> + ActorOwn &operator=(ActorOwn<OtherActorType> &&); + ActorOwn(ActorOwn &&); + ActorOwn &operator=(ActorOwn &&); + ActorOwn(const ActorOwn &) = delete; + ActorOwn &operator=(const ActorOwn &) = delete; + ~ActorOwn(); + + bool empty() const; + bool is_alive() const { + return id_.is_alive(); + } + ActorId<ActorType> get() const; + ActorId<ActorType> release(); + void reset(ActorId<ActorType> other = ActorId<ActorType>()); + void hangup() const; + const ActorId<ActorType> *operator->() const; + + using ActorIdConstRef = const ActorId<ActorType> &; + // operator ActorIdConstRef(); + + private: + ActorId<ActorType> id_; +}; + +template <class ActorType = Actor> +class ActorShared { + public: + using ActorT = ActorType; + ActorShared() = default; + template <class OtherActorType> + ActorShared(ActorId<OtherActorType>, uint64 token); + template <class OtherActorType> + ActorShared(ActorShared<OtherActorType> &&); + template <class OtherActorType> + ActorShared(ActorOwn<OtherActorType> &&); + template <class OtherActorType> + ActorShared &operator=(ActorShared<OtherActorType> &&); + ActorShared(ActorShared &&); + ActorShared &operator=(ActorShared &&); + ActorShared(const ActorShared &) = delete; + ActorShared &operator=(const ActorShared &) = delete; + ~ActorShared(); + + uint64 token() const; + bool empty() const; + bool is_alive() const { + return id_.is_alive(); + } + ActorId<ActorType> get() const; + ActorId<ActorType> release(); + void reset(ActorId<ActorType> other = ActorId<ActorType>()); + template <class OtherActorType> + void reset(ActorId<OtherActorType> other); + const ActorId<ActorType> *operator->() const; + + private: + ActorId<ActorType> id_; + uint64 token_; +}; + +class ActorRef { + public: + ActorRef() = default; + template <class T> + ActorRef(const ActorId<T> &actor_id); + template <class T> + ActorRef(const ActorShared<T> &actor_id); + template <class T> + ActorRef(ActorShared<T> &&actor_id); + template <class T> + ActorRef(const ActorOwn<T> &actor_id); + template <class T> + ActorRef(ActorOwn<T> &&actor_id); + ActorId<> get() const { + return actor_id_; + } + uint64 token() const { + return token_; + } + + private: + ActorId<> actor_id_; + uint64 token_ = 0; +}; +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorId.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorId.h new file mode 100644 index 0000000000..34d7970633 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorId.h @@ -0,0 +1,200 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/ActorId-decl.h" +#include "td/actor/impl/ActorInfo-decl.h" +#include "td/actor/impl/Scheduler-decl.h" + +#include "td/utils/Slice.h" + +namespace td { +/*** ActorId ***/ + +// If actor is on our scheduler(thread) result will be valid +// If actor is on another scheduler we will see it in migrate_dest_flags +template <class ActorType> +ActorInfo *ActorId<ActorType>::get_actor_info() const { + if (ptr_.is_alive()) { + return &*ptr_; + } + return nullptr; +} + +template <class ActorType> +ActorType *ActorId<ActorType>::get_actor_unsafe() const { + return static_cast<ActorType *>(ptr_->get_actor_unsafe()); +} + +template <class ActorType> +ActorType *ActorId<ActorType>::try_get_actor() const { + auto info = get_actor_info(); + if (info && !info->is_migrating() && Scheduler::instance()->sched_id() == info->migrate_dest()) { + return static_cast<ActorType *>(info->get_actor_unsafe()); + } + return nullptr; +} + +template <class ActorType> +Slice ActorId<ActorType>::get_name() const { + return ptr_->get_name(); +} + +// ActorOwn +template <class ActorType> +ActorOwn<ActorType>::ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) { +} +template <class ActorType> +template <class OtherActorType> +ActorOwn<ActorType>::ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) { +} +template <class ActorType> +template <class OtherActorType> +ActorOwn<ActorType>::ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) { +} +template <class ActorType> +template <class OtherActorType> +ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn<OtherActorType> &&other) { + reset(static_cast<ActorId<ActorType>>(other.release())); + return *this; +} + +template <class ActorType> +ActorOwn<ActorType>::ActorOwn(ActorOwn &&other) : id_(other.release()) { +} +template <class ActorType> +ActorOwn<ActorType> &ActorOwn<ActorType>::operator=(ActorOwn &&other) { + reset(other.release()); + return *this; +} + +template <class ActorType> +ActorOwn<ActorType>::~ActorOwn() { + reset(); +} + +template <class ActorType> +bool ActorOwn<ActorType>::empty() const { + return id_.empty(); +} +template <class ActorType> +ActorId<ActorType> ActorOwn<ActorType>::get() const { + return id_; +} + +template <class ActorType> +ActorId<ActorType> ActorOwn<ActorType>::release() { + return std::move(id_); +} +template <class ActorType> +void ActorOwn<ActorType>::reset(ActorId<ActorType> other) { + static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type"); + hangup(); + id_ = std::move(other); +} + +template <class ActorType> +void ActorOwn<ActorType>::hangup() const { + if (!id_.empty()) { + send_event(id_, Event::hangup()); + } +} +template <class ActorType> +const ActorId<ActorType> *ActorOwn<ActorType>::operator->() const { + return &id_; +} + +// ActorShared +template <class ActorType> +template <class OtherActorType> +ActorShared<ActorType>::ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) { +} +template <class ActorType> +template <class OtherActorType> +ActorShared<ActorType>::ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) { +} +template <class ActorType> +template <class OtherActorType> +ActorShared<ActorType>::ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(0) { +} +template <class ActorType> +template <class OtherActorType> +ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared<OtherActorType> &&other) { + reset(other.release()); + token_ = other.token(); + return *this; +} + +template <class ActorType> +ActorShared<ActorType>::ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token_) { +} +template <class ActorType> +ActorShared<ActorType> &ActorShared<ActorType>::operator=(ActorShared &&other) { + reset(other.release()); + token_ = other.token_; + return *this; +} + +template <class ActorType> +ActorShared<ActorType>::~ActorShared() { + reset(); +} + +template <class ActorType> +uint64 ActorShared<ActorType>::token() const { + return token_; +} +template <class ActorType> +bool ActorShared<ActorType>::empty() const { + return id_.empty(); +} +template <class ActorType> +ActorId<ActorType> ActorShared<ActorType>::get() const { + return id_; +} + +template <class ActorType> +ActorId<ActorType> ActorShared<ActorType>::release() { + return std::move(id_); +} +template <class ActorType> +void ActorShared<ActorType>::reset(ActorId<ActorType> other) { + reset<ActorType>(std::move(other)); +} + +template <class ActorType> +template <class OtherActorType> +void ActorShared<ActorType>::reset(ActorId<OtherActorType> other) { + static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type"); + if (!id_.empty()) { + send_event(*this, Event::hangup()); + } + id_ = static_cast<ActorId<ActorType>>(other); +} +template <class ActorType> +const ActorId<ActorType> *ActorShared<ActorType>::operator->() const { + return &id_; +} + +/*** ActorRef ***/ +template <class T> +ActorRef::ActorRef(const ActorId<T> &actor_id) : actor_id_(actor_id) { +} +template <class T> +ActorRef::ActorRef(const ActorShared<T> &actor_id) : actor_id_(actor_id.get()), token_(actor_id.token()) { +} +template <class T> +ActorRef::ActorRef(ActorShared<T> &&actor_id) : actor_id_(actor_id.release()), token_(actor_id.token()) { +} +template <class T> +ActorRef::ActorRef(const ActorOwn<T> &actor_id) : actor_id_(actor_id.get()) { +} +template <class T> +ActorRef::ActorRef(ActorOwn<T> &&actor_id) : actor_id_(actor_id.release()) { +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h new file mode 100644 index 0000000000..de9fba794e --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo-decl.h @@ -0,0 +1,119 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/ActorId-decl.h" +#include "td/actor/impl/Event.h" + +#include "td/utils/common.h" +#include "td/utils/Heap.h" +#include "td/utils/List.h" +#include "td/utils/ObjectPool.h" +#include "td/utils/Slice.h" +#include "td/utils/StringBuilder.h" + +#include <atomic> +#include <memory> +#include <utility> + +namespace td { + +class Actor; + +class ActorContext { + public: + ActorContext() = default; + ActorContext(const ActorContext &) = delete; + ActorContext &operator=(const ActorContext &) = delete; + ActorContext(ActorContext &&) = delete; + ActorContext &operator=(ActorContext &&) = delete; + virtual ~ActorContext() = default; + const char *tag_ = nullptr; + std::weak_ptr<ActorContext> this_ptr_; +}; + +class ActorInfo + : private ListNode + , HeapNode { + public: + enum class Deleter : uint8 { Destroy, None }; + + ActorInfo() = default; + ~ActorInfo() = default; + + ActorInfo(ActorInfo &&) = delete; + ActorInfo &operator=(ActorInfo &&) = delete; + + ActorInfo(const ActorInfo &) = delete; + ActorInfo &operator=(const ActorInfo &) = delete; + + void init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr, Deleter deleter, + bool is_lite); + void on_actor_moved(Actor *actor_new_ptr); + + template <class ActorT> + ActorOwn<ActorT> transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor); + void clear(); + void destroy_actor(); + + bool empty() const; + void start_migrate(int32 to_sched_id); + bool is_migrating() const; + int32 migrate_dest() const; + std::pair<int32, bool> migrate_dest_flag_atomic() const; + + void finish_migrate(); + + ActorId<> actor_id(); + template <class SelfT> + ActorId<SelfT> actor_id(SelfT *self); + Actor *get_actor_unsafe(); + const Actor *get_actor_unsafe() const; + + void set_context(std::shared_ptr<ActorContext> context); + ActorContext *get_context(); + const ActorContext *get_context() const; + CSlice get_name() const; + + HeapNode *get_heap_node(); + const HeapNode *get_heap_node() const; + static ActorInfo *from_heap_node(HeapNode *node); + + ListNode *get_list_node(); + const ListNode *get_list_node() const; + static ActorInfo *from_list_node(ListNode *node); + + void start_run(); + bool is_running() const; + void finish_run(); + + vector<Event> mailbox_; + + bool is_lite() const; + + void set_wait_generation(uint32 wait_generation); + bool must_wait(uint32 wait_generation) const; + void always_wait_for_mailbox(); + + private: + Deleter deleter_; + bool is_lite_; + bool is_running_; + bool always_wait_for_mailbox_{false}; + uint32 wait_generation_{0}; + + std::atomic<int32> sched_id_{0}; + Actor *actor_ = nullptr; + +#ifdef TD_DEBUG + string name_; +#endif + std::shared_ptr<ActorContext> context_; +}; + +StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info); +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo.h b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo.h new file mode 100644 index 0000000000..df0b0dfd81 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/ActorInfo.h @@ -0,0 +1,201 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/Actor-decl.h" +#include "td/actor/impl/ActorInfo-decl.h" +#include "td/actor/impl/Scheduler-decl.h" + +#include "td/utils/common.h" +#include "td/utils/format.h" +#include "td/utils/Heap.h" +#include "td/utils/List.h" +#include "td/utils/logging.h" +#include "td/utils/ObjectPool.h" +#include "td/utils/Slice.h" +#include "td/utils/StringBuilder.h" + +#include <atomic> +#include <memory> +#include <utility> + +namespace td { +/*** ActorInfo ***/ +inline StringBuilder &operator<<(StringBuilder &sb, const ActorInfo &info) { + sb << info.get_name() << ":" << const_cast<void *>(static_cast<const void *>(&info)) << ":" + << const_cast<void *>(static_cast<const void *>(info.get_context())); + return sb; +} +inline void ActorInfo::init(int32 sched_id, Slice name, ObjectPool<ActorInfo>::OwnerPtr &&this_ptr, Actor *actor_ptr, + Deleter deleter, bool is_lite) { + CHECK(!is_running()); + CHECK(!is_migrating()); + sched_id_.store(sched_id, std::memory_order_relaxed); + actor_ = actor_ptr; + + if (!is_lite) { + context_ = Scheduler::context()->this_ptr_.lock(); +#ifdef TD_DEBUG + name_ = name.str(); +#endif + } + + actor_->init(std::move(this_ptr)); + deleter_ = deleter; + is_lite_ = is_lite; + is_running_ = false; + wait_generation_ = 0; +} +inline bool ActorInfo::is_lite() const { + return is_lite_; +} +inline void ActorInfo::set_wait_generation(uint32 wait_generation) { + wait_generation_ = wait_generation; +} +inline bool ActorInfo::must_wait(uint32 wait_generation) const { + return wait_generation_ == wait_generation || (always_wait_for_mailbox_ && !mailbox_.empty()); +} +inline void ActorInfo::always_wait_for_mailbox() { + always_wait_for_mailbox_ = true; +} +inline void ActorInfo::on_actor_moved(Actor *actor_new_ptr) { + actor_ = actor_new_ptr; +} + +inline void ActorInfo::clear() { + // LOG_IF(WARNING, !mailbox_.empty()) << "Destroy actor with non-empty mailbox: " << get_name() + // << format::as_array(mailbox_); + mailbox_.clear(); + CHECK(!is_running()); + CHECK(!is_migrating()); + // NB: must be in non migrating state + // store invalid scheduler id. + sched_id_.store((1 << 30) - 1, std::memory_order_relaxed); + destroy_actor(); + // Destroy context only after destructor. + context_.reset(); +} + +inline void ActorInfo::destroy_actor() { + if (!actor_) { + return; + } + switch (deleter_) { + case Deleter::Destroy: + std::default_delete<Actor>()(actor_); + break; + case Deleter::None: + break; + } + actor_ = nullptr; +} + +template <class ActorT> +ActorOwn<ActorT> ActorInfo::transfer_ownership_to_scheduler(std::unique_ptr<ActorT> actor) { + CHECK(!empty()); + CHECK(deleter_ == Deleter::None); + ActorT *actor_ptr = actor.release(); + CHECK(actor_ == static_cast<Actor *>(actor_ptr)); + actor_ = static_cast<Actor *>(actor_ptr); + deleter_ = Deleter::Destroy; + return ActorOwn<ActorT>(actor_id(actor_ptr)); +} + +inline bool ActorInfo::empty() const { + return actor_ == nullptr; +} + +inline void ActorInfo::start_migrate(int32 to_sched_id) { + sched_id_.store(to_sched_id | (1 << 30), std::memory_order_relaxed); +} +inline std::pair<int32, bool> ActorInfo::migrate_dest_flag_atomic() const { + int32 sched_id = sched_id_.load(std::memory_order_relaxed); + return std::make_pair(sched_id & ~(1 << 30), (sched_id & (1 << 30)) != 0); +} +inline void ActorInfo::finish_migrate() { + sched_id_.store(migrate_dest(), std::memory_order_relaxed); +} +inline bool ActorInfo::is_migrating() const { + return migrate_dest_flag_atomic().second; +} +inline int32 ActorInfo::migrate_dest() const { + return migrate_dest_flag_atomic().first; +} + +inline ActorId<> ActorInfo::actor_id() { + return actor_id(actor_); +} + +template <class SelfT> +ActorId<SelfT> ActorInfo::actor_id(SelfT *self) { + return actor_->actor_id(self); +} + +inline Actor *ActorInfo::get_actor_unsafe() { + return actor_; +} +inline const Actor *ActorInfo::get_actor_unsafe() const { + return actor_; +} + +inline void ActorInfo::set_context(std::shared_ptr<ActorContext> context) { + CHECK(is_running()); + context->this_ptr_ = context; + context->tag_ = Scheduler::context()->tag_; + context_ = std::move(context); + Scheduler::context() = context_.get(); + Scheduler::on_context_updated(); +} +inline const ActorContext *ActorInfo::get_context() const { + return context_.get(); +} + +inline ActorContext *ActorInfo::get_context() { + return context_.get(); +} + +inline CSlice ActorInfo::get_name() const { +#ifdef TD_DEBUG + return name_; +#else + return ""; +#endif +} + +inline void ActorInfo::start_run() { + VLOG(actor) << "start_run: " << *this; + CHECK(!is_running_) << "Recursive call of actor " << tag("name", get_name()); + is_running_ = true; +} +inline void ActorInfo::finish_run() { + is_running_ = false; + VLOG(actor) << "stop_run: " << *this; +} + +inline bool ActorInfo::is_running() const { + return is_running_; +} + +inline HeapNode *ActorInfo::get_heap_node() { + return this; +} +inline const HeapNode *ActorInfo::get_heap_node() const { + return this; +} +inline ActorInfo *ActorInfo::from_heap_node(HeapNode *node) { + return static_cast<ActorInfo *>(node); +} +inline ListNode *ActorInfo::get_list_node() { + return this; +} +inline const ListNode *ActorInfo::get_list_node() const { + return this; +} +inline ActorInfo *ActorInfo::from_list_node(ListNode *node) { + return static_cast<ActorInfo *>(node); +} +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp new file mode 100644 index 0000000000..47593db90b --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.cpp @@ -0,0 +1,102 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/impl/ConcurrentScheduler.h" + +#include "td/actor/impl/Actor.h" +#include "td/actor/impl/ActorId.h" +#include "td/actor/impl/ActorInfo.h" +#include "td/actor/impl/Scheduler.h" + +#include "td/utils/MpscPollableQueue.h" +#include "td/utils/port/thread_local.h" + +#include <memory> + +namespace td { + +void ConcurrentScheduler::init(int32 threads_n) { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + threads_n = 0; +#endif + threads_n++; + std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound(threads_n); + for (int32 i = 0; i < threads_n; i++) { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED +#else + auto queue = std::make_shared<MpscPollableQueue<EventFull>>(); + queue->init(); + outbound[i] = queue; +#endif + } + + schedulers_.resize(threads_n); + for (int32 i = 0; i < threads_n; i++) { + auto &sched = schedulers_[i]; + sched = make_unique<Scheduler>(); + sched->init(i, outbound, static_cast<Scheduler::Callback *>(this)); + } + + state_ = State::Start; +} + +void ConcurrentScheduler::test_one_thread_run() { + do { + for (auto &sched : schedulers_) { + sched->run(0); + } + } while (!is_finished_.load(std::memory_order_relaxed)); +} + +void ConcurrentScheduler::start() { + CHECK(state_ == State::Start); + is_finished_.store(false, std::memory_order_relaxed); + set_thread_id(0); +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + for (size_t i = 1; i < schedulers_.size(); i++) { + auto &sched = schedulers_[i]; + threads_.push_back(td::thread([&, tid = i]() { + set_thread_id(static_cast<int32>(tid)); + while (!is_finished()) { + sched->run(10); + } + })); + } +#endif + state_ = State::Run; +} + +bool ConcurrentScheduler::run_main(double timeout) { + CHECK(state_ == State::Run); + // run main scheduler in same thread + auto &main_sched = schedulers_[0]; + if (!is_finished()) { + main_sched->run(timeout); + } + return !is_finished(); +} + +void ConcurrentScheduler::finish() { + CHECK(state_ == State::Run); + if (!is_finished()) { + on_finish(); + } +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + for (auto &thread : threads_) { + thread.join(); + } + threads_.clear(); +#endif + schedulers_.clear(); + for (auto &f : at_finish_) { + f(); + } + at_finish_.clear(); + + state_ = State::Start; +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h new file mode 100644 index 0000000000..1e9793eab4 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/ConcurrentScheduler.h @@ -0,0 +1,93 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/Scheduler-decl.h" + +#include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/port/thread.h" +#include "td/utils/Slice.h" + +#include <atomic> +#include <functional> +#include <mutex> +#include <utility> + +namespace td { + +class ConcurrentScheduler : private Scheduler::Callback { + public: + void init(int32 threads_n); + + void finish_async() { + schedulers_[0]->finish(); + } + void wakeup() { + schedulers_[0]->wakeup(); + } + SchedulerGuard get_current_guard() { + return schedulers_[0]->get_guard(); + } + + void test_one_thread_run(); + + bool is_finished() { + return is_finished_.load(std::memory_order_relaxed); + } + + void start(); + + bool run_main(double timeout); + + void finish(); + + template <class ActorT, class... Args> + ActorOwn<ActorT> create_actor_unsafe(int32 sched_id, Slice name, Args &&... args) { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + sched_id = 0; +#endif + CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size())); + auto guard = schedulers_[sched_id]->get_guard(); + return schedulers_[sched_id]->create_actor<ActorT>(name, std::forward<Args>(args)...); + } + + template <class ActorT> + ActorOwn<ActorT> register_actor_unsafe(int32 sched_id, Slice name, ActorT *actor) { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + sched_id = 0; +#endif + CHECK(0 <= sched_id && sched_id < static_cast<int32>(schedulers_.size())); + auto guard = schedulers_[sched_id]->get_guard(); + return schedulers_[sched_id]->register_actor<ActorT>(name, actor); + } + + private: + enum class State { Start, Run }; + State state_; + std::vector<unique_ptr<Scheduler>> schedulers_; + std::atomic<bool> is_finished_; + std::mutex at_finish_mutex_; + std::vector<std::function<void()>> at_finish_; +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + std::vector<thread> threads_; +#endif + + void on_finish() override { + is_finished_.store(true, std::memory_order_relaxed); + for (auto &it : schedulers_) { + it->wakeup(); + } + } + + void register_at_finish(std::function<void()> f) override { + std::lock_guard<std::mutex> lock(at_finish_mutex_); + at_finish_.push_back(std::move(f)); + } +}; + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Event.h b/libs/tdlib/td/tdactor/td/actor/impl/Event.h new file mode 100644 index 0000000000..fac66dd120 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/Event.h @@ -0,0 +1,247 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/Closure.h" +#include "td/utils/common.h" +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/StringBuilder.h" + +#include <type_traits> +#include <utility> + +namespace td { + +class Actor; + +// Events +// +// Small structure (up to 16 bytes) used to send events between actors. +// +// There are some predefined types of events: +// NoType -- unitialized event +// Start -- start actor +// Stop -- stop actor +// Yield -- wake up actor +// Timeout -- some timeout has expired +// Hangup -- hang up called +// Raw -- just pass 8 bytes (union Raw is used for convenience) +// Custom -- Send CustomEvent + +template <class T> +std::enable_if_t<!std::is_base_of<Actor, T>::value> start_migrate(T &obj, int32 sched_id) { +} +template <class T> +std::enable_if_t<!std::is_base_of<Actor, T>::value> finish_migrate(T &obj) { +} + +class CustomEvent { + public: + CustomEvent() = default; + CustomEvent(const CustomEvent &) = delete; + CustomEvent &operator=(const CustomEvent &) = delete; + CustomEvent(CustomEvent &&) = delete; + CustomEvent &operator=(CustomEvent &&) = delete; + virtual ~CustomEvent() = default; + + virtual void run(Actor *actor) = 0; + virtual CustomEvent *clone() const = 0; + virtual void start_migrate(int32 sched_id) { + } + virtual void finish_migrate() { + } +}; + +template <class ClosureT> +class ClosureEvent : public CustomEvent { + public: + void run(Actor *actor) override { + closure_.run(static_cast<typename ClosureT::ActorType *>(actor)); + } + CustomEvent *clone() const override { + return new ClosureEvent<ClosureT>(closure_.clone()); + } + template <class... ArgsT> + explicit ClosureEvent(ArgsT &&... args) : closure_(std::forward<ArgsT>(args)...) { + } + + void start_migrate(int32 sched_id) override { + closure_.for_each([sched_id](auto &obj) { + using ::td::start_migrate; + start_migrate(obj, sched_id); + }); + } + + void finish_migrate() override { + closure_.for_each([](auto &obj) { + using ::td::finish_migrate; + finish_migrate(obj); + }); + } + + private: + ClosureT closure_; +}; + +template <class LambdaT> +class LambdaEvent : public CustomEvent { + public: + void run(Actor *actor) override { + f_(); + } + CustomEvent *clone() const override { + LOG(FATAL) << "Not supported"; + return nullptr; + } + template <class FromLambdaT> + explicit LambdaEvent(FromLambdaT &&lambda) : f_(std::forward<FromLambdaT>(lambda)) { + } + + private: + LambdaT f_; +}; + +class Event { + public: + enum class Type { NoType, Start, Stop, Yield, Timeout, Hangup, Raw, Custom }; + Type type; + uint64 link_token = 0; + union Raw { + void *ptr; + CustomEvent *custom_event; + uint32 u32; + uint64 u64; + } data{}; + + // factory functions + static Event start() { + return Event(Type::Start); + } + static Event stop() { + return Event(Type::Stop); + } + static Event yield() { + return Event(Type::Yield); + } + static Event timeout() { + return Event(Type::Timeout); + } + static Event hangup() { + return Event(Type::Hangup); + } + static Event raw(void *ptr) { + return Event(Type::Raw, ptr); + } + static Event raw(uint32 u32) { + return Event(Type::Raw, u32); + } + static Event raw(uint64 u64) { + return Event(Type::Raw, u64); + } + static Event custom(CustomEvent *custom_event) { + return Event(Type::Custom, custom_event); + } + + template <class FromImmediateClosureT> + static Event immediate_closure(FromImmediateClosureT &&closure) { + return custom( + new ClosureEvent<typename FromImmediateClosureT::Delayed>(std::forward<FromImmediateClosureT>(closure))); + } + template <class... ArgsT> + static Event delayed_closure(ArgsT &&... args) { + using DelayedClosureT = decltype(create_delayed_closure(std::forward<ArgsT>(args)...)); + return custom(new ClosureEvent<DelayedClosureT>(std::forward<ArgsT>(args)...)); + } + + template <class FromLambdaT> + static Event lambda(FromLambdaT &&lambda) { + return custom(new LambdaEvent<std::decay_t<FromLambdaT>>(std::forward<FromLambdaT>(lambda))); + } + + Event() : Event(Type::NoType) { + } + Event(const Event &other) = delete; + Event &operator=(const Event &) = delete; + Event(Event &&other) : type(other.type), link_token(other.link_token), data(other.data) { + other.type = Type::NoType; + } + Event &operator=(Event &&other) { + destroy(); + type = other.type; + link_token = other.link_token; + data = other.data; + other.type = Type::NoType; + return *this; + } + ~Event() { + destroy(); + } + + Event clone() const { + Event res; + res.type = type; + if (type == Type::Custom) { + res.data.custom_event = data.custom_event->clone(); + } else { + res.data = data; + } + return res; + } + + bool empty() const { + return type == Type::NoType; + } + + void clear() { + destroy(); + type = Type::NoType; + } + + Event &set_link_token(uint64 new_link_token) { + link_token = new_link_token; + return *this; + } + + friend void start_migrate(Event &obj, int32 sched_id) { + if (obj.type == Type::Custom) { + obj.data.custom_event->start_migrate(sched_id); + } + } + friend void finish_migrate(Event &obj) { + if (obj.type == Type::Custom) { + obj.data.custom_event->finish_migrate(); + } + } + + private: + explicit Event(Type type) : type(type) { + } + + Event(Type type, void *ptr) : Event(type) { + data.ptr = ptr; + } + Event(Type type, CustomEvent *custom_event) : Event(type) { + data.custom_event = custom_event; + } + Event(Type type, uint32 u32) : Event(type) { + data.u32 = u32; + } + Event(Type type, uint64 u64) : Event(type) { + data.u64 = u64; + } + + void destroy() { + if (type == Type::Custom) { + delete data.custom_event; + } + } +}; +inline StringBuilder &operator<<(StringBuilder &sb, const Event &e) { + return sb << tag("Event", static_cast<int32>(e.type)); +} +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h new file mode 100644 index 0000000000..ef2f1c2dcb --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/EventFull-decl.h @@ -0,0 +1,87 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/ActorId-decl.h" +#include "td/actor/impl/Event.h" + +#include "td/utils/type_traits.h" + +#include <type_traits> +#include <utility> + +namespace td { + +class EventFull { + public: + EventFull() = default; + + bool empty() const { + return data_.empty(); + } + + void clear() { + data_.clear(); + } + + ActorId<> actor_id() const { + return actor_id_; + } + Event &data() { + return data_; + } + + void try_emit_later(); + void try_emit(); + + private: + friend class EventCreator; + + EventFull(ActorRef actor_ref, Event &&data) : actor_id_(actor_ref.get()), data_(std::move(data)) { + data_.link_token = actor_ref.token(); + } + template <class T> + EventFull(ActorId<T> actor_id, Event &&data) : actor_id_(actor_id), data_(std::move(data)) { + } + + ActorId<> actor_id_; + + Event data_; +}; + +class EventCreator { + public: + template <class ActorIdT, class FunctionT, class... ArgsT> + static EventFull closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { + using ActorT = typename std::decay_t<ActorIdT>::ActorT; + using FunctionClassT = member_function_class_t<FunctionT>; + static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); + + return EventFull(std::forward<ActorIdT>(actor_id), Event::delayed_closure(function, std::forward<ArgsT>(args)...)); + } + + template <class LambdaT> + static EventFull lambda(ActorRef actor_ref, LambdaT &&lambda) { + return EventFull(actor_ref, Event::lambda(std::forward<LambdaT>(lambda))); + } + + static EventFull yield(ActorRef actor_ref) { + return EventFull(actor_ref, Event::yield()); + } + static EventFull raw(ActorRef actor_ref, uint64 data) { + return EventFull(actor_ref, Event::raw(data)); + } + static EventFull raw(ActorRef actor_ref, void *ptr) { + return EventFull(actor_ref, Event::raw(ptr)); + } + + static EventFull event_unsafe(ActorId<> actor_id, Event &&event) { + return EventFull(actor_id, std::move(event)); + } +}; + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/EventFull.h b/libs/tdlib/td/tdactor/td/actor/impl/EventFull.h new file mode 100644 index 0000000000..1e997ee4b3 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/EventFull.h @@ -0,0 +1,38 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/EventFull-decl.h" +#include "td/actor/impl/Scheduler-decl.h" + +#include "td/utils/logging.h" + +#include <utility> + +namespace td { + +inline void EventFull::try_emit_later() { + if (empty()) { + return; + } + auto link_token = data_.link_token; + send_event_later(ActorShared<>(actor_id_, link_token), std::move(data_)); + data_.clear(); + CHECK(empty()); +} + +inline void EventFull::try_emit() { + if (empty()) { + return; + } + auto link_token = data_.link_token; + send_event(ActorShared<>(actor_id_, link_token), std::move(data_)); + data_.clear(); + CHECK(empty()); +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h new file mode 100644 index 0000000000..4b51c102a5 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler-decl.h @@ -0,0 +1,296 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/Actor-decl.h" +#include "td/actor/impl/ActorId-decl.h" +#include "td/actor/impl/EventFull-decl.h" + +#include "td/utils/Closure.h" +#include "td/utils/Heap.h" +#include "td/utils/List.h" +#include "td/utils/MovableValue.h" +#include "td/utils/MpscPollableQueue.h" +#include "td/utils/ObjectPool.h" +#include "td/utils/port/EventFd.h" +#include "td/utils/port/Fd.h" +#include "td/utils/port/Poll.h" +#include "td/utils/port/thread_local.h" +#include "td/utils/Slice.h" +#include "td/utils/type_traits.h" + +#include <functional> +#include <map> +#include <memory> +#include <type_traits> +#include <utility> + +namespace td { +class ActorInfo; +struct Send { + using Flags = uint32; + static const Flags immediate = 0x001; + static const Flags later = 0x002; + static const Flags later_weak = 0x004; +}; + +class Scheduler; +class SchedulerGuard { + public: + explicit SchedulerGuard(Scheduler *scheduler); + ~SchedulerGuard(); + SchedulerGuard(const SchedulerGuard &other) = delete; + SchedulerGuard &operator=(const SchedulerGuard &other) = delete; + SchedulerGuard(SchedulerGuard &&other) = default; + SchedulerGuard &operator=(SchedulerGuard &&other) = delete; + + private: + MovableValue<bool> is_valid_ = true; + Scheduler *scheduler_; + ActorContext *save_context_; + Scheduler *save_scheduler_; + const char *save_tag_; +}; + +class Scheduler { + public: + class Callback { + public: + Callback() = default; + Callback(const Callback &) = delete; + Callback &operator=(const Callback &) = delete; + virtual ~Callback() = default; + virtual void on_finish() = 0; + virtual void register_at_finish(std::function<void()>) = 0; + }; + Scheduler() = default; + Scheduler(const Scheduler &) = delete; + Scheduler &operator=(const Scheduler &) = delete; + Scheduler(Scheduler &&) = delete; + Scheduler &operator=(Scheduler &&) = delete; + ~Scheduler(); + + void init(); + void init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound, Callback *callback); + void clear(); + + int32 sched_id() const; + int32 sched_count() const; + + template <class ActorT, class... Args> + TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args); + template <class ActorT, class... Args> + TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args); + template <class ActorT> + TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1); + template <class ActorT> + TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id = -1); + + template <class ActorT> + TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr); + + void send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event); + void send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event); + + template <class EventT> + void send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags = 0); + + template <class EventT> + void send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags = 0); + + void send(ActorRef actor_ref, Event &&event, Send::Flags flags = 0); + + void hack(const ActorId<> &actor_id, Event &&event) { + actor_id.get_actor_unsafe()->raw_event(event.data); + } + void before_tail_send(const ActorId<> &actor_id); + + void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read); + void unsubscribe(const Fd &fd); + void unsubscribe_before_close(const Fd &fd); + + void yield_actor(Actor *actor); + void stop_actor(Actor *actor); + void do_stop_actor(Actor *actor); + uint64 get_link_token(Actor *actor); + void migrate_actor(Actor *actor, int32 dest_sched_id); + void do_migrate_actor(Actor *actor, int32 dest_sched_id); + void start_migrate_actor(Actor *actor, int32 dest_sched_id); + void finish_migrate_actor(Actor *actor); + + bool has_actor_timeout(const Actor *actor) const; + void set_actor_timeout_in(Actor *actor, double timeout); + void set_actor_timeout_at(Actor *actor, double timeout_at); + void cancel_actor_timeout(Actor *actor); + + void finish(); + void yield(); + void run(double timeout); + void run_no_guard(double timeout); + + void wakeup(); + + static Scheduler *instance(); + static ActorContext *&context(); + static void on_context_updated(); + + SchedulerGuard get_guard(); + + private: + static void set_scheduler(Scheduler *scheduler); + /*** ServiceActor ***/ + class ServiceActor final : public Actor { + public: + void set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues); + void start_up() override; + + private: + std::shared_ptr<MpscPollableQueue<EventFull>> inbound_; + void loop() override; + }; + friend class ServiceActor; + + void do_custom_event(ActorInfo *actor, CustomEvent &event); + void do_event(ActorInfo *actor, Event &&event); + + void enter_actor(ActorInfo *actor_info); + void exit_actor(ActorInfo *actor_info); + + void yield_actor(ActorInfo *actor_info); + void stop_actor(ActorInfo *actor_info); + void do_stop_actor(ActorInfo *actor_info); + uint64 get_link_token(ActorInfo *actor_info); + void migrate_actor(ActorInfo *actor_info, int32 dest_sched_id); + void do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id); + void start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id); + + bool has_actor_timeout(const ActorInfo *actor_info) const; + void set_actor_timeout_in(ActorInfo *actor_info, double timeout); + void set_actor_timeout_at(ActorInfo *actor_info, double timeout_at); + void cancel_actor_timeout(ActorInfo *actor_info); + + void register_migrated_actor(ActorInfo *actor_info); + void add_to_mailbox(ActorInfo *actor_info, Event &&event); + void clear_mailbox(ActorInfo *actor_info); + + template <class RunFuncT, class EventFuncT> + void flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func); + + template <class RunFuncT, class EventFuncT> + void send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, const EventFuncT &event_func); + + void inc_wait_generation(); + + double run_timeout(); + void run_mailbox(); + double run_events(); + void run_poll(double timeout); + + template <class ActorT> + ActorOwn<ActorT> register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id); + void destroy_actor(ActorInfo *actor_info); + + static TD_THREAD_LOCAL Scheduler *scheduler_; + static TD_THREAD_LOCAL ActorContext *context_; + + Callback *callback_ = nullptr; + std::unique_ptr<ObjectPool<ActorInfo>> actor_info_pool_; + + int32 actor_count_; + ListNode pending_actors_list_; + ListNode ready_actors_list_; + KHeap<double> timeout_queue_; + + std::map<ActorInfo *, std::vector<Event>> pending_events_; + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + EventFd event_fd_; +#endif + ServiceActor service_actor_; + Poll poll_; + + bool yield_flag_; + bool has_guard_ = false; + bool close_flag_ = false; + + uint32 wait_generation_ = 0; + int32 sched_id_; + int32 sched_n_; + std::shared_ptr<MpscPollableQueue<EventFull>> inbound_queue_; + std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound_queues_; + + std::shared_ptr<ActorContext> save_context_; + + struct EventContext { + int32 dest_sched_id; + enum Flags { Stop = 1, Migrate = 2 }; + int32 flags{0}; + uint64 link_token; + + ActorInfo *actor_info; + }; + EventContext *event_context_ptr_; + + friend class GlobalScheduler; + friend class SchedulerGuard; + friend class EventGuard; +}; + +/*** Interface to current scheduler ***/ +void subscribe(const Fd &fd, Fd::Flags flags = Fd::Write | Fd::Read); +void unsubscribe(const Fd &fd); +void unsubscribe_before_close(const Fd &fd); + +template <class ActorT, class... Args> +TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor(Slice name, Args &&... args); +template <class ActorT, class... Args> +TD_WARN_UNUSED_RESULT ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args); +template <class ActorT> +TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id = -1); +template <class ActorT> +TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id = -1); + +template <class ActorT> +TD_WARN_UNUSED_RESULT ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr); + +template <class ActorIdT, class FunctionT, class... ArgsT> +void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { + using ActorT = typename std::decay_t<ActorIdT>::ActorT; + using FunctionClassT = member_function_class_t<FunctionT>; + static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); + + Scheduler::instance()->send_closure(std::forward<ActorIdT>(actor_id), + create_immediate_closure(function, std::forward<ArgsT>(args)...)); +} + +template <class ActorIdT, class FunctionT, class... ArgsT> +void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { + using ActorT = typename std::decay_t<ActorIdT>::ActorT; + using FunctionClassT = member_function_class_t<FunctionT>; + static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); + + Scheduler::instance()->send(std::forward<ActorIdT>(actor_id), + Event::delayed_closure(function, std::forward<ArgsT>(args)...), Send::later); +} + +template <class... ArgsT> +void send_lambda(ActorRef actor_ref, ArgsT &&... args) { + Scheduler::instance()->send_lambda(actor_ref, std::forward<ArgsT>(args)...); +} + +template <class... ArgsT> +void send_event(ActorRef actor_ref, ArgsT &&... args) { + Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)...); +} + +template <class... ArgsT> +void send_event_later(ActorRef actor_ref, ArgsT &&... args) { + Scheduler::instance()->send(actor_ref, std::forward<ArgsT>(args)..., Send::later); +} + +void yield_scheduler(); +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp new file mode 100644 index 0000000000..479e419d62 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.cpp @@ -0,0 +1,496 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/impl/Scheduler.h" + +#include "td/actor/impl/Actor.h" +#include "td/actor/impl/ActorId.h" +#include "td/actor/impl/ActorInfo.h" +#include "td/actor/impl/Event.h" +#include "td/actor/impl/EventFull.h" + +#include "td/utils/common.h" +#include "td/utils/format.h" +#include "td/utils/List.h" +#include "td/utils/logging.h" +#include "td/utils/ObjectPool.h" +#include "td/utils/port/thread_local.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/Time.h" + +#include <functional> +#include <utility> + +namespace td { + +TD_THREAD_LOCAL Scheduler *Scheduler::scheduler_; // static zero-initialized +TD_THREAD_LOCAL ActorContext *Scheduler::context_; // static zero-initialized + +Scheduler::~Scheduler() { + clear(); +} + +Scheduler *Scheduler::instance() { + return scheduler_; +} + +ActorContext *&Scheduler::context() { + return context_; +} + +void Scheduler::on_context_updated() { + LOG_TAG = context_->tag_; +} + +void Scheduler::set_scheduler(Scheduler *scheduler) { + scheduler_ = scheduler; +} + +void Scheduler::ServiceActor::start_up() { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + CHECK(!inbound_); +#else + if (!inbound_) { + return; + } + auto &fd = inbound_->reader_get_event_fd(); + + fd.get_fd().set_observer(this); + ::td::subscribe(fd.get_fd(), Fd::Read); + yield(); +#endif +} + +void Scheduler::ServiceActor::loop() { + auto &queue = inbound_; + int ready_n = queue->reader_wait_nonblock(); + if (ready_n == 0) { + return; + } + while (ready_n-- > 0) { + EventFull event = queue->reader_get_unsafe(); + if (event.actor_id().empty()) { + Scheduler::instance()->register_migrated_actor(static_cast<ActorInfo *>(event.data().data.ptr)); + } else { + VLOG(actor) << "Receive " << event.data(); + finish_migrate(event.data()); + event.try_emit(); + } + } + queue->reader_flush(); + yield(); +} + +/*** SchedlerGuard ***/ +SchedulerGuard::SchedulerGuard(Scheduler *scheduler) : scheduler_(scheduler) { + CHECK(!scheduler_->has_guard_); + scheduler_->has_guard_ = true; + save_scheduler_ = Scheduler::instance(); + Scheduler::set_scheduler(scheduler_); + + // Scheduler::context() must be not null + save_context_ = scheduler_->save_context_.get(); + save_tag_ = LOG_TAG; + LOG_TAG = save_context_->tag_; + std::swap(save_context_, Scheduler::context()); +} + +SchedulerGuard::~SchedulerGuard() { + if (is_valid_.get()) { + std::swap(save_context_, scheduler_->context()); + Scheduler::set_scheduler(save_scheduler_); + CHECK(scheduler_->has_guard_); + scheduler_->has_guard_ = false; + LOG_TAG = save_tag_; + } +} + +/*** EventGuard ***/ +EventGuard::EventGuard(Scheduler *scheduler, ActorInfo *actor_info) : scheduler_(scheduler) { + actor_info->start_run(); + event_context_.actor_info = actor_info; + event_context_ptr_ = &event_context_; + + save_context_ = actor_info->get_context(); +#ifdef TD_DEBUG + save_log_tag2_ = actor_info->get_name().c_str(); +#endif + swap_context(actor_info); +} + +EventGuard::~EventGuard() { + auto info = event_context_.actor_info; + auto node = info->get_list_node(); + node->remove(); + if (info->mailbox_.empty()) { + scheduler_->pending_actors_list_.put(node); + } else { + scheduler_->ready_actors_list_.put(node); + } + info->finish_run(); + swap_context(info); + CHECK(info->is_lite() || save_context_ == info->get_context()); +#ifdef TD_DEBUG + CHECK(info->is_lite() || save_log_tag2_ == info->get_name().c_str()); +#endif + if (event_context_.flags & Scheduler::EventContext::Stop) { + scheduler_->do_stop_actor(info); + return; + } + if (event_context_.flags & Scheduler::EventContext::Migrate) { + scheduler_->do_migrate_actor(info, event_context_.dest_sched_id); + } +} + +void EventGuard::swap_context(ActorInfo *info) { + std::swap(scheduler_->event_context_ptr_, event_context_ptr_); + + if (info->is_lite()) { + return; + } + +#ifdef TD_DEBUG + std::swap(LOG_TAG2, save_log_tag2_); +#endif + + auto *current_context_ptr = &Scheduler::context(); + if (save_context_ != *current_context_ptr) { + std::swap(save_context_, *current_context_ptr); + Scheduler::on_context_updated(); + } +} + +void Scheduler::init(int32 id, std::vector<std::shared_ptr<MpscPollableQueue<EventFull>>> outbound, + Callback *callback) { + save_context_ = std::make_shared<ActorContext>(); + save_context_->this_ptr_ = save_context_; + save_context_->tag_ = LOG_TAG; + + auto guard = get_guard(); + + callback_ = callback; + actor_info_pool_ = make_unique<ObjectPool<ActorInfo>>(); + + yield_flag_ = false; + actor_count_ = 0; + sched_id_ = 0; + + poll_.init(); + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + event_fd_.init(); + subscribe(event_fd_.get_fd(), Fd::Read); +#endif + + if (!outbound.empty()) { + inbound_queue_ = std::move(outbound[id]); + } + outbound_queues_ = std::move(outbound); + sched_id_ = id; + sched_n_ = static_cast<int32>(outbound_queues_.size()); + service_actor_.set_queue(inbound_queue_); + register_actor("ServiceActor", &service_actor_).release(); +} + +void Scheduler::clear() { + if (service_actor_.empty()) { + return; + } + close_flag_ = true; + auto guard = get_guard(); + + // Stop all actors + if (!service_actor_.empty()) { + service_actor_.do_stop(); + } + while (!pending_actors_list_.empty()) { + auto actor_info = ActorInfo::from_list_node(pending_actors_list_.get()); + do_stop_actor(actor_info); + } + while (!ready_actors_list_.empty()) { + auto actor_info = ActorInfo::from_list_node(ready_actors_list_.get()); + do_stop_actor(actor_info); + } + LOG_IF(FATAL, !ready_actors_list_.empty()) << ActorInfo::from_list_node(ready_actors_list_.next)->get_name(); + CHECK(ready_actors_list_.empty()); + poll_.clear(); + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + if (!event_fd_.empty()) { + event_fd_.close(); + } +#endif + + if (callback_) { + // can't move lambda with unique_ptr inside into std::function + auto ptr = actor_info_pool_.release(); + callback_->register_at_finish([=]() { delete ptr; }); + } else { + actor_info_pool_.reset(); + } +} + +void Scheduler::do_event(ActorInfo *actor_info, Event &&event) { + event_context_ptr_->link_token = event.link_token; + auto actor = actor_info->get_actor_unsafe(); + switch (event.type) { + case Event::Type::Start: { + VLOG(actor) << *actor_info << " Event::Start"; + actor->start_up(); + break; + } + case Event::Type::Stop: { + VLOG(actor) << *actor_info << " Event::Stop"; + actor->tear_down(); + break; + } + case Event::Type::Yield: { + VLOG(actor) << *actor_info << " Event::Yield"; + actor->wakeup(); + break; + } + case Event::Type::Hangup: { + auto token = get_link_token(actor); + VLOG(actor) << *actor_info << " Event::Hangup " << tag("token", format::as_hex(token)); + if (token != 0) { + actor->hangup_shared(); + } else { + actor->hangup(); + } + break; + } + case Event::Type::Timeout: { + VLOG(actor) << *actor_info << " Event::Timeout"; + actor->timeout_expired(); + break; + } + case Event::Type::Raw: { + VLOG(actor) << *actor_info << " Event::Raw"; + actor->raw_event(event.data); + break; + } + case Event::Type::Custom: { + do_custom_event(actor_info, *event.data.custom_event); + break; + } + case Event::Type::NoType: { + UNREACHABLE(); + break; + } + } + // can't clear event here. It may be already destroyed during destory_actor +} + +void Scheduler::register_migrated_actor(ActorInfo *actor_info) { + VLOG(actor) << "Register migrated actor: " << tag("name", *actor_info) << tag("ptr", actor_info) + << tag("actor_count", actor_count_); + actor_count_++; + CHECK(actor_info->is_migrating()); + CHECK(sched_id_ == actor_info->migrate_dest()); + // CHECK(!actor_info->is_running()); + actor_info->finish_migrate(); + for (auto &event : actor_info->mailbox_) { + finish_migrate(event); + } + auto it = pending_events_.find(actor_info); + if (it != pending_events_.end()) { + actor_info->mailbox_.insert(actor_info->mailbox_.end(), make_move_iterator(begin(it->second)), + make_move_iterator(end(it->second))); + pending_events_.erase(it); + } + if (actor_info->mailbox_.empty()) { + pending_actors_list_.put(actor_info->get_list_node()); + } else { + ready_actors_list_.put(actor_info->get_list_node()); + } + actor_info->get_actor_unsafe()->on_finish_migrate(); +} + +void Scheduler::send_to_other_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) { + if (sched_id < sched_count()) { + auto actor_info = actor_id.get_actor_info(); + if (actor_info) { + VLOG(actor) << "Send to " << *actor_info << " on scheduler " << sched_id << ": " << event; + } else { + VLOG(actor) << "Send to scheduler " << sched_id << ": " << event; + } + start_migrate(event, sched_id); + outbound_queues_[sched_id]->writer_put(EventCreator::event_unsafe(actor_id, std::move(event))); + outbound_queues_[sched_id]->writer_flush(); + } +} + +void Scheduler::add_to_mailbox(ActorInfo *actor_info, Event &&event) { + if (!actor_info->is_running()) { + auto node = actor_info->get_list_node(); + node->remove(); + ready_actors_list_.put(node); + } + VLOG(actor) << "Add to mailbox: " << *actor_info << " " << event; + actor_info->mailbox_.push_back(std::move(event)); +} + +void Scheduler::do_stop_actor(Actor *actor) { + return do_stop_actor(actor->get_info()); +} +void Scheduler::do_stop_actor(ActorInfo *actor_info) { + CHECK(!actor_info->is_migrating()); + CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_; + ObjectPool<ActorInfo>::OwnerPtr owner_ptr; + if (!actor_info->is_lite()) { + EventGuard guard(this, actor_info); + do_event(actor_info, Event::stop()); + owner_ptr = actor_info->get_actor_unsafe()->clear(); + // Actor context is visible in destructor + actor_info->destroy_actor(); + event_context_ptr_->flags = 0; + } else { + owner_ptr = actor_info->get_actor_unsafe()->clear(); + } + destroy_actor(actor_info); +} + +void Scheduler::migrate_actor(Actor *actor, int32 dest_sched_id) { + migrate_actor(actor->get_info(), dest_sched_id); +} +void Scheduler::migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { + CHECK(event_context_ptr_->actor_info == actor_info); + if (sched_id_ == dest_sched_id) { + return; + } + event_context_ptr_->flags |= EventContext::Migrate; + event_context_ptr_->dest_sched_id = dest_sched_id; +} + +void Scheduler::do_migrate_actor(Actor *actor, int32 dest_sched_id) { + do_migrate_actor(actor->get_info(), dest_sched_id); +} +void Scheduler::do_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + dest_sched_id = 0; +#endif + if (sched_id_ == dest_sched_id) { + return; + } + start_migrate_actor(actor_info, dest_sched_id); + send_to_other_scheduler(dest_sched_id, ActorId<>(), Event::raw(actor_info)); +} + +void Scheduler::start_migrate_actor(Actor *actor, int32 dest_sched_id) { + start_migrate_actor(actor->get_info(), dest_sched_id); +} +void Scheduler::start_migrate_actor(ActorInfo *actor_info, int32 dest_sched_id) { + VLOG(actor) << "Start migrate actor: " << tag("name", actor_info) << tag("ptr", actor_info) + << tag("actor_count", actor_count_); + actor_count_--; + CHECK(actor_count_ >= 0); + actor_info->get_actor_unsafe()->on_start_migrate(dest_sched_id); + for (auto &event : actor_info->mailbox_) { + start_migrate(event, dest_sched_id); + } + actor_info->start_migrate(dest_sched_id); + actor_info->get_list_node()->remove(); + cancel_actor_timeout(actor_info); +} + +void Scheduler::set_actor_timeout_in(ActorInfo *actor_info, double timeout) { + if (timeout > 1e10) { + timeout = 1e10; + } + if (timeout < 0) { + timeout = 0; + } + double expire_at = Time::now() + timeout; + set_actor_timeout_at(actor_info, expire_at); +} + +void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) { + HeapNode *heap_node = actor_info->get_heap_node(); + VLOG(actor) << "set actor " << *actor_info << " " << tag("timeout", timeout_at) << timeout_at - Time::now_cached(); + if (heap_node->in_heap()) { + timeout_queue_.fix(timeout_at, heap_node); + } else { + timeout_queue_.insert(timeout_at, heap_node); + } +} + +void Scheduler::run_poll(double timeout) { + // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]"; + // we can't wait for less than 1ms + poll_.run(static_cast<int32>(timeout * 1000 + 1)); + +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + if (can_read(event_fd_.get_fd())) { + std::atomic_thread_fence(std::memory_order_acquire); + event_fd_.acquire(); + } +#endif +} + +void Scheduler::run_mailbox() { + VLOG(actor) << "run mailbox : begin"; + ListNode actors_list = std::move(ready_actors_list_); + while (!actors_list.empty()) { + ListNode *node = actors_list.get(); + CHECK(node); + auto actor_info = ActorInfo::from_list_node(node); + inc_wait_generation(); + flush_mailbox(actor_info, static_cast<void (*)(ActorInfo *)>(nullptr), static_cast<Event (*)()>(nullptr)); + } + VLOG(actor) << "run mailbox : finish " << actor_count_; + + //Useful for debug, but O(ActorsCount) check + + //int cnt = 0; + //for (ListNode *end = &pending_actors_list_, *it = pending_actors_list_.next; it != end; it = it->next) { + //cnt++; + //auto actor_info = ActorInfo::from_list_node(it); + //LOG(ERROR) << *actor_info; + //CHECK(actor_info->mailbox_.empty()); + //CHECK(!actor_info->is_running()); + //} + //for (ListNode *end = &ready_actors_list_, *it = ready_actors_list_.next; it != end; it = it->next) { + //auto actor_info = ActorInfo::from_list_node(it); + //LOG(ERROR) << *actor_info; + //cnt++; + //} + //CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_; +} + +double Scheduler::run_timeout() { + double now = Time::now(); + while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { + HeapNode *node = timeout_queue_.pop(); + ActorInfo *actor_info = ActorInfo::from_heap_node(node); + inc_wait_generation(); + send(actor_info->actor_id(), Event::timeout(), Send::immediate); + } + if (timeout_queue_.empty()) { + return 10000; + } + double timeout = timeout_queue_.top_key() - now; + // LOG(DEBUG) << "Timeout [cnt:" << timeout_queue_.size() << "] in " << format::as_time(timeout); + return timeout; +} + +void Scheduler::run_no_guard(double timeout) { + CHECK(has_guard_); + SCOPE_EXIT { + yield_flag_ = false; + }; + + double next_timeout = run_events(); + if (next_timeout < timeout) { + timeout = next_timeout; + } + if (yield_flag_) { + return; + } + run_poll(timeout); + run_events(); +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.h b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.h new file mode 100644 index 0000000000..7edf3f1d2d --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl/Scheduler.h @@ -0,0 +1,397 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl/ActorInfo-decl.h" +#include "td/actor/impl/Scheduler-decl.h" + +#include "td/utils/format.h" +#include "td/utils/Heap.h" +#include "td/utils/logging.h" +#include "td/utils/MpscPollableQueue.h" +#include "td/utils/ObjectPool.h" +#include "td/utils/port/Fd.h" +#include "td/utils/Slice.h" + +#include <atomic> +#include <memory> +#include <tuple> +#include <utility> + +namespace td { + +/*** ServiceActor ***/ +inline void Scheduler::ServiceActor::set_queue(std::shared_ptr<MpscPollableQueue<EventFull>> queues) { + inbound_ = std::move(queues); +} + +/*** EventGuard ***/ +class EventGuard { + public: + EventGuard(Scheduler *scheduler, ActorInfo *actor_info); + + bool can_run() const { + return event_context_.flags == 0; + } + + EventGuard(const EventGuard &other) = delete; + EventGuard &operator=(const EventGuard &other) = delete; + EventGuard(EventGuard &&other) = delete; + EventGuard &operator=(EventGuard &&other) = delete; + ~EventGuard(); + + private: + Scheduler::EventContext event_context_; + Scheduler::EventContext *event_context_ptr_; + Scheduler *scheduler_; + ActorContext *save_context_; + const char *save_log_tag2_; + + void swap_context(ActorInfo *info); +}; + +/*** Scheduler ***/ +inline SchedulerGuard Scheduler::get_guard() { + return SchedulerGuard(this); +} + +inline void Scheduler::init() { + init(0, {}, nullptr); +} + +inline int32 Scheduler::sched_id() const { + return sched_id_; +} +inline int32 Scheduler::sched_count() const { + return sched_n_; +} + +template <class ActorT, class... Args> +ActorOwn<ActorT> Scheduler::create_actor(Slice name, Args &&... args) { + return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id_); +} + +template <class ActorT, class... Args> +ActorOwn<ActorT> Scheduler::create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) { + return register_actor_impl(name, new ActorT(std::forward<Args>(args)...), Actor::Deleter::Destroy, sched_id); +} + +template <class ActorT> +ActorOwn<ActorT> Scheduler::register_actor(Slice name, ActorT *actor_ptr, int32 sched_id) { + return register_actor_impl(name, actor_ptr, Actor::Deleter::None, sched_id); +} + +template <class ActorT> +ActorOwn<ActorT> Scheduler::register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id) { + return register_actor_impl(name, actor_ptr.release(), Actor::Deleter::Destroy, sched_id); +} + +template <class ActorT> +ActorOwn<ActorT> Scheduler::register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id) { + if (sched_id == -1) { + sched_id = sched_id_; + } +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + sched_id = 0; +#endif + CHECK(sched_id == sched_id_ || (0 <= sched_id && sched_id < static_cast<int32>(outbound_queues_.size()))) << sched_id; + auto info = actor_info_pool_->create_empty(); + VLOG(actor) << "Create actor: " << tag("name", name) << tag("ptr", *info) << tag("context", context()) + << tag("this", this) << tag("actor_count", actor_count_); + actor_count_++; + auto weak_info = info.get_weak(); + auto actor_info = info.get(); + info->init(sched_id_, name, std::move(info), static_cast<Actor *>(actor_ptr), deleter, ActorTraits<ActorT>::is_lite); + + ActorId<ActorT> actor_id = weak_info->actor_id(actor_ptr); + if (sched_id != sched_id_) { + send(actor_id, Event::start(), Send::later_weak); + do_migrate_actor(actor_info, sched_id); + } else { + pending_actors_list_.put(weak_info->get_list_node()); + if (!ActorTraits<ActorT>::is_lite) { + send(actor_id, Event::start(), Send::later_weak); + } + } + + return ActorOwn<ActorT>(actor_id); +} + +template <class ActorT> +ActorOwn<ActorT> Scheduler::register_existing_actor(std::unique_ptr<ActorT> actor_ptr) { + CHECK(!actor_ptr->empty()); + auto actor_info = actor_ptr->get_info(); + CHECK(actor_info->migrate_dest_flag_atomic().first == sched_id_); + return actor_info->transfer_ownership_to_scheduler(std::move(actor_ptr)); +} + +inline void Scheduler::destroy_actor(ActorInfo *actor_info) { + VLOG(actor) << "Destroy actor: " << tag("name", *actor_info) << tag("ptr", actor_info) + << tag("actor_count", actor_count_); + + CHECK(actor_info->migrate_dest() == sched_id_) << actor_info->migrate_dest() << " " << sched_id_; + cancel_actor_timeout(actor_info); + actor_info->get_list_node()->remove(); + // called by ObjectPool + // actor_info->clear(); + actor_count_--; + CHECK(actor_count_ >= 0); +} + +inline void Scheduler::do_custom_event(ActorInfo *actor_info, CustomEvent &event) { + VLOG(actor) << *actor_info << " Event::Custom"; + event.run(actor_info->get_actor_unsafe()); +} + +template <class RunFuncT, class EventFuncT> +void Scheduler::flush_mailbox(ActorInfo *actor_info, const RunFuncT &run_func, const EventFuncT &event_func) { + auto &mailbox = actor_info->mailbox_; + size_t mailbox_size = mailbox.size(); + CHECK(mailbox_size != 0); + EventGuard guard(this, actor_info); + size_t i = 0; + for (; i < mailbox_size && guard.can_run(); i++) { + do_event(actor_info, std::move(mailbox[i])); + } + if (run_func) { + if (guard.can_run()) { + (*run_func)(actor_info); + } else { + mailbox.insert(begin(mailbox) + i, (*event_func)()); + } + } + mailbox.erase(begin(mailbox), begin(mailbox) + i); +} + +inline void Scheduler::send_to_scheduler(int32 sched_id, const ActorId<> &actor_id, Event &&event) { + if (sched_id == sched_id_) { + ActorInfo *actor_info = actor_id.get_actor_info(); + pending_events_[actor_info].push_back(std::move(event)); + } else { + send_to_other_scheduler(sched_id, actor_id, std::move(event)); + } +} + +inline void Scheduler::before_tail_send(const ActorId<> &actor_id) { + // TODO +} + +inline void Scheduler::inc_wait_generation() { + wait_generation_++; +} + +template <class RunFuncT, class EventFuncT> +void Scheduler::send_impl(const ActorId<> &actor_id, Send::Flags flags, const RunFuncT &run_func, + const EventFuncT &event_func) { + CHECK(has_guard_); + ActorInfo *actor_info = actor_id.get_actor_info(); + if (unlikely(actor_info == nullptr || close_flag_)) { + // LOG(ERROR) << "Invalid actor id"; + return; + } + + CHECK(actor_info != nullptr); + int32 actor_sched_id; + bool is_migrating; + std::tie(actor_sched_id, is_migrating) = actor_info->migrate_dest_flag_atomic(); + bool on_current_sched = !is_migrating && sched_id_ == actor_sched_id; + + if (likely(!(flags & Send::later) && !(flags & Send::later_weak) && on_current_sched && !actor_info->is_running() && + !actor_info->must_wait(wait_generation_))) { // run immediately + if (likely(actor_info->mailbox_.empty())) { + EventGuard guard(this, actor_info); + run_func(actor_info); + } else { + flush_mailbox(actor_info, &run_func, &event_func); + } + } else { + if (on_current_sched) { + add_to_mailbox(actor_info, event_func()); + if (flags & Send::later) { + actor_info->set_wait_generation(wait_generation_); + } + } else { + send_to_scheduler(actor_sched_id, actor_id, event_func()); + } + } +} + +template <class EventT> +void Scheduler::send_lambda(ActorRef actor_ref, EventT &&lambda, Send::Flags flags) { + return send_impl(actor_ref.get(), flags, + [&](ActorInfo *actor_info) { + event_context_ptr_->link_token = actor_ref.token(); + lambda(); + }, + [&]() { + auto event = Event::lambda(std::forward<EventT>(lambda)); + event.set_link_token(actor_ref.token()); + return std::move(event); + }); +} + +template <class EventT> +void Scheduler::send_closure(ActorRef actor_ref, EventT &&closure, Send::Flags flags) { + return send_impl(actor_ref.get(), flags, + [&](ActorInfo *actor_info) { + event_context_ptr_->link_token = actor_ref.token(); + closure.run(static_cast<typename EventT::ActorType *>(actor_info->get_actor_unsafe())); + }, + [&]() { + auto event = Event::immediate_closure(std::forward<EventT>(closure)); + event.set_link_token(actor_ref.token()); + return std::move(event); + }); +} + +inline void Scheduler::send(ActorRef actor_ref, Event &&event, Send::Flags flags) { + event.set_link_token(actor_ref.token()); + return send_impl(actor_ref.get(), flags, [&](ActorInfo *actor_info) { do_event(actor_info, std::move(event)); }, + [&]() { return std::move(event); }); +} + +inline void Scheduler::subscribe(const Fd &fd, Fd::Flags flags) { + poll_.subscribe(fd, flags); +} + +inline void Scheduler::unsubscribe(const Fd &fd) { + poll_.unsubscribe(fd); +} + +inline void Scheduler::unsubscribe_before_close(const Fd &fd) { + poll_.unsubscribe_before_close(fd); +} + +inline void Scheduler::yield_actor(Actor *actor) { + yield_actor(actor->get_info()); +} +inline void Scheduler::yield_actor(ActorInfo *actor_info) { + send(actor_info->actor_id(), Event::yield(), Send::later_weak); +} + +inline void Scheduler::stop_actor(Actor *actor) { + stop_actor(actor->get_info()); +} +inline void Scheduler::stop_actor(ActorInfo *actor_info) { + CHECK(event_context_ptr_->actor_info == actor_info); + event_context_ptr_->flags |= EventContext::Stop; +} + +inline uint64 Scheduler::get_link_token(Actor *actor) { + return get_link_token(actor->get_info()); +} +inline uint64 Scheduler::get_link_token(ActorInfo *actor_info) { + CHECK(event_context_ptr_->actor_info == actor_info); + return event_context_ptr_->link_token; +} + +inline void Scheduler::finish_migrate_actor(Actor *actor) { + register_migrated_actor(actor->get_info()); +} + +inline bool Scheduler::has_actor_timeout(const Actor *actor) const { + return has_actor_timeout(actor->get_info()); +} +inline void Scheduler::set_actor_timeout_in(Actor *actor, double timeout) { + set_actor_timeout_in(actor->get_info(), timeout); +} +inline void Scheduler::set_actor_timeout_at(Actor *actor, double timeout_at) { + set_actor_timeout_at(actor->get_info(), timeout_at); +} +inline void Scheduler::cancel_actor_timeout(Actor *actor) { + cancel_actor_timeout(actor->get_info()); +} + +inline bool Scheduler::has_actor_timeout(const ActorInfo *actor_info) const { + const HeapNode *heap_node = actor_info->get_heap_node(); + return heap_node->in_heap(); +} + +inline void Scheduler::cancel_actor_timeout(ActorInfo *actor_info) { + HeapNode *heap_node = actor_info->get_heap_node(); + if (heap_node->in_heap()) { + timeout_queue_.erase(heap_node); + } +} + +inline void Scheduler::finish() { + if (callback_) { + callback_->on_finish(); + } + yield(); +} + +inline void Scheduler::yield() { + yield_flag_ = true; +} + +inline void Scheduler::wakeup() { + std::atomic_thread_fence(std::memory_order_release); +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + event_fd_.release(); +#endif +} + +inline double Scheduler::run_events() { + double res; + VLOG(actor) << "run events " << sched_id_ << " " << tag("pending", pending_events_.size()) + << tag("actors", actor_count_); + do { + run_mailbox(); + res = run_timeout(); + } while (!ready_actors_list_.empty()); + return res; +} + +inline void Scheduler::run(double timeout) { + auto guard = get_guard(); + run_no_guard(timeout); +} + +/*** Interface to current scheduler ***/ +inline void subscribe(const Fd &fd, Fd::Flags flags) { + Scheduler::instance()->subscribe(fd, flags); +} + +inline void unsubscribe(const Fd &fd) { + Scheduler::instance()->unsubscribe(fd); +} + +inline void unsubscribe_before_close(const Fd &fd) { + Scheduler::instance()->unsubscribe_before_close(fd); +} + +template <class ActorT, class... Args> +ActorOwn<ActorT> create_actor(Slice name, Args &&... args) { + return Scheduler::instance()->create_actor<ActorT>(name, std::forward<Args>(args)...); +} + +template <class ActorT, class... Args> +ActorOwn<ActorT> create_actor_on_scheduler(Slice name, int32 sched_id, Args &&... args) { + return Scheduler::instance()->create_actor_on_scheduler<ActorT>(name, sched_id, std::forward<Args>(args)...); +} + +template <class ActorT> +ActorOwn<ActorT> register_actor(Slice name, ActorT *actor_ptr, int32 sched_id) { + return Scheduler::instance()->register_actor<ActorT>(name, actor_ptr, sched_id); +} + +template <class ActorT> +ActorOwn<ActorT> register_actor(Slice name, unique_ptr<ActorT> actor_ptr, int32 sched_id) { + return Scheduler::instance()->register_actor<ActorT>(name, std::move(actor_ptr), sched_id); +} + +template <class ActorT> +ActorOwn<ActorT> register_existing_actor(unique_ptr<ActorT> actor_ptr) { + return Scheduler::instance()->register_existing_actor(std::move(actor_ptr)); +} + +inline void yield_scheduler() { + Scheduler::instance()->yield(); +} + +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h b/libs/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h new file mode 100644 index 0000000000..2cb5cb2127 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl2/ActorLocker.h @@ -0,0 +1,117 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl2/ActorSignals.h" +#include "td/actor/impl2/ActorState.h" + +#include "td/utils/logging.h" + +#include <atomic> + +namespace td { +namespace actor2 { +class ActorLocker { + public: + struct Options { + Options() { + } + bool can_execute_paused = false; + bool is_shared = true; + Options &with_can_execute_paused(bool new_can_execute_paused) { + can_execute_paused = new_can_execute_paused; + return *this; + } + Options &with_is_shared(bool new_is_shared) { + is_shared = new_is_shared; + return *this; + } + }; + explicit ActorLocker(ActorState *state, Options options = {}) + : state_(state), flags_(state->get_flags_unsafe()), new_flags_{}, options_{options} { + } + bool try_lock() { + CHECK(!own_lock()); + while (!can_try_add_signals()) { + new_flags_ = flags_; + new_flags_.set_locked(true); + new_flags_.clear_signals(); + if (state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel)) { + own_lock_ = true; + return true; + } + } + return false; + } + bool try_unlock(ActorState::Flags flags) { + CHECK(!flags.is_locked()); + CHECK(own_lock()); + // can't unlock with signals set + //CHECK(!flags.has_signals()); + + flags_ = flags; + //try unlock + if (state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel)) { + own_lock_ = false; + return true; + } + + // read all signals + flags.set_locked(true); + flags.clear_signals(); + do { + flags_.add_signals(new_flags_.get_signals()); + } while (!state_->state_.compare_exchange_strong(new_flags_.raw_ref(), flags.raw(), std::memory_order_acq_rel)); + new_flags_ = flags; + return false; + } + + bool try_add_signals(ActorSignals signals) { + CHECK(!own_lock()); + CHECK(can_try_add_signals()); + new_flags_ = flags_; + new_flags_.add_signals(signals); + return state_->state_.compare_exchange_strong(flags_.raw_ref(), new_flags_.raw(), std::memory_order_acq_rel); + } + bool add_signals(ActorSignals signals) { + CHECK(!own_lock()); + while (true) { + if (can_try_add_signals()) { + if (try_add_signals(signals)) { + return false; + } + } else { + if (try_lock()) { + flags_.add_signals(signals); + return true; + } + } + } + } + bool own_lock() const { + return own_lock_; + } + ActorState::Flags flags() const { + return flags_; + } + bool can_execute() const { + return flags_.is_shared() == options_.is_shared && (options_.can_execute_paused || !flags_.is_pause()); + } + + private: + ActorState *state_{nullptr}; + ActorState::Flags flags_; + ActorState::Flags new_flags_; + bool own_lock_{false}; + Options options_; + + bool can_try_add_signals() const { + return flags_.is_locked() || (flags_.is_in_queue() && !can_execute()); + } +}; +} // namespace actor2 +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h b/libs/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h new file mode 100644 index 0000000000..b7a7483022 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl2/ActorSignals.h @@ -0,0 +1,84 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/common.h" + +namespace td { +namespace actor2 { +class ActorSignals { + public: + ActorSignals() = default; + uint32 raw() const { + return raw_; + } + bool empty() const { + return raw_ == 0; + } + bool has_signal(uint32 signal) const { + return (raw_ & (1u << signal)) != 0; + } + void add_signal(uint32 signal) { + raw_ |= (1u << signal); + } + void add_signals(ActorSignals signals) { + raw_ |= signals.raw(); + } + void clear_signal(uint32 signal) { + raw_ &= ~(1u << signal); + } + uint32 first_signal() { + if (!raw_) { + return 0; + } +#if TD_MSVC + int res = 0; + int bit = 1; + while ((raw_ & bit) == 0) { + res++; + bit <<= 1; + } + return res; +#else + return __builtin_ctz(raw_); +#endif + } + enum Signal : uint32 { + // Signals in order of priority + Wakeup = 1, + Alarm = 2, + Kill = 3, // immediate kill + Io = 4, // move to io thread + Cpu = 5, // move to cpu thread + StartUp = 6, + TearDown = 7, + // Two signals for mpmc queue logic + // + // PopSignal is set after actor is popped from queue + // When processed it should set InQueue and Pause flags to false. + // + // MessagesSignal is set after new messages was added to actor + // If owner of actor wish to delay message handling, she should set InQueue flag to true and + // add actor into mpmc queue. + Pop = 8, // got popped from queue + Message = 9, // got new message + }; + + static ActorSignals one(uint32 signal) { + ActorSignals res; + res.add_signal(signal); + return res; + } + + private: + uint32 raw_{0}; + friend class ActorState; + explicit ActorSignals(uint32 raw) : raw_(raw) { + } +}; +} // namespace actor2 +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/ActorState.h b/libs/tdlib/td/tdactor/td/actor/impl2/ActorState.h new file mode 100644 index 0000000000..02ead6bcf6 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl2/ActorState.h @@ -0,0 +1,166 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl2/ActorSignals.h" +#include "td/actor/impl2/SchedulerId.h" + +#include "td/utils/common.h" + +#include <atomic> + +namespace td { +namespace actor2 { +class ActorState { + public: + class Flags { + public: + Flags() = default; + uint32 raw() const { + return raw_; + } + uint32 &raw_ref() { + return raw_; + } + SchedulerId get_scheduler_id() const { + return SchedulerId{static_cast<uint8>(raw_ & SchedulerMask)}; + } + void set_scheduler_id(SchedulerId id) { + raw_ = (raw_ & ~SchedulerMask) | id.value(); + } + + bool is_shared() const { + return check_flag(SharedFlag); + } + void set_shared(bool shared) { + set_flag(SharedFlag, shared); + } + + bool is_locked() const { + return check_flag(LockFlag); + } + void set_locked(bool locked) { + set_flag(LockFlag, locked); + } + + bool is_migrate() const { + return check_flag(MigrateFlag); + } + void set_migrate(bool migrate) { + set_flag(MigrateFlag, migrate); + } + + bool is_pause() const { + return check_flag(PauseFlag); + } + void set_pause(bool pause) { + set_flag(PauseFlag, pause); + } + + bool is_closed() const { + return check_flag(ClosedFlag); + } + void set_closed(bool closed) { + set_flag(ClosedFlag, closed); + } + + bool is_in_queue() const { + return check_flag(InQueueFlag); + } + void set_in_queue(bool in_queue) { + set_flag(InQueueFlag, in_queue); + } + + bool has_signals() const { + return check_flag(SignalMask); + } + void clear_signals() { + set_flag(SignalMask, false); + } + void set_signals(ActorSignals signals) { + raw_ = (raw_ & ~SignalMask) | (signals.raw() << SignalOffset); + } + void add_signals(ActorSignals signals) { + raw_ = raw_ | (signals.raw() << SignalOffset); + } + ActorSignals get_signals() const { + return ActorSignals{(raw_ & SignalMask) >> SignalOffset}; + } + + private: + uint32 raw_{0}; + + friend class ActorState; + Flags(uint32 raw) : raw_(raw) { + } + + bool check_flag(uint32 mask) const { + return (raw_ & mask) != 0; + } + void set_flag(uint32 mask, bool flag) { + raw_ = (raw_ & ~mask) | (flag * mask); + } + }; + + Flags get_flags_unsafe() { + return Flags(state_.load(std::memory_order_relaxed)); + } + void set_flags_unsafe(Flags flags) { + state_.store(flags.raw(), std::memory_order_relaxed); + } + + private: + friend class ActorLocker; + std::atomic<uint32> state_{0}; + enum : uint32 { + SchedulerMask = 255, + + // Actors can be shared or not. + // If actor is shared, than any thread may try to lock it + // If actor is not shared, than it is owned by its scheduler, and only + // its scheduler is allowed to access it + // This flag may NOT change during the lifetime of an actor + SharedFlag = 1 << 9, + + // Only shared actors need lock + // Lock if somebody is going to unlock it eventually. + // For example actor is locked, when some scheduler is executing its mailbox + // Or it is locked when it is in Mpmc queue, so someone will pop it eventually. + LockFlag = 1 << 10, + + // While actor is migrating from one scheduler to another no one is allowed to change it + // Could not be set for shared actors. + MigrateFlag = 1 << 11, + + // While set all messages are delayed + // Dropped from flush_maibox + // PauseFlag => InQueueFlag + PauseFlag = 1 << 12, + + ClosedFlag = 1 << 13, + + InQueueFlag = 1 << 14, + + // Signals + SignalOffset = 15, + Signal = 1 << SignalOffset, + WakeupSignalFlag = Signal << ActorSignals::Wakeup, + AlarmSignalFlag = Signal << ActorSignals::Alarm, + KillSignalFlag = Signal << ActorSignals::Kill, // immediate kill + IoSignalFlag = Signal << ActorSignals::Io, // move to io thread + CpuSignalFlag = Signal << ActorSignals::Cpu, // move to cpu thread + StartUpSignalFlag = Signal << ActorSignals::StartUp, + TearDownSignalFlag = Signal << ActorSignals::TearDown, + MessageSignalFlag = Signal << ActorSignals::Message, + PopSignalFlag = Signal << ActorSignals::Pop, + + SignalMask = WakeupSignalFlag | AlarmSignalFlag | KillSignalFlag | IoSignalFlag | CpuSignalFlag | + StartUpSignalFlag | TearDownSignalFlag | MessageSignalFlag | PopSignalFlag + }; +}; +} // namespace actor2 +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp new file mode 100644 index 0000000000..720bf6bc4f --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.cpp @@ -0,0 +1,11 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/impl2/Scheduler.h" + +namespace td { +namespace actor2 {} +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.h b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.h new file mode 100644 index 0000000000..9d5783b165 --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl2/Scheduler.h @@ -0,0 +1,1508 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/impl2/ActorLocker.h" +#include "td/actor/impl2/SchedulerId.h" + +#include "td/utils/Closure.h" +#include "td/utils/common.h" +#include "td/utils/format.h" +#include "td/utils/Heap.h" +#include "td/utils/List.h" +#include "td/utils/logging.h" +#include "td/utils/MpmcQueue.h" +#include "td/utils/MpmcWaiter.h" +#include "td/utils/MpscLinkQueue.h" +#include "td/utils/MpscPollableQueue.h" +#include "td/utils/port/Fd.h" +#include "td/utils/port/Poll.h" +#include "td/utils/port/thread.h" +#include "td/utils/port/thread_local.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/SharedObjectPool.h" +#include "td/utils/Slice.h" +#include "td/utils/Time.h" +#include "td/utils/type_traits.h" + +#include <atomic> +#include <condition_variable> +#include <limits> +#include <memory> +#include <mutex> +#include <type_traits> +#include <utility> + +namespace td { +namespace actor2 { +class Actor; + +template <class Impl> +class Context { + public: + static Impl *get() { + return context_; + } + class Guard { + public: + explicit Guard(Impl *new_context) { + old_context_ = context_; + context_ = new_context; + } + ~Guard() { + context_ = old_context_; + } + Guard(const Guard &) = delete; + Guard &operator=(const Guard &) = delete; + Guard(Guard &&) = delete; + Guard &operator=(Guard &&) = delete; + + private: + Impl *old_context_; + }; + + private: + static TD_THREAD_LOCAL Impl *context_; +}; + +template <class Impl> +TD_THREAD_LOCAL Impl *Context<Impl>::context_; + +enum : uint64 { EmptyLinkToken = std::numeric_limits<uint64>::max() }; + +class ActorExecuteContext : public Context<ActorExecuteContext> { + public: + explicit ActorExecuteContext(Actor *actor, Timestamp alarm_timestamp = Timestamp::never()) + : actor_(actor), alarm_timestamp_(alarm_timestamp) { + } + Actor &actor() const { + CHECK(actor_); + return *actor_; + } + bool has_flags() const { + return flags_ != 0; + } + void set_stop() { + flags_ |= 1 << Stop; + } + bool get_stop() const { + return (flags_ & (1 << Stop)) != 0; + } + void set_pause() { + flags_ |= 1 << Pause; + } + bool get_pause() const { + return (flags_ & (1 << Pause)) != 0; + } + void clear_actor() { + actor_ = nullptr; + } + void set_link_token(uint64 link_token) { + link_token_ = link_token; + } + uint64 get_link_token() const { + return link_token_; + } + Timestamp &alarm_timestamp() { + flags_ |= 1 << Alarm; + return alarm_timestamp_; + } + bool get_alarm_flag() const { + return (flags_ & (1 << Alarm)) != 0; + } + Timestamp get_alarm_timestamp() const { + return alarm_timestamp_; + } + + private: + Actor *actor_; + uint32 flags_{0}; + uint64 link_token_{EmptyLinkToken}; + Timestamp alarm_timestamp_; + enum { Stop, Pause, Alarm }; +}; + +class ActorMessageImpl : private MpscLinkQueueImpl::Node { + public: + ActorMessageImpl() = default; + ActorMessageImpl(const ActorMessageImpl &) = delete; + ActorMessageImpl &operator=(const ActorMessageImpl &) = delete; + ActorMessageImpl(ActorMessageImpl &&other) = delete; + ActorMessageImpl &operator=(ActorMessageImpl &&other) = delete; + virtual ~ActorMessageImpl() = default; + virtual void run() = 0; + //virtual void run_anonymous() = 0; + + // ActorMessage <--> MpscLintQueue::Node + // Each actor's mailbox will be a queue + static ActorMessageImpl *from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) { + return static_cast<ActorMessageImpl *>(node); + } + MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() { + return static_cast<MpscLinkQueueImpl::Node *>(this); + } + + uint64 link_token_{EmptyLinkToken}; + bool is_big_{false}; +}; + +class ActorMessage { + public: + ActorMessage() = default; + explicit ActorMessage(std::unique_ptr<ActorMessageImpl> impl) : impl_(std::move(impl)) { + } + void run() { + CHECK(impl_); + impl_->run(); + } + explicit operator bool() { + return bool(impl_); + } + friend class ActorMailbox; + + void set_link_token(uint64 link_token) { + impl_->link_token_ = link_token; + } + uint64 get_link_token() const { + return impl_->link_token_; + } + bool is_big() const { + return impl_->is_big_; + } + void set_big() { + impl_->is_big_ = true; + } + + private: + std::unique_ptr<ActorMessageImpl> impl_; + + template <class T> + friend class td::MpscLinkQueue; + + static ActorMessage from_mpsc_link_queue_node(MpscLinkQueueImpl::Node *node) { + return ActorMessage(std::unique_ptr<ActorMessageImpl>(ActorMessageImpl::from_mpsc_link_queue_node(node))); + } + MpscLinkQueueImpl::Node *to_mpsc_link_queue_node() { + return impl_.release()->to_mpsc_link_queue_node(); + } +}; + +class ActorMailbox { + public: + ActorMailbox() = default; + ActorMailbox(const ActorMailbox &) = delete; + ActorMailbox &operator=(const ActorMailbox &) = delete; + ActorMailbox(ActorMailbox &&other) = delete; + ActorMailbox &operator=(ActorMailbox &&other) = delete; + ~ActorMailbox() { + pop_all(); + while (reader_.read()) { + // skip + } + } + class Reader; + void push(ActorMessage message) { + queue_.push(std::move(message)); + } + void push_unsafe(ActorMessage message) { + queue_.push_unsafe(std::move(message)); + } + + td::MpscLinkQueue<ActorMessage>::Reader &reader() { + return reader_; + } + + void pop_all() { + queue_.pop_all(reader_); + } + void pop_all_unsafe() { + queue_.pop_all_unsafe(reader_); + } + + private: + td::MpscLinkQueue<ActorMessage> queue_; + td::MpscLinkQueue<ActorMessage>::Reader reader_; +}; + +class ActorInfo + : private HeapNode + , private ListNode { + public: + ActorInfo(std::unique_ptr<Actor> actor, ActorState::Flags state_flags, Slice name) + : actor_(std::move(actor)), name_(name.begin(), name.size()) { + state_.set_flags_unsafe(state_flags); + } + + bool has_actor() const { + return bool(actor_); + } + Actor &actor() { + CHECK(has_actor()); + return *actor_; + } + Actor *actor_ptr() const { + return actor_.get(); + } + void destroy_actor() { + actor_.reset(); + } + ActorState &state() { + return state_; + } + ActorMailbox &mailbox() { + return mailbox_; + } + CSlice get_name() const { + return name_; + } + + HeapNode *as_heap_node() { + return this; + } + static ActorInfo *from_heap_node(HeapNode *node) { + return static_cast<ActorInfo *>(node); + } + + Timestamp &alarm_timestamp() { + return alarm_timestamp_; + } + + private: + std::unique_ptr<Actor> actor_; + ActorState state_; + ActorMailbox mailbox_; + std::string name_; + Timestamp alarm_timestamp_; +}; + +using ActorInfoPtr = SharedObjectPool<ActorInfo>::Ptr; + +class Actor { + public: + Actor() = default; + Actor(const Actor &) = delete; + Actor &operator=(const Actor &) = delete; + Actor(Actor &&other) = delete; + Actor &operator=(Actor &&other) = delete; + virtual ~Actor() = default; + + void set_actor_info_ptr(ActorInfoPtr actor_info_ptr) { + actor_info_ptr_ = std::move(actor_info_ptr); + } + ActorInfoPtr get_actor_info_ptr() { + return actor_info_ptr_; + } + + protected: + // Signal handlers + virtual void start_up(); // StartUp signal handler + virtual void tear_down(); // TearDown signal handler (or Kill) + virtual void hang_up(); // HangUp signal handler + virtual void wake_up(); // WakeUp signal handler + virtual void alarm(); // Alarm signal handler + + friend class ActorMessageHangup; + + // Event handlers + //virtual void hangup_shared(); + // TODO: raw event? + + virtual void loop(); // default handler + + // Useful functions + void yield(); // send wakeup signal to itself + void stop(); // send Kill signal to itself + Timestamp &alarm_timestamp() { + return ActorExecuteContext::get()->alarm_timestamp(); + } + Timestamp get_alarm_timestamp() { + return ActorExecuteContext::get()->get_alarm_timestamp(); + } + + CSlice get_name() { + return actor_info_ptr_->get_name(); + } + + // Inteface to scheduler + // Query will be just passed to current scheduler + // Timeout functions + //bool has_timeout() const; + //void set_timeout_in(double timeout_in); + //void set_timeout_at(double timeout_at); + //void cancel_timeout(); + //uint64 get_link_token(); // get current request's link_token + //set context that will be inherited by all childrens + //void set_context(std::shared_ptr<ActorContext> context); + + //ActorShared<> actor_shared(); // ActorShared to itself + //template <class SelfT> + //ActorShared<SelfT> actor_shared(SelfT *self, uint64 id = static_cast<uint64>(-1)); // ActorShared with type + + // Create EventFull to itself + //template <class FuncT, class... ArgsT> + //auto self_closure(FuncT &&func, ArgsT &&... args); + //template <class SelfT, class FuncT, class... ArgsT> + //auto self_closure(SelfT *self, FuncT &&func, ArgsT &&... args); + //template <class LambdaT> + //auto self_lambda(LambdaT &&lambda); + + //void do_stop(); // process Kill signal immediately + + private: + friend class ActorExecutor; + ActorInfoPtr actor_info_ptr_; +}; +// Signal handlers +inline void Actor::start_up() { + yield(); +} +inline void Actor::tear_down() { + // noop +} +inline void Actor::hang_up() { + stop(); +} +inline void Actor::wake_up() { + loop(); +} +inline void Actor::alarm() { + loop(); +} + +inline void Actor::loop() { + // noop +} + +// Useful functions +inline void Actor::yield() { + // TODO +} +inline void Actor::stop() { + ActorExecuteContext::get()->set_stop(); +} + +class ActorInfoCreator { + public: + class Options { + public: + Options() = default; + + Options &with_name(Slice new_name) { + name = new_name; + return *this; + } + + Options &on_scheduler(SchedulerId new_scheduler_id) { + scheduler_id = new_scheduler_id; + return *this; + } + bool has_scheduler() const { + return scheduler_id.is_valid(); + } + Options &with_poll() { + is_shared = false; + return *this; + } + + private: + friend class ActorInfoCreator; + Slice name; + SchedulerId scheduler_id; + bool is_shared{true}; + bool in_queue{true}; + //TODO: rename + }; + + //Create unlocked actor. One must send StartUp signal immediately. + ActorInfoPtr create(std::unique_ptr<Actor> actor, const Options &args) { + ActorState::Flags flags; + flags.set_scheduler_id(args.scheduler_id); + flags.set_shared(args.is_shared); + flags.set_in_queue(args.in_queue); + flags.set_signals(ActorSignals::one(ActorSignals::StartUp)); + + auto actor_info_ptr = pool_.alloc(std::move(actor), flags, args.name); + actor_info_ptr->actor().set_actor_info_ptr(actor_info_ptr); + return actor_info_ptr; + } + + ActorInfoCreator() = default; + ActorInfoCreator(const ActorInfoCreator &) = delete; + ActorInfoCreator &operator=(const ActorInfoCreator &) = delete; + ActorInfoCreator(ActorInfoCreator &&other) = delete; + ActorInfoCreator &operator=(ActorInfoCreator &&other) = delete; + ~ActorInfoCreator() { + pool_.for_each([](auto &actor_info) { actor_info.destroy_actor(); }); + } + + private: + SharedObjectPool<ActorInfo> pool_; +}; + +using ActorOptions = ActorInfoCreator::Options; + +class SchedulerDispatcher { + public: + virtual SchedulerId get_scheduler_id() const = 0; + virtual void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) = 0; + virtual void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) = 0; + + SchedulerDispatcher() = default; + SchedulerDispatcher(const SchedulerDispatcher &) = delete; + SchedulerDispatcher &operator=(const SchedulerDispatcher &) = delete; + SchedulerDispatcher(SchedulerDispatcher &&other) = delete; + SchedulerDispatcher &operator=(SchedulerDispatcher &&other) = delete; + virtual ~SchedulerDispatcher() = default; +}; + +class ActorExecutor { + public: + struct Options { + Options &with_from_queue() { + from_queue = true; + return *this; + } + Options &with_has_poll(bool new_has_poll) { + this->has_poll = new_has_poll; + return *this; + } + bool from_queue{false}; + bool has_poll{false}; + }; + ActorExecutor(ActorInfo &actor_info, SchedulerDispatcher &dispatcher, Options options) + : actor_info_(actor_info), dispatcher_(dispatcher), options_(options) { + //LOG(ERROR) << "START " << actor_info_.get_name() << " " << tag("from_queue", from_queue); + start(); + } + ActorExecutor(const ActorExecutor &) = delete; + ActorExecutor &operator=(const ActorExecutor &) = delete; + ActorExecutor(ActorExecutor &&other) = delete; + ActorExecutor &operator=(ActorExecutor &&other) = delete; + ~ActorExecutor() { + //LOG(ERROR) << "FINISH " << actor_info_.get_name() << " " << tag("own_lock", actor_locker_.own_lock()); + finish(); + } + + // our best guess if actor is closed or not + bool can_send() { + return !flags().is_closed(); + } + + bool can_send_immediate() { + return actor_locker_.own_lock() && !actor_execute_context_.has_flags() && actor_locker_.can_execute(); + } + + template <class F> + void send_immediate(F &&f, uint64 link_token) { + CHECK(can_send_immediate()); + if (!can_send()) { + return; + } + actor_execute_context_.set_link_token(link_token); + f(); + } + void send_immediate(ActorMessage message) { + CHECK(can_send_immediate()); + if (message.is_big()) { + actor_info_.mailbox().reader().delay(std::move(message)); + pending_signals_.add_signal(ActorSignals::Message); + actor_execute_context_.set_pause(); + return; + } + actor_execute_context_.set_link_token(message.get_link_token()); + message.run(); + } + void send_immediate(ActorSignals signals) { + CHECK(can_send_immediate()); + while (flush_one_signal(signals) && can_send_immediate()) { + } + pending_signals_.add_signals(signals); + } + + void send(ActorMessage message) { + if (!can_send()) { + return; + } + if (can_send_immediate()) { + return send_immediate(std::move(message)); + } + actor_info_.mailbox().push(std::move(message)); + pending_signals_.add_signal(ActorSignals::Message); + } + + void send(ActorSignals signals) { + if (!can_send()) { + return; + } + + pending_signals_.add_signals(signals); + } + + private: + ActorInfo &actor_info_; + SchedulerDispatcher &dispatcher_; + Options options_; + ActorLocker actor_locker_{ + &actor_info_.state(), + ActorLocker::Options().with_can_execute_paused(options_.from_queue).with_is_shared(!options_.has_poll)}; + + ActorExecuteContext actor_execute_context_{actor_info_.actor_ptr(), actor_info_.alarm_timestamp()}; + ActorExecuteContext::Guard guard{&actor_execute_context_}; + + ActorState::Flags flags_; + ActorSignals pending_signals_; + + ActorState::Flags &flags() { + return flags_; + } + + void start() { + if (!can_send()) { + return; + } + + ActorSignals signals; + SCOPE_EXIT { + pending_signals_.add_signals(signals); + }; + + if (options_.from_queue) { + signals.add_signal(ActorSignals::Pop); + } + + actor_locker_.try_lock(); + flags_ = actor_locker_.flags(); + + if (!actor_locker_.own_lock()) { + return; + } + + if (options_.from_queue) { + flags().set_pause(false); + } + if (!actor_locker_.can_execute()) { + CHECK(!options_.from_queue); + return; + } + + signals.add_signals(flags().get_signals()); + actor_info_.mailbox().pop_all(); + + while (!actor_execute_context_.has_flags() && flush_one(signals)) { + } + } + + void finish() { + if (!actor_locker_.own_lock()) { + if (!pending_signals_.empty() && actor_locker_.add_signals(pending_signals_)) { + flags_ = actor_locker_.flags(); + } else { + return; + } + } + CHECK(actor_locker_.own_lock()); + + if (actor_execute_context_.has_flags()) { + if (actor_execute_context_.get_stop()) { + if (actor_info_.alarm_timestamp()) { + dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), Timestamp::never()); + } + flags_.set_closed(true); + actor_info_.actor().tear_down(); + actor_info_.destroy_actor(); + return; + } + if (actor_execute_context_.get_pause()) { + flags_.set_pause(true); + } + if (actor_execute_context_.get_alarm_flag()) { + auto old_timestamp = actor_info_.alarm_timestamp(); + auto new_timestamp = actor_execute_context_.get_alarm_timestamp(); + if (!(old_timestamp == new_timestamp)) { + actor_info_.alarm_timestamp() = new_timestamp; + dispatcher_.set_alarm_timestamp(actor_info_.actor().get_actor_info_ptr(), new_timestamp); + } + } + } + flags_.set_signals(pending_signals_); + + bool add_to_queue = false; + while (true) { + // Drop InQueue flag if has pop signal + // Can't delay this signal + auto signals = flags().get_signals(); + if (signals.has_signal(ActorSignals::Pop)) { + signals.clear_signal(ActorSignals::Pop); + flags().set_signals(signals); + flags().set_in_queue(false); + } + + if (flags().has_signals() && !flags().is_in_queue()) { + add_to_queue = true; + flags().set_in_queue(true); + } + if (actor_locker_.try_unlock(flags())) { + if (add_to_queue) { + dispatcher_.add_to_queue(actor_info_.actor().get_actor_info_ptr(), flags().get_scheduler_id(), + !flags().is_shared()); + } + break; + } + flags_ = actor_locker_.flags(); + } + } + + bool flush_one(ActorSignals &signals) { + return flush_one_signal(signals) || flush_one_message(); + } + + bool flush_one_signal(ActorSignals &signals) { + auto signal = signals.first_signal(); + if (!signal) { + return false; + } + switch (signal) { + case ActorSignals::Wakeup: + actor_info_.actor().wake_up(); + break; + case ActorSignals::Alarm: + if (actor_execute_context_.get_alarm_timestamp().is_in_past()) { + actor_execute_context_.alarm_timestamp() = Timestamp::never(); + actor_info_.actor().alarm(); + } + break; + case ActorSignals::Kill: + actor_execute_context_.set_stop(); + break; + case ActorSignals::StartUp: + actor_info_.actor().start_up(); + break; + case ActorSignals::TearDown: + actor_info_.actor().tear_down(); + break; + case ActorSignals::Pop: + flags().set_in_queue(false); + break; + + case ActorSignals::Message: + break; + case ActorSignals::Io: + case ActorSignals::Cpu: + LOG(FATAL) << "TODO"; + default: + UNREACHABLE(); + } + signals.clear_signal(signal); + return true; + } + + bool flush_one_message() { + auto message = actor_info_.mailbox().reader().read(); + if (!message) { + return false; + } + if (message.is_big() && !options_.from_queue) { + actor_info_.mailbox().reader().delay(std::move(message)); + pending_signals_.add_signal(ActorSignals::Message); + actor_execute_context_.set_pause(); + return false; + } + + actor_execute_context_.set_link_token(message.get_link_token()); + message.run(); + return true; + } +}; + +using SchedulerMessage = ActorInfoPtr; + +struct WorkerInfo { + enum class Type { Io, Cpu } type{Type::Io}; + WorkerInfo() = default; + explicit WorkerInfo(Type type) : type(type) { + } + ActorInfoCreator actor_info_creator; +}; + +struct SchedulerInfo { + SchedulerId id; + // will be read by all workers is any thread + std::unique_ptr<MpmcQueue<SchedulerMessage>> cpu_queue; + std::unique_ptr<MpmcWaiter> cpu_queue_waiter; + // only scheduler itself may read from io_queue_ + std::unique_ptr<MpscPollableQueue<SchedulerMessage>> io_queue; + size_t cpu_threads_count{0}; + + std::unique_ptr<WorkerInfo> io_worker; + std::vector<std::unique_ptr<WorkerInfo>> cpu_workers; +}; + +struct SchedulerGroupInfo { + explicit SchedulerGroupInfo(size_t n) : schedulers(n) { + } + std::atomic<bool> is_stop_requested{false}; + + int active_scheduler_count{0}; + std::mutex active_scheduler_count_mutex; + std::condition_variable active_scheduler_count_condition_variable; + + std::vector<SchedulerInfo> schedulers; +}; + +class SchedulerContext + : public Context<SchedulerContext> + , public SchedulerDispatcher { + public: + // DispatcherInterface + SchedulerDispatcher &dispatcher() { + return *this; + } + + // ActorCreator Interface + virtual ActorInfoCreator &get_actor_info_creator() = 0; + + // Poll interface + virtual bool has_poll() = 0; + virtual Poll &get_poll() = 0; + + // Timeout interface + virtual bool has_heap() = 0; + virtual KHeap<double> &get_heap() = 0; + + // Stop all schedulers + virtual bool is_stop_requested() = 0; + virtual void stop() = 0; +}; + +#if !TD_THREAD_UNSUPPORTED +class Scheduler { + public: + Scheduler(std::shared_ptr<SchedulerGroupInfo> scheduler_group_info, SchedulerId id, size_t cpu_threads_count) + : scheduler_group_info_(std::move(scheduler_group_info)), cpu_threads_(cpu_threads_count) { + scheduler_group_info_->active_scheduler_count++; + info_ = &scheduler_group_info_->schedulers.at(id.value()); + info_->id = id; + if (cpu_threads_count != 0) { + info_->cpu_threads_count = cpu_threads_count; + info_->cpu_queue = std::make_unique<MpmcQueue<SchedulerMessage>>(1024, max_thread_count()); + info_->cpu_queue_waiter = std::make_unique<MpmcWaiter>(); + } + info_->io_queue = std::make_unique<MpscPollableQueue<SchedulerMessage>>(); + info_->io_queue->init(); + + info_->cpu_workers.resize(cpu_threads_count); + for (auto &worker : info_->cpu_workers) { + worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Cpu); + } + info_->io_worker = std::make_unique<WorkerInfo>(WorkerInfo::Type::Io); + + poll_.init(); + io_worker_ = std::make_unique<IoWorker>(*info_->io_queue); + } + + Scheduler(const Scheduler &) = delete; + Scheduler &operator=(const Scheduler &) = delete; + Scheduler(Scheduler &&other) = delete; + Scheduler &operator=(Scheduler &&other) = delete; + ~Scheduler() { + // should stop + stop(); + do_stop(); + } + + void start() { + for (size_t i = 0; i < cpu_threads_.size(); i++) { + cpu_threads_[i] = td::thread([this, i] { + this->run_in_context_impl(*this->info_->cpu_workers[i], + [this] { CpuWorker(*info_->cpu_queue, *info_->cpu_queue_waiter).run(); }); + }); + } + this->run_in_context([this] { this->io_worker_->start_up(); }); + } + + template <class F> + void run_in_context(F &&f) { + run_in_context_impl(*info_->io_worker, std::forward<F>(f)); + } + + bool run(double timeout) { + bool res; + run_in_context_impl(*info_->io_worker, [this, timeout, &res] { + if (SchedulerContext::get()->is_stop_requested()) { + res = false; + } else { + res = io_worker_->run_once(timeout); + } + if (!res) { + io_worker_->tear_down(); + } + }); + if (!res) { + do_stop(); + } + return res; + } + + // Just syntactic sugar + void stop() { + run_in_context([] { SchedulerContext::get()->stop(); }); + } + + SchedulerId get_scheduler_id() const { + return info_->id; + } + + private: + std::shared_ptr<SchedulerGroupInfo> scheduler_group_info_; + SchedulerInfo *info_; + std::vector<td::thread> cpu_threads_; + bool is_stopped_{false}; + Poll poll_; + KHeap<double> heap_; + class IoWorker; + std::unique_ptr<IoWorker> io_worker_; + + class SchedulerContextImpl : public SchedulerContext { + public: + SchedulerContextImpl(WorkerInfo *worker, SchedulerInfo *scheduler, SchedulerGroupInfo *scheduler_group, Poll *poll, + KHeap<double> *heap) + : worker_(worker), scheduler_(scheduler), scheduler_group_(scheduler_group), poll_(poll), heap_(heap) { + } + + SchedulerId get_scheduler_id() const override { + return scheduler()->id; + } + void add_to_queue(ActorInfoPtr actor_info_ptr, SchedulerId scheduler_id, bool need_poll) override { + if (!scheduler_id.is_valid()) { + scheduler_id = scheduler()->id; + } + auto &info = scheduler_group()->schedulers.at(scheduler_id.value()); + if (need_poll) { + info.io_queue->writer_put(std::move(actor_info_ptr)); + } else { + info.cpu_queue->push(std::move(actor_info_ptr), get_thread_id()); + info.cpu_queue_waiter->notify(); + } + } + + ActorInfoCreator &get_actor_info_creator() override { + return worker()->actor_info_creator; + } + + bool has_poll() override { + return poll_ != nullptr; + } + Poll &get_poll() override { + CHECK(has_poll()); + return *poll_; + } + + bool has_heap() override { + return heap_ != nullptr; + } + KHeap<double> &get_heap() override { + CHECK(has_heap()); + return *heap_; + } + + void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override { + // we are in PollWorker + CHECK(has_heap()); + auto &heap = get_heap(); + auto *heap_node = actor_info_ptr->as_heap_node(); + if (timestamp) { + if (heap_node->in_heap()) { + heap.fix(timestamp.at(), heap_node); + } else { + heap.insert(timestamp.at(), heap_node); + } + } else { + if (heap_node->in_heap()) { + heap.erase(heap_node); + } + } + + // TODO: do something in plain worker + } + + bool is_stop_requested() override { + return scheduler_group()->is_stop_requested; + } + + void stop() override { + bool expect_false = false; + // Trying to set close_flag_ to true with CAS + auto &group = *scheduler_group(); + if (!group.is_stop_requested.compare_exchange_strong(expect_false, true)) { + return; + } + + // Notify all workers of all schedulers + for (auto &scheduler_info : group.schedulers) { + scheduler_info.io_queue->writer_put({}); + for (size_t i = 0; i < scheduler_info.cpu_threads_count; i++) { + scheduler_info.cpu_queue->push({}, get_thread_id()); + scheduler_info.cpu_queue_waiter->notify(); + } + } + } + + private: + WorkerInfo *worker() const { + return worker_; + } + SchedulerInfo *scheduler() const { + return scheduler_; + } + SchedulerGroupInfo *scheduler_group() const { + return scheduler_group_; + } + + WorkerInfo *worker_; + SchedulerInfo *scheduler_; + SchedulerGroupInfo *scheduler_group_; + Poll *poll_; + + KHeap<double> *heap_; + }; + + template <class F> + void run_in_context_impl(WorkerInfo &worker_info, F &&f) { + bool is_io_worker = worker_info.type == WorkerInfo::Type::Io; + SchedulerContextImpl context(&worker_info, info_, scheduler_group_info_.get(), is_io_worker ? &poll_ : nullptr, + is_io_worker ? &heap_ : nullptr); + SchedulerContext::Guard guard(&context); + f(); + } + + class CpuWorker { + public: + CpuWorker(MpmcQueue<SchedulerMessage> &queue, MpmcWaiter &waiter) : queue_(queue), waiter_(waiter) { + } + void run() { + auto thread_id = get_thread_id(); + auto &dispatcher = SchedulerContext::get()->dispatcher(); + + int yields = 0; + while (true) { + SchedulerMessage message; + if (queue_.try_pop(message, thread_id)) { + if (!message) { + return; + } + ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue()); + yields = waiter_.stop_wait(yields, thread_id); + } else { + yields = waiter_.wait(yields, thread_id); + } + } + } + + private: + MpmcQueue<SchedulerMessage> &queue_; + MpmcWaiter &waiter_; + }; + + class IoWorker { + public: + explicit IoWorker(MpscPollableQueue<SchedulerMessage> &queue) : queue_(queue) { + } + + void start_up() { + auto &poll = SchedulerContext::get()->get_poll(); + poll.subscribe(queue_.reader_get_event_fd().get_fd(), Fd::Flag::Read); + } + void tear_down() { + auto &poll = SchedulerContext::get()->get_poll(); + poll.unsubscribe(queue_.reader_get_event_fd().get_fd()); + } + + bool run_once(double timeout) { + auto &dispatcher = SchedulerContext::get()->dispatcher(); + auto &poll = SchedulerContext::get()->get_poll(); + auto &heap = SchedulerContext::get()->get_heap(); + + auto now = Time::now(); // update Time::now_cached() + while (!heap.empty() && heap.top_key() <= now) { + auto *heap_node = heap.pop(); + auto *actor_info = ActorInfo::from_heap_node(heap_node); + + ActorExecutor executor(*actor_info, dispatcher, ActorExecutor::Options().with_has_poll(true)); + if (executor.can_send_immediate()) { + executor.send_immediate(ActorSignals::one(ActorSignals::Alarm)); + } else { + executor.send(ActorSignals::one(ActorSignals::Alarm)); + } + } + + const int size = queue_.reader_wait_nonblock(); + for (int i = 0; i < size; i++) { + auto message = queue_.reader_get_unsafe(); + if (!message) { + return false; + } + ActorExecutor executor(*message, dispatcher, ActorExecutor::Options().with_from_queue().with_has_poll(true)); + } + queue_.reader_flush(); + + bool can_sleep = size == 0 && timeout != 0; + int32 timeout_ms = 0; + if (can_sleep) { + auto wakeup_timestamp = Timestamp::in(timeout); + if (!heap.empty()) { + wakeup_timestamp.relax(Timestamp::at(heap.top_key())); + } + timeout_ms = static_cast<int>(wakeup_timestamp.in() * 1000) + 1; + if (timeout_ms < 0) { + timeout_ms = 0; + } + //const int thirty_seconds = 30 * 1000; + //if (timeout_ms > thirty_seconds) { + //timeout_ms = thirty_seconds; + //} + } + poll.run(timeout_ms); + return true; + } + + private: + MpscPollableQueue<SchedulerMessage> &queue_; + }; + + void do_stop() { + if (is_stopped_) { + return; + } + // wait other threads to finish + for (auto &thread : cpu_threads_) { + thread.join(); + } + // Can't do anything else, other schedulers may send queries to this one. + // Must wait till every scheduler is stopped first.. + is_stopped_ = true; + + io_worker_.reset(); + poll_.clear(); + + std::unique_lock<std::mutex> lock(scheduler_group_info_->active_scheduler_count_mutex); + scheduler_group_info_->active_scheduler_count--; + scheduler_group_info_->active_scheduler_count_condition_variable.notify_all(); + } + + public: + static void close_scheduler_group(SchedulerGroupInfo &group_info) { + LOG(ERROR) << "close scheduler group"; + // Cannot close scheduler group before somebody asked to stop them + CHECK(group_info.is_stop_requested); + { + std::unique_lock<std::mutex> lock(group_info.active_scheduler_count_mutex); + group_info.active_scheduler_count_condition_variable.wait(lock, + [&] { return group_info.active_scheduler_count == 0; }); + } + + // Drain all queues + // Just to destroy all elements should be ok. + for (auto &scheduler_info : group_info.schedulers) { + // Drain io queue + auto &io_queue = *scheduler_info.io_queue; + while (true) { + int n = io_queue.reader_wait_nonblock(); + if (n == 0) { + break; + } + while (n-- > 0) { + auto message = io_queue.reader_get_unsafe(); + // message's destructor is called + } + } + scheduler_info.io_queue.reset(); + + // Drain cpu queue + auto &cpu_queue = *scheduler_info.cpu_queue; + while (true) { + SchedulerMessage message; + if (!cpu_queue.try_pop(message, get_thread_id())) { + break; + } + // message's destructor is called + } + scheduler_info.cpu_queue.reset(); + + // Do not destroy worker infos. run_in_context will crash if they are empty + } + } +}; + +// Actor messages +template <class LambdaT> +class ActorMessageLambda : public ActorMessageImpl { + public: + template <class FromLambdaT> + explicit ActorMessageLambda(FromLambdaT &&lambda) : lambda_(std::forward<FromLambdaT>(lambda)) { + } + void run() override { + lambda_(); + } + + private: + LambdaT lambda_; +}; + +class ActorMessageHangup : public ActorMessageImpl { + public: + void run() override { + ActorExecuteContext::get()->actor().hang_up(); + } +}; + +class ActorMessageCreator { + public: + template <class F> + static ActorMessage lambda(F &&f) { + return ActorMessage(std::make_unique<ActorMessageLambda<F>>(std::forward<F>(f))); + } + + static ActorMessage hangup() { + return ActorMessage(std::make_unique<ActorMessageHangup>()); + } + + // Use faster allocation? +}; + +// SYNTAX SHUGAR +namespace detail { +struct ActorRef { + ActorRef(ActorInfo &actor_info, uint64 link_token = EmptyLinkToken) : actor_info(actor_info), link_token(link_token) { + } + + ActorInfo &actor_info; + uint64 link_token; +}; + +template <class T> +T ¤t_actor() { + return static_cast<T &>(ActorExecuteContext::get()->actor()); +} + +void send_message(ActorInfo &actor_info, ActorMessage message) { + ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options()); + executor.send(std::move(message)); +} + +void send_message(ActorRef actor_ref, ActorMessage message) { + message.set_link_token(actor_ref.link_token); + send_message(actor_ref.actor_info, std::move(message)); +} +void send_message_later(ActorInfo &actor_info, ActorMessage message) { + ActorExecutor executor(actor_info, SchedulerContext::get()->dispatcher(), ActorExecutor::Options()); + executor.send(std::move(message)); +} + +void send_message_later(ActorRef actor_ref, ActorMessage message) { + message.set_link_token(actor_ref.link_token); + send_message_later(actor_ref.actor_info, std::move(message)); +} + +template <class ExecuteF, class ToMessageF> +void send_immediate(ActorRef actor_ref, ExecuteF &&execute, ToMessageF &&to_message) { + auto &scheduler_context = *SchedulerContext::get(); + ActorExecutor executor(actor_ref.actor_info, scheduler_context.dispatcher(), + ActorExecutor::Options().with_has_poll(scheduler_context.has_poll())); + if (executor.can_send_immediate()) { + return executor.send_immediate(execute, actor_ref.link_token); + } + auto message = to_message(); + message.set_link_token(actor_ref.link_token); + executor.send(std::move(message)); +} + +template <class F> +void send_lambda(ActorRef actor_ref, F &&lambda) { + send_immediate(actor_ref, lambda, [&lambda]() mutable { return ActorMessageCreator::lambda(std::move(lambda)); }); +} +template <class F> +void send_lambda_later(ActorRef actor_ref, F &&lambda) { + send_message_later(actor_ref, ActorMessageCreator::lambda(std::move(lambda))); +} + +template <class ClosureT> +void send_closure_impl(ActorRef actor_ref, ClosureT &&closure) { + using ActorType = typename ClosureT::ActorType; + send_immediate(actor_ref, [&closure]() mutable { closure.run(¤t_actor<ActorType>()); }, + [&closure]() mutable { + return ActorMessageCreator::lambda([closure = to_delayed_closure(std::move(closure))]() mutable { + closure.run(¤t_actor<ActorType>()); + }); + }); +} + +template <class... ArgsT> +void send_closure(ActorRef actor_ref, ArgsT &&... args) { + send_closure_impl(actor_ref, create_immediate_closure(std::forward<ArgsT>(args)...)); +} + +template <class ClosureT> +void send_closure_later_impl(ActorRef actor_ref, ClosureT &&closure) { + using ActorType = typename ClosureT::ActorType; + send_message_later(actor_ref, ActorMessageCreator::lambda([closure = std::move(closure)]() mutable { + closure.run(¤t_actor<ActorType>()); + })); +} + +template <class... ArgsT> +void send_closure_later(ActorRef actor_ref, ArgsT &&... args) { + send_closure_later_impl(actor_ref, create_delayed_closure(std::forward<ArgsT>(args)...)); +} + +void register_actor_info_ptr(ActorInfoPtr actor_info_ptr) { + auto state = actor_info_ptr->state().get_flags_unsafe(); + SchedulerContext::get()->add_to_queue(std::move(actor_info_ptr), state.get_scheduler_id(), !state.is_shared()); +} + +template <class T, class... ArgsT> +ActorInfoPtr create_actor(ActorOptions &options, ArgsT &&... args) { + auto *scheduler_context = SchedulerContext::get(); + if (!options.has_scheduler()) { + options.on_scheduler(scheduler_context->get_scheduler_id()); + } + auto res = + scheduler_context->get_actor_info_creator().create(std::make_unique<T>(std::forward<ArgsT>(args)...), options); + register_actor_info_ptr(res); + return res; +} +} // namespace detail + +// Essentially ActorInfoWeakPtr with Type +template <class ActorType = Actor> +class ActorId { + public: + using ActorT = ActorType; + ActorId() = default; + ActorId(const ActorId &) = default; + ActorId &operator=(const ActorId &) = default; + ActorId(ActorId &&other) = default; + ActorId &operator=(ActorId &&other) = default; + + // allow only conversion from child to parent + template <class ToActorType, class = std::enable_if_t<std::is_base_of<ToActorType, ActorType>::value>> + explicit operator ActorId<ToActorType>() const { + return ActorId<ToActorType>(ptr_); + } + + const ActorInfoPtr &actor_info_ptr() const { + return ptr_; + } + + ActorInfo &actor_info() const { + CHECK(ptr_); + return *ptr_; + } + bool empty() const { + return !ptr_; + } + + template <class... ArgsT> + static ActorId<ActorType> create(ActorOptions &options, ArgsT &&... args) { + return ActorId<ActorType>(detail::create_actor<ActorType>(options, std::forward<ArgsT>(args)...)); + } + + detail::ActorRef as_actor_ref() const { + CHECK(!empty()); + return detail::ActorRef(*actor_info_ptr()); + } + + private: + ActorInfoPtr ptr_; + + explicit ActorId(ActorInfoPtr ptr) : ptr_(std::move(ptr)) { + } + + template <class SelfT> + friend ActorId<SelfT> actor_id(SelfT *self); +}; + +template <class ActorType = Actor> +class ActorOwn { + public: + using ActorT = ActorType; + ActorOwn() = default; + explicit ActorOwn(ActorId<ActorType> id) : id_(std::move(id)) { + } + template <class OtherActorType> + explicit ActorOwn(ActorId<OtherActorType> id) : id_(std::move(id)) { + } + template <class OtherActorType> + explicit ActorOwn(ActorOwn<OtherActorType> &&other) : id_(other.release()) { + } + template <class OtherActorType> + ActorOwn &operator=(ActorOwn<OtherActorType> &&other) { + reset(other.release()); + } + ActorOwn(ActorOwn &&other) : id_(other.release()) { + } + ActorOwn &operator=(ActorOwn &&other) { + reset(other.release()); + } + ActorOwn(const ActorOwn &) = delete; + ActorOwn &operator=(const ActorOwn &) = delete; + ~ActorOwn() { + reset(); + } + + bool empty() const { + return id_.empty(); + } + bool is_alive() const { + return id_.is_alive(); + } + ActorId<ActorType> get() const { + return id_; + } + ActorId<ActorType> release() { + return std::move(id_); + } + void reset(ActorId<ActorType> other = ActorId<ActorType>()) { + static_assert(sizeof(ActorType) > 0, "Can't use ActorOwn with incomplete type"); + hangup(); + id_ = std::move(other); + } + const ActorId<ActorType> *operator->() const { + return &id_; + } + + detail::ActorRef as_actor_ref() const { + CHECK(!empty()); + return detail::ActorRef(*id_.actor_info_ptr(), 0); + } + + private: + ActorId<ActorType> id_; + void hangup() const { + if (empty()) { + return; + } + detail::send_message(as_actor_ref(), ActorMessageCreator::hangup()); + } +}; + +template <class ActorType = Actor> +class ActorShared { + public: + using ActorT = ActorType; + ActorShared() = default; + template <class OtherActorType> + ActorShared(ActorId<OtherActorType> id, uint64 token) : id_(std::move(id)), token_(token) { + CHECK(token_ != 0); + } + template <class OtherActorType> + ActorShared(ActorShared<OtherActorType> &&other) : id_(other.release()), token_(other.token()) { + } + template <class OtherActorType> + ActorShared(ActorOwn<OtherActorType> &&other) : id_(other.release()), token_(other.token()) { + } + template <class OtherActorType> + ActorShared &operator=(ActorShared<OtherActorType> &&other) { + reset(other.release(), other.token()); + } + ActorShared(ActorShared &&other) : id_(other.release()), token_(other.token()) { + } + ActorShared &operator=(ActorShared &&other) { + reset(other.release(), other.token()); + } + ActorShared(const ActorShared &) = delete; + ActorShared &operator=(const ActorShared &) = delete; + ~ActorShared() { + reset(); + } + + uint64 token() const { + return token_; + } + bool empty() const { + return id_.empty(); + } + bool is_alive() const { + return id_.is_alive(); + } + ActorId<ActorType> get() const { + return id_; + } + ActorId<ActorType> release(); + void reset(ActorId<ActorType> other = ActorId<ActorType>(), uint64 link_token = EmptyLinkToken) { + static_assert(sizeof(ActorType) > 0, "Can't use ActorShared with incomplete type"); + hangup(); + id_ = other; + token_ = link_token; + } + const ActorId<ActorType> *operator->() const { + return &id_; + } + + detail::ActorRef as_actor_ref() const { + CHECK(!empty()); + return detail::ActorRef(*id_.actor_info_ptr(), token_); + } + + private: + ActorId<ActorType> id_; + uint64 token_; + + void hangup() const { + } +}; + +// common interface +template <class SelfT> +ActorId<SelfT> actor_id(SelfT *self) { + CHECK(self); + CHECK(static_cast<Actor *>(self) == &ActorExecuteContext::get()->actor()); + return ActorId<SelfT>(ActorExecuteContext::get()->actor().get_actor_info_ptr()); +} + +inline ActorId<> actor_id() { + return actor_id(&ActorExecuteContext::get()->actor()); +} + +template <class T, class... ArgsT> +ActorOwn<T> create_actor(ActorOptions options, ArgsT &&... args) { + return ActorOwn<T>(ActorId<T>::create(options, std::forward<ArgsT>(args)...)); +} + +template <class T, class... ArgsT> +ActorOwn<T> create_actor(Slice name, ArgsT &&... args) { + return ActorOwn<T>(ActorId<T>::create(ActorOptions().with_name(name), std::forward<ArgsT>(args)...)); +} + +template <class ActorIdT, class FunctionT, class... ArgsT> +void send_closure(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { + using ActorT = typename std::decay_t<ActorIdT>::ActorT; + using FunctionClassT = member_function_class_t<FunctionT>; + static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); + + ActorIdT id = std::forward<ActorIdT>(actor_id); + detail::send_closure(id.as_actor_ref(), function, std::forward<ArgsT>(args)...); +} + +template <class ActorIdT, class FunctionT, class... ArgsT> +void send_closure_later(ActorIdT &&actor_id, FunctionT function, ArgsT &&... args) { + using ActorT = typename std::decay_t<ActorIdT>::ActorT; + using FunctionClassT = member_function_class_t<FunctionT>; + static_assert(std::is_base_of<FunctionClassT, ActorT>::value, "unsafe send_closure"); + + ActorIdT id = std::forward<ActorIdT>(actor_id); + detail::send_closure_later(id.as_actor_ref(), function, std::forward<ArgsT>(args)...); +} + +template <class ActorIdT, class... ArgsT> +void send_lambda(ActorIdT &&actor_id, ArgsT &&... args) { + ActorIdT id = std::forward<ActorIdT>(actor_id); + detail::send_lambda(id.as_actor_ref(), std::forward<ArgsT>(args)...); +} + +#endif //!TD_THREAD_UNSUPPORTED +} // namespace actor2 +} // namespace td diff --git a/libs/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h b/libs/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h new file mode 100644 index 0000000000..5850f1a94c --- /dev/null +++ b/libs/tdlib/td/tdactor/td/actor/impl2/SchedulerId.h @@ -0,0 +1,32 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/common.h" +#include "td/utils/logging.h" + +namespace td { +namespace actor2 { +class SchedulerId { + public: + SchedulerId() : id_(-1) { + } + explicit SchedulerId(uint8 id) : id_(id) { + } + bool is_valid() const { + return id_ >= 0; + } + uint8 value() const { + CHECK(is_valid()); + return static_cast<uint8>(id_); + } + + private: + int32 id_{0}; +}; +} // namespace actor2 +} // namespace td diff --git a/libs/tdlib/td/tdactor/test/actors_bugs.cpp b/libs/tdlib/td/tdactor/test/actors_bugs.cpp new file mode 100644 index 0000000000..f4267f2818 --- /dev/null +++ b/libs/tdlib/td/tdactor/test/actors_bugs.cpp @@ -0,0 +1,47 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/utils/tests.h" + +#include "td/actor/Timeout.h" + +using namespace td; + +TEST(MultiTimeout, bug) { + ConcurrentScheduler sched; + int threads_n = 0; + sched.init(threads_n); + + sched.start(); + std::unique_ptr<MultiTimeout> multi_timeout; + struct Data { + MultiTimeout *multi_timeout; + }; + Data data; + + { + auto guard = sched.get_current_guard(); + multi_timeout = std::make_unique<MultiTimeout>(); + data.multi_timeout = multi_timeout.get(); + multi_timeout->set_callback([](void *void_data, int64 key) { + auto &data = *static_cast<Data *>(void_data); + if (key == 1) { + data.multi_timeout->cancel_timeout(key + 1); + data.multi_timeout->set_timeout_in(key + 2, 1); + } else { + Scheduler::instance()->finish(); + } + }); + multi_timeout->set_callback_data(&data); + multi_timeout->set_timeout_in(1, 1); + multi_timeout->set_timeout_in(2, 2); + } + + while (sched.run_main(10)) { + // empty + } + sched.finish(); +} diff --git a/libs/tdlib/td/tdactor/test/actors_impl2.cpp b/libs/tdlib/td/tdactor/test/actors_impl2.cpp new file mode 100644 index 0000000000..9185fe8858 --- /dev/null +++ b/libs/tdlib/td/tdactor/test/actors_impl2.cpp @@ -0,0 +1,535 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/actor/impl2/ActorLocker.h" +#include "td/actor/impl2/Scheduler.h" + +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/port/thread.h" +#include "td/utils/Slice.h" +#include "td/utils/StringBuilder.h" +#include "td/utils/tests.h" +#include "td/utils/Time.h" + +#include <array> +#include <atomic> +#include <deque> +#include <memory> + +using td::actor2::ActorLocker; +using td::actor2::ActorSignals; +using td::actor2::ActorState; +using td::actor2::SchedulerId; + +TEST(Actor2, signals) { + ActorSignals signals; + signals.add_signal(ActorSignals::Wakeup); + signals.add_signal(ActorSignals::Cpu); + signals.add_signal(ActorSignals::Kill); + signals.clear_signal(ActorSignals::Cpu); + + bool was_kill = false; + bool was_wakeup = false; + while (!signals.empty()) { + auto s = signals.first_signal(); + if (s == ActorSignals::Kill) { + was_kill = true; + } else if (s == ActorSignals::Wakeup) { + was_wakeup = true; + } else { + UNREACHABLE(); + } + signals.clear_signal(s); + } + CHECK(was_kill && was_wakeup); +} + +TEST(Actors2, flags) { + ActorState::Flags flags; + CHECK(!flags.is_locked()); + flags.set_locked(true); + CHECK(flags.is_locked()); + flags.set_locked(false); + CHECK(!flags.is_locked()); + flags.set_pause(true); + + flags.set_scheduler_id(SchedulerId{123}); + + auto signals = flags.get_signals(); + CHECK(signals.empty()); + signals.add_signal(ActorSignals::Cpu); + signals.add_signal(ActorSignals::Kill); + CHECK(signals.has_signal(ActorSignals::Cpu)); + CHECK(signals.has_signal(ActorSignals::Kill)); + flags.set_signals(signals); + CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw(); + + auto wakeup = ActorSignals{}; + wakeup.add_signal(ActorSignals::Wakeup); + + flags.add_signals(wakeup); + signals.add_signal(ActorSignals::Wakeup); + CHECK(flags.get_signals().raw() == signals.raw()); + + flags.clear_signals(); + CHECK(flags.get_signals().empty()); + + CHECK(flags.get_scheduler_id().value() == 123); + CHECK(flags.is_pause()); +} + +TEST(Actor2, locker) { + ActorState state; + + ActorSignals kill_signal; + kill_signal.add_signal(ActorSignals::Kill); + + ActorSignals wakeup_signal; + kill_signal.add_signal(ActorSignals::Wakeup); + + ActorSignals cpu_signal; + kill_signal.add_signal(ActorSignals::Cpu); + + { + ActorLocker lockerA(&state); + ActorLocker lockerB(&state); + ActorLocker lockerC(&state); + + CHECK(lockerA.try_lock()); + CHECK(lockerA.own_lock()); + auto flagsA = lockerA.flags(); + CHECK(lockerA.try_unlock(flagsA)); + CHECK(!lockerA.own_lock()); + + CHECK(lockerA.try_lock()); + CHECK(!lockerB.try_lock()); + CHECK(!lockerC.try_lock()); + + CHECK(lockerB.try_add_signals(kill_signal)); + CHECK(!lockerC.try_add_signals(wakeup_signal)); + CHECK(lockerC.try_add_signals(wakeup_signal)); + CHECK(!lockerC.add_signals(cpu_signal)); + CHECK(!lockerA.flags().has_signals()); + CHECK(!lockerA.try_unlock(lockerA.flags())); + { + auto flags = lockerA.flags(); + auto signals = flags.get_signals(); + bool was_kill = false; + bool was_wakeup = false; + bool was_cpu = false; + while (!signals.empty()) { + auto s = signals.first_signal(); + if (s == ActorSignals::Kill) { + was_kill = true; + } else if (s == ActorSignals::Wakeup) { + was_wakeup = true; + } else if (s == ActorSignals::Cpu) { + was_cpu = true; + } else { + UNREACHABLE(); + } + signals.clear_signal(s); + } + CHECK(was_kill && was_wakeup && was_cpu); + flags.clear_signals(); + CHECK(lockerA.try_unlock(flags)); + } + } + + { + ActorLocker lockerB(&state); + CHECK(lockerB.try_lock()); + CHECK(lockerB.try_unlock(lockerB.flags())); + CHECK(lockerB.add_signals(kill_signal)); + CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); + auto flags = lockerB.flags(); + flags.clear_signals(); + ActorLocker lockerA(&state); + CHECK(!lockerA.add_signals(kill_signal)); + CHECK(!lockerB.try_unlock(flags)); + CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal! + CHECK(!lockerB.try_unlock(flags)); + CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); + CHECK(lockerB.try_unlock(flags)); + } + + { + ActorLocker lockerA(&state); + CHECK(lockerA.try_lock()); + auto flags = lockerA.flags(); + flags.set_pause(true); + CHECK(lockerA.try_unlock(flags)); + //We have to lock, though we can't execute. + CHECK(lockerA.add_signals(wakeup_signal)); + } +} + +#if !TD_THREAD_UNSUPPORTED +TEST(Actor2, locker_stress) { + ActorState state; + + constexpr size_t threads_n = 5; + auto stage = [&](std::atomic<int> &value, int need) { + value.fetch_add(1, std::memory_order_release); + while (value.load(std::memory_order_acquire) < need) { + td::this_thread::yield(); + } + }; + + struct Node { + std::atomic<td::uint32> request{0}; + td::uint32 response = 0; + char pad[64]; + }; + std::array<Node, threads_n> nodes; + auto do_work = [&]() { + for (auto &node : nodes) { + auto query = node.request.load(std::memory_order_acquire); + if (query) { + node.response = query * query; + node.request.store(0, std::memory_order_relaxed); + } + } + }; + + std::atomic<int> begin{0}; + std::atomic<int> ready{0}; + std::atomic<int> check{0}; + std::atomic<int> finish{0}; + std::vector<td::thread> threads; + for (size_t i = 0; i < threads_n; i++) { + threads.push_back(td::thread([&, id = i] { + for (size_t i = 1; i < 1000000; i++) { + ActorLocker locker(&state); + auto need = static_cast<int>(threads_n * i); + auto query = static_cast<td::uint32>(id + need); + stage(begin, need); + nodes[id].request = 0; + nodes[id].response = 0; + stage(ready, need); + if (locker.try_lock()) { + nodes[id].response = query * query; + } else { + auto cpu = ActorSignals::one(ActorSignals::Cpu); + nodes[id].request.store(query, std::memory_order_release); + locker.add_signals(cpu); + } + while (locker.own_lock()) { + auto flags = locker.flags(); + auto signals = flags.get_signals(); + if (!signals.empty()) { + do_work(); + } + flags.clear_signals(); + locker.try_unlock(flags); + } + + stage(check, need); + if (id == 0) { + CHECK(locker.add_signals(ActorSignals{})); + CHECK(!locker.flags().has_signals()); + CHECK(locker.try_unlock(locker.flags())); + for (size_t thread_id = 0; thread_id < threads_n; thread_id++) { + CHECK(nodes[thread_id].response == + static_cast<td::uint32>(thread_id + need) * static_cast<td::uint32>(thread_id + need)) + << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " " + << nodes[thread_id].request.load(); + } + } + } + })); + } + for (auto &thread : threads) { + thread.join(); + } +} + +namespace { +const size_t BUF_SIZE = 1024 * 1024; +char buf[BUF_SIZE]; +td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); +} // namespace + +TEST(Actor2, executor_simple) { + using namespace td; + using namespace td::actor2; + struct Dispatcher : public SchedulerDispatcher { + void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override { + queue.push_back(std::move(ptr)); + } + void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr, Timestamp timestamp) override { + UNREACHABLE(); + } + SchedulerId get_scheduler_id() const override { + return SchedulerId{0}; + } + std::deque<ActorInfoPtr> queue; + }; + Dispatcher dispatcher; + + class TestActor : public Actor { + public: + void close() { + stop(); + } + + private: + void start_up() override { + sb << "StartUp"; + } + void tear_down() override { + sb << "TearDown"; + } + }; + ActorInfoCreator actor_info_creator; + auto actor = actor_info_creator.create( + std::make_unique<TestActor>(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor")); + dispatcher.add_to_queue(actor, SchedulerId{0}, false); + + { + ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); + CHECK(executor.can_send()); + CHECK(executor.can_send_immediate()); + CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice(); + sb.clear(); + executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); + CHECK(sb.as_cslice() == "A") << sb.as_cslice(); + sb.clear(); + auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; }); + big_message.set_big(); + executor.send(std::move(big_message)); + CHECK(sb.as_cslice() == "") << sb.as_cslice(); + executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); + CHECK(sb.as_cslice() == "") << sb.as_cslice(); + } + CHECK(dispatcher.queue.size() == 1); + { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); } + CHECK(dispatcher.queue.size() == 1); + dispatcher.queue.clear(); + CHECK(sb.as_cslice() == "bigA") << sb.as_cslice(); + sb.clear(); + { + ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); + executor.send( + ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); + } + CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice(); + sb.clear(); + CHECK(!actor->has_actor()); + { + ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); + executor.send( + ActorMessageCreator::lambda([&] { static_cast<TestActor &>(ActorExecuteContext::get()->actor()).close(); })); + } + CHECK(dispatcher.queue.empty()); + CHECK(sb.as_cslice() == ""); +} + +using namespace td::actor2; +using td::uint32; +static std::atomic<int> cnt; +class Worker : public Actor { + public: + void query(uint32 x, ActorInfoPtr master); + void close() { + stop(); + } +}; +class Master : public Actor { + public: + void on_result(uint32 x, uint32 y) { + loop(); + } + + private: + uint32 l = 0; + uint32 r = 100000; + ActorInfoPtr worker; + void start_up() override { + worker = detail::create_actor<Worker>(ActorOptions().with_name("Master")); + loop(); + } + void loop() override { + l++; + if (l == r) { + if (!--cnt) { + SchedulerContext::get()->stop(); + } + detail::send_closure(*worker, &Worker::close); + stop(); + return; + } + detail::send_lambda(*worker, + [x = l, self = get_actor_info_ptr()] { detail::current_actor<Worker>().query(x, self); }); + } +}; + +void Worker::query(uint32 x, ActorInfoPtr master) { + auto y = x; + for (int i = 0; i < 100; i++) { + y = y * y; + } + detail::send_lambda(*master, [result = y, x] { detail::current_actor<Master>().on_result(x, result); }); +} + +TEST(Actor2, scheduler_simple) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 2}; + scheduler.start(); + scheduler.run_in_context([] { + cnt = 10; + for (int i = 0; i < 10; i++) { + detail::create_actor<Master>(ActorOptions().with_name("Master")); + } + }); + while (scheduler.run(1000)) { + } + Scheduler::close_scheduler_group(*group_info); +} + +TEST(Actor2, actor_id_simple) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 2}; + sb.clear(); + scheduler.start(); + + scheduler.run_in_context([] { + class A : public Actor { + public: + A(int value) : value_(value) { + sb << "A" << value_; + } + void hello() { + sb << "hello"; + } + ~A() { + sb << "~A"; + if (--cnt <= 0) { + SchedulerContext::get()->stop(); + } + } + + private: + int value_; + }; + cnt = 1; + auto id = create_actor<A>("A", 123); + CHECK(sb.as_cslice() == "A123"); + sb.clear(); + send_closure(id, &A::hello); + }); + while (scheduler.run(1000)) { + } + CHECK(sb.as_cslice() == "hello~A"); + Scheduler::close_scheduler_group(*group_info); + sb.clear(); +} + +TEST(Actor2, actor_creation) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 1}; + scheduler.start(); + + scheduler.run_in_context([]() mutable { + class B; + class A : public Actor { + public: + void f() { + check(); + stop(); + } + + private: + void start_up() override { + check(); + create_actor<B>("Simple", actor_id(this)).release(); + } + + void check() { + auto &context = *SchedulerContext::get(); + CHECK(context.has_poll()); + context.get_poll(); + } + + void tear_down() override { + if (--cnt <= 0) { + SchedulerContext::get()->stop(); + } + } + }; + + class B : public Actor { + public: + B(ActorId<A> a) : a_(a) { + } + + private: + void start_up() override { + auto &context = *SchedulerContext::get(); + CHECK(!context.has_poll()); + send_closure(a_, &A::f); + stop(); + } + void tear_down() override { + if (--cnt <= 0) { + SchedulerContext::get()->stop(); + } + } + ActorId<A> a_; + }; + cnt = 2; + create_actor<A>(ActorOptions().with_name("Poll").with_poll()).release(); + }); + while (scheduler.run(1000)) { + } + scheduler.stop(); + Scheduler::close_scheduler_group(*group_info); +} + +TEST(Actor2, actor_timeout_simple) { + auto group_info = std::make_shared<SchedulerGroupInfo>(1); + Scheduler scheduler{group_info, SchedulerId{0}, 2}; + sb.clear(); + scheduler.start(); + + scheduler.run_in_context([] { + class A : public Actor { + public: + void start_up() override { + set_timeout(); + } + void alarm() override { + double diff = td::Time::now() - expected_timeout_; + CHECK(-0.001 < diff && diff < 0.1) << diff; + if (cnt_-- > 0) { + set_timeout(); + } else { + stop(); + } + } + + void tear_down() override { + SchedulerContext::get()->stop(); + } + + private: + double expected_timeout_; + int cnt_ = 5; + void set_timeout() { + auto wakeup_timestamp = td::Timestamp::in(0.1); + expected_timeout_ = wakeup_timestamp.at(); + alarm_timestamp() = wakeup_timestamp; + } + }; + create_actor<A>(ActorInfoCreator::Options().with_name("A").with_poll()).release(); + }); + while (scheduler.run(1000)) { + } + Scheduler::close_scheduler_group(*group_info); + sb.clear(); +} +#endif //!TD_THREAD_UNSUPPORTED diff --git a/libs/tdlib/td/tdactor/test/actors_main.cpp b/libs/tdlib/td/tdactor/test/actors_main.cpp new file mode 100644 index 0000000000..ffceacc595 --- /dev/null +++ b/libs/tdlib/td/tdactor/test/actors_main.cpp @@ -0,0 +1,463 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/utils/tests.h" + +#include "td/actor/actor.h" +#include "td/actor/PromiseFuture.h" + +#include "td/utils/logging.h" +#include "td/utils/Random.h" + +#include <limits> +#include <map> +#include <utility> + +using namespace td; + +REGISTER_TESTS(actors_main); + +namespace { + +template <class ContainerT> +static typename ContainerT::value_type &rand_elem(ContainerT &cont) { + CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max())); + return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)]; +} + +static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) { + uint32 res = 1; + while (p) { + if (p & 1) { + res *= x; + } + x *= x; + p >>= 1; + } + return res; +} + +static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) { + uint32 res = 1; + for (uint32 i = 0; i < p; i++) { + res *= x; + } + return res; +} + +struct Query { + uint32 query_id; + uint32 result; + std::vector<int> todo; + Query() = default; + Query(const Query &) = delete; + Query &operator=(const Query &) = delete; + Query(Query &&) = default; + Query &operator=(Query &&) = default; + ~Query() { + CHECK(todo.empty()) << "Query lost"; + } + int next_pow() { + CHECK(!todo.empty()); + int res = todo.back(); + todo.pop_back(); + return res; + } + bool ready() { + return todo.empty(); + } +}; + +static uint32 fast_calc(Query &q) { + uint32 result = q.result; + for (auto x : q.todo) { + result = fast_pow_mod_uint32(result, x); + } + return result; +} + +class Worker final : public Actor { + public: + explicit Worker(int threads_n) : threads_n_(threads_n) { + } + void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) { + uint32 result = slow_pow_mod_uint32(x, p); + promise.set_value(std::move(result)); + + (void)threads_n_; + // if (threads_n_ > 1 && Random::fast(0, 9) == 0) { + // migrate(Random::fast(2, threads_n)); + //} + } + + private: + int threads_n_; +}; + +class QueryActor final : public Actor { + public: + class Callback { + public: + Callback() = default; + Callback(const Callback &) = delete; + Callback &operator=(const Callback &) = delete; + Callback(Callback &&) = delete; + Callback &operator=(Callback &&) = delete; + virtual ~Callback() = default; + virtual void on_result(Query &&query) = 0; + virtual void on_closed() = 0; + }; + + explicit QueryActor(int threads_n) : threads_n_(threads_n) { + } + + void set_callback(std::unique_ptr<Callback> callback) { + callback_ = std::move(callback); + } + void set_workers(std::vector<ActorId<Worker>> workers) { + workers_ = std::move(workers); + } + + void query(Query &&query) { + uint32 x = query.result; + uint32 p = query.next_pow(); + if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) { + query.result = slow_pow_mod_uint32(x, p); + callback_->on_result(std::move(query)); + } else { + auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p); + if (future.is_ready()) { + query.result = future.move_as_ok(); + callback_->on_result(std::move(query)); + } else { + future.set_event(EventCreator::raw(actor_id(), query.query_id)); + auto query_id = query.query_id; + pending_.insert(std::make_pair(query_id, std::make_pair(std::move(future), std::move(query)))); + } + } + if (threads_n_ > 1 && Random::fast(0, 9) == 0) { + migrate(Random::fast(2, threads_n_)); + } + } + + void raw_event(const Event::Raw &event) override { + uint32 id = event.u32; + auto it = pending_.find(id); + auto future = std::move(it->second.first); + auto query = std::move(it->second.second); + pending_.erase(it); + CHECK(future.is_ready()); + query.result = future.move_as_ok(); + callback_->on_result(std::move(query)); + } + + void close() { + callback_->on_closed(); + stop(); + } + + void on_start_migrate(int32 sched_id) override { + for (auto &it : pending_) { + start_migrate(it.second.first, sched_id); + } + } + void on_finish_migrate() override { + for (auto &it : pending_) { + finish_migrate(it.second.first); + } + } + + private: + unique_ptr<Callback> callback_; + std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_; + std::vector<ActorId<Worker>> workers_; + int threads_n_; +}; + +class MainQueryActor final : public Actor { + class QueryActorCallback : public QueryActor::Callback { + public: + void on_result(Query &&query) override { + if (query.ready()) { + send_closure(parent_id_, &MainQueryActor::on_result, std::move(query)); + } else { + send_closure(next_solver_, &QueryActor::query, std::move(query)); + } + } + void on_closed() override { + send_closure(parent_id_, &MainQueryActor::on_closed); + } + QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver) + : parent_id_(parent_id), next_solver_(next_solver) { + } + + private: + ActorId<MainQueryActor> parent_id_; + ActorId<QueryActor> next_solver_; + }; + + const int ACTORS_CNT = 10; + const int WORKERS_CNT = 4; + + public: + explicit MainQueryActor(int threads_n) : threads_n_(threads_n) { + } + + void start_up() override { + actors_.resize(ACTORS_CNT); + for (auto &actor : actors_) { + auto actor_ptr = make_unique<QueryActor>(threads_n_); + actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0) + .release(); + } + + workers_.resize(WORKERS_CNT); + for (auto &worker : workers_) { + auto actor_ptr = make_unique<Worker>(threads_n_); + worker = + register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release(); + } + + for (int i = 0; i < ACTORS_CNT; i++) { + ref_cnt_++; + send_closure(actors_[i], &QueryActor::set_callback, + make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT])); + send_closure(actors_[i], &QueryActor::set_workers, workers_); + } + yield(); + } + + void on_result(Query &&query) { + CHECK(query.ready()); + CHECK(query.result == expected_[query.query_id]); + in_cnt_++; + wakeup(); + } + + Query create_query() { + Query q; + q.query_id = (query_id_ += 2); + q.result = q.query_id; + q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000}; + expected_[q.query_id] = fast_calc(q); + return q; + } + + void on_closed() { + ref_cnt_--; + if (ref_cnt_ == 0) { + Scheduler::instance()->finish(); + } + } + + void wakeup() override { + int cnt = 100000; + while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) { + if (Random::fast(0, 1)) { + send_closure(rand_elem(actors_), &QueryActor::query, create_query()); + } else { + send_closure_later(rand_elem(actors_), &QueryActor::query, create_query()); + } + out_cnt_++; + } + if (in_cnt_ == cnt) { + in_cnt_++; + ref_cnt_--; + for (auto &actor : actors_) { + send_closure(actor, &QueryActor::close); + } + } + } + + private: + std::map<uint32, uint32> expected_; + std::vector<ActorId<QueryActor>> actors_; + std::vector<ActorId<Worker>> workers_; + int out_cnt_ = 0; + int in_cnt_ = 0; + int query_id_ = 1; + int ref_cnt_ = 1; + int threads_n_; +}; + +class SimpleActor final : public Actor { + public: + explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) { + } + void start_up() override { + auto actor_ptr = make_unique<Worker>(threads_n_); + worker_ = + register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release(); + yield(); + } + + void wakeup() override { + if (q_ == 100000) { + Scheduler::instance()->finish(); + stop(); + return; + } + q_++; + p_ = Random::fast(0, 1) ? 1 : 10000; + auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_); + if (future.is_ready()) { + auto result = future.move_as_ok(); + CHECK(result == fast_pow_mod_uint32(q_, p_)); + yield(); + } else { + future.set_event(EventCreator::raw(actor_id(), nullptr)); + future_ = std::move(future); + } + // if (threads_n_ > 1 && Random::fast(0, 2) == 0) { + // migrate(Random::fast(1, threads_n)); + //} + } + void raw_event(const Event::Raw &event) override { + auto result = future_.move_as_ok(); + CHECK(result == fast_pow_mod_uint32(q_, p_)); + yield(); + } + + void on_start_migrate(int32 sched_id) override { + start_migrate(future_, sched_id); + } + void on_finish_migrate() override { + finish_migrate(future_); + } + + private: + int32 threads_n_; + ActorId<Worker> worker_; + FutureActor<uint32> future_; + uint32 q_ = 1; + uint32 p_; +}; +} // namespace + +class SendToDead : public Actor { + public: + class Parent : public Actor { + public: + explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) { + } + void start_up() override { + set_timeout_in(Random::fast_uint32() % 3 * 0.001); + if (ttl_ != 0) { + child_ = create_actor_on_scheduler<Parent>( + "Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(), ttl_ - 1); + } + } + void timeout_expired() override { + stop(); + } + + private: + ActorOwn<Parent> child_; + ActorShared<> parent_; + int ttl_; + }; + + void start_up() override { + for (int i = 0; i < 2000; i++) { + create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(), + create_reference(), 4) + .release(); + } + } + + ActorShared<> create_reference() { + ref_cnt_++; + return actor_shared(); + } + void hangup_shared() override { + ref_cnt_--; + if (ref_cnt_ == 0) { + ttl_--; + if (ttl_ <= 0) { + Scheduler::instance()->finish(); + stop(); + } else { + start_up(); + } + } + } + + uint32 ttl_{50}; + uint32 ref_cnt_{0}; +}; + +TEST(Actors, send_to_dead) { + //TODO: fix CHECK(storage_count_.load() == 0) + return; + ConcurrentScheduler sched; + int threads_n = 5; + sched.init(threads_n); + + sched.create_actor_unsafe<SendToDead>(0, "manager").release(); + sched.start(); + while (sched.run_main(10)) { + // empty + } + sched.finish(); +} + +TEST(Actors, main_simple) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + + ConcurrentScheduler sched; + int threads_n = 3; + sched.init(threads_n); + + sched.create_actor_unsafe<SimpleActor>(threads_n > 1 ? 1 : 0, "simple", threads_n).release(); + sched.start(); + while (sched.run_main(10)) { + // empty + } + sched.finish(); +} + +TEST(Actors, main) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + + ConcurrentScheduler sched; + int threads_n = 9; + sched.init(threads_n); + + sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "manager", threads_n).release(); + sched.start(); + while (sched.run_main(10)) { + // empty + } + sched.finish(); +} + +class DoAfterStop : public Actor { + public: + void loop() override { + ptr = std::make_unique<int>(10); + stop(); + CHECK(*ptr == 10); + Scheduler::instance()->finish(); + } + + private: + std::unique_ptr<int> ptr; +}; + +TEST(Actors, do_after_stop) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + + ConcurrentScheduler sched; + int threads_n = 0; + sched.init(threads_n); + + sched.create_actor_unsafe<DoAfterStop>(0, "manager").release(); + sched.start(); + while (sched.run_main(10)) { + // empty + } + sched.finish(); +} diff --git a/libs/tdlib/td/tdactor/test/actors_simple.cpp b/libs/tdlib/td/tdactor/test/actors_simple.cpp new file mode 100644 index 0000000000..c0a6c32b61 --- /dev/null +++ b/libs/tdlib/td/tdactor/test/actors_simple.cpp @@ -0,0 +1,622 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/utils/tests.h" + +#include "td/actor/actor.h" +#include "td/actor/MultiPromise.h" +#include "td/actor/PromiseFuture.h" +#include "td/actor/SleepActor.h" +#include "td/actor/Timeout.h" + +#include "td/utils/logging.h" +#include "td/utils/Observer.h" +#include "td/utils/port/FileFd.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" +#include "td/utils/StringBuilder.h" + +#include <tuple> + +REGISTER_TESTS(actors_simple) + +namespace { +using namespace td; + +static const size_t BUF_SIZE = 1024 * 1024; +static char buf[BUF_SIZE]; +static char buf2[BUF_SIZE]; +static StringBuilder sb(MutableSlice(buf, BUF_SIZE - 1)); +static StringBuilder sb2(MutableSlice(buf2, BUF_SIZE - 1)); + +TEST(Actors, SendLater) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + sb.clear(); + Scheduler scheduler; + scheduler.init(); + + auto guard = scheduler.get_guard(); + class Worker : public Actor { + public: + void f() { + sb << "A"; + } + }; + auto id = create_actor<Worker>("Worker"); + scheduler.run_no_guard(0); + send_closure(id, &Worker::f); + send_closure_later(id, &Worker::f); + send_closure(id, &Worker::f); + ASSERT_STREQ("A", sb.as_cslice().c_str()); + scheduler.run_no_guard(0); + ASSERT_STREQ("AAA", sb.as_cslice().c_str()); +} + +class X { + public: + X() { + sb << "[cnstr_default]"; + } + X(const X &) { + sb << "[cnstr_copy]"; + } + X(X &&) { + sb << "[cnstr_move]"; + } + X &operator=(const X &) { + sb << "[set_copy]"; + return *this; + } + X &operator=(X &&) { + sb << "[set_move]"; + return *this; + } + ~X() = default; +}; + +class XReceiver final : public Actor { + public: + void by_const_ref(const X &) { + sb << "[by_const_ref]"; + } + void by_lvalue_ref(const X &) { + sb << "[by_lvalue_ref]"; + } + void by_value(X) { + sb << "[by_value]"; + } +}; + +TEST(Actors, simple_pass_event_arguments) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + Scheduler scheduler; + scheduler.init(); + + auto guard = scheduler.get_guard(); + auto id = create_actor<XReceiver>("XR").release(); + scheduler.run_no_guard(0); + + X x; + + // check tuple + // std::tuple<X> tx; + // sb.clear(); + // std::tuple<X> ty(std::move(tx)); + // tx = std::move(ty); + // ASSERT_STREQ("[cnstr_move]", sb.as_cslice().c_str()); + + // Send temporary object + + // Tmp-->ConstRef + sb.clear(); + send_closure(id, &XReceiver::by_const_ref, X()); + ASSERT_STREQ("[cnstr_default][by_const_ref]", sb.as_cslice().c_str()); + + // Tmp-->ConstRef (Delayed) + sb.clear(); + send_closure_later(id, &XReceiver::by_const_ref, X()); + scheduler.run_no_guard(0); + // LOG(ERROR) << sb.as_cslice(); + ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str()); + + // Tmp-->LvalueRef + sb.clear(); + send_closure(id, &XReceiver::by_lvalue_ref, X()); + ASSERT_STREQ("[cnstr_default][by_lvalue_ref]", sb.as_cslice().c_str()); + + // Tmp-->LvalueRef (Delayed) + sb.clear(); + send_closure_later(id, &XReceiver::by_lvalue_ref, X()); + scheduler.run_no_guard(0); + ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str()); + + // Tmp-->Value + sb.clear(); + send_closure(id, &XReceiver::by_value, X()); + ASSERT_STREQ("[cnstr_default][cnstr_move][by_value]", sb.as_cslice().c_str()); + + // Tmp-->Value (Delayed) + sb.clear(); + send_closure_later(id, &XReceiver::by_value, X()); + scheduler.run_no_guard(0); + ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str()); + + // Var-->ConstRef + sb.clear(); + send_closure(id, &XReceiver::by_const_ref, x); + ASSERT_STREQ("[by_const_ref]", sb.as_cslice().c_str()); + + // Var-->ConstRef (Delayed) + sb.clear(); + send_closure_later(id, &XReceiver::by_const_ref, x); + scheduler.run_no_guard(0); + ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str()); + + // Var-->LvalueRef + // Var-->LvalueRef (Delayed) + // CE or stange behaviour + + // Var-->Value + sb.clear(); + send_closure(id, &XReceiver::by_value, x); + ASSERT_STREQ("[cnstr_copy][by_value]", sb.as_cslice().c_str()); + + // Var-->Value (Delayed) + sb.clear(); + send_closure_later(id, &XReceiver::by_value, x); + scheduler.run_no_guard(0); + ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str()); +} + +class PrintChar final : public Actor { + public: + PrintChar(char c, int cnt) : char_(c), cnt_(cnt) { + } + void start_up() override { + yield(); + } + void wakeup() override { + if (cnt_ == 0) { + stop(); + } else { + sb << char_; + cnt_--; + yield(); + } + } + + private: + char char_; + int cnt_; +}; +} // namespace + +// +// Yield must add actor to the end of queue +// +TEST(Actors, simple_hand_yield) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + Scheduler scheduler; + scheduler.init(); + sb.clear(); + int cnt = 1000; + { + auto guard = scheduler.get_guard(); + create_actor<PrintChar>("PrintA", 'A', cnt).release(); + create_actor<PrintChar>("PrintB", 'B', cnt).release(); + create_actor<PrintChar>("PrintC", 'C', cnt).release(); + } + scheduler.run(0); + std::string expected; + for (int i = 0; i < cnt; i++) { + expected += "ABC"; + } + ASSERT_STREQ(expected.c_str(), sb.as_cslice().c_str()); +} + +class Ball { + public: + friend void start_migrate(Ball &ball, int32 sched_id) { + sb << "start"; + } + friend void finish_migrate(Ball &ball) { + sb2 << "finish"; + } +}; + +class Pong final : public Actor { + public: + void pong(Ball ball) { + Scheduler::instance()->finish(); + } +}; + +class Ping final : public Actor { + public: + explicit Ping(ActorId<Pong> pong) : pong_(pong) { + } + void start_up() override { + send_closure(pong_, &Pong::pong, Ball()); + } + + private: + ActorId<Pong> pong_; +}; + +TEST(Actors, simple_migrate) { + sb.clear(); + sb2.clear(); + + ConcurrentScheduler scheduler; + scheduler.init(2); + auto pong = scheduler.create_actor_unsafe<Pong>(2, "Pong").release(); + scheduler.create_actor_unsafe<Ping>(1, "Ping", pong).release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); +#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED + ASSERT_STREQ("", sb.as_cslice().c_str()); + ASSERT_STREQ("", sb2.as_cslice().c_str()); +#else + ASSERT_STREQ("start", sb.as_cslice().c_str()); + ASSERT_STREQ("finish", sb2.as_cslice().c_str()); +#endif +} + +class OpenClose final : public Actor { + public: + explicit OpenClose(int cnt) : cnt_(cnt) { + } + void start_up() override { + yield(); + } + void wakeup() override { + ObserverBase *observer = reinterpret_cast<ObserverBase *>(123); + if (cnt_ > 0) { + auto r_file_fd = FileFd::open("server", FileFd::Read | FileFd::Create); + CHECK(r_file_fd.is_ok()) << r_file_fd.error(); + auto file_fd = r_file_fd.move_as_ok(); + // LOG(ERROR) << file_fd.get_native_fd(); + file_fd.get_fd().set_observer(observer); + file_fd.close(); + cnt_--; + yield(); + } else { + Scheduler::instance()->finish(); + } + } + + private: + int cnt_; +}; + +TEST(Actors, open_close) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + ConcurrentScheduler scheduler; + scheduler.init(2); + int cnt = 1000000; +#if TD_WINDOWS || TD_ANDROID + // TODO(perf) optimize + cnt = 100; +#endif + scheduler.create_actor_unsafe<OpenClose>(1, "A", cnt).release(); + scheduler.create_actor_unsafe<OpenClose>(2, "B", cnt).release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); +} + +namespace { +class MsgActor : public Actor { + public: + virtual void msg() = 0; +}; + +class Slave : public Actor { + public: + ActorId<MsgActor> msg; + explicit Slave(ActorId<MsgActor> msg) : msg(msg) { + } + void hangup() override { + send_closure(msg, &MsgActor::msg); + } +}; + +class MasterActor : public MsgActor { + public: + void loop() override { + alive_ = true; + slave = create_actor<Slave>("slave", static_cast<ActorId<MsgActor>>(actor_id(this))); + stop(); + } + ActorOwn<Slave> slave; + + MasterActor() = default; + MasterActor(const MasterActor &) = delete; + MasterActor &operator=(const MasterActor &) = delete; + MasterActor(MasterActor &&) = delete; + MasterActor &operator=(MasterActor &&) = delete; + ~MasterActor() override { + alive_ = 987654321; + } + void msg() override { + CHECK(alive_ == 123456789); + } + uint64 alive_ = 123456789; +}; +} // namespace + +TEST(Actors, call_after_destruct) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + Scheduler scheduler; + scheduler.init(); + { + auto guard = scheduler.get_guard(); + create_actor<MasterActor>("Master").release(); + } + scheduler.run(0); +} + +class LinkTokenSlave : public Actor { + public: + explicit LinkTokenSlave(ActorShared<> parent) : parent_(std::move(parent)) { + } + void add(uint64 link_token) { + CHECK(link_token == get_link_token()); + } + void close() { + stop(); + } + + private: + ActorShared<> parent_; +}; + +class LinkTokenMasterActor : public Actor { + public: + explicit LinkTokenMasterActor(int cnt) : cnt_(cnt) { + } + void start_up() override { + child_ = create_actor<LinkTokenSlave>("Slave", actor_shared(this, 123)).release(); + yield(); + } + void loop() override { + for (int i = 0; i < 100 && cnt_ > 0; cnt_--, i++) { + switch (i % 4) { + case 0: { + send_closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + break; + } + case 1: { + send_closure_later(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1); + break; + } + case 2: { + EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + .try_emit(); + break; + } + case 3: { + EventCreator::closure(ActorShared<LinkTokenSlave>(child_, cnt_ + 1), &LinkTokenSlave::add, cnt_ + 1) + .try_emit_later(); + break; + } + } + } + if (cnt_ == 0) { + send_closure(child_, &LinkTokenSlave::close); + } else { + yield(); + } + } + + void hangup_shared() override { + CHECK(get_link_token() == 123); + Scheduler::instance()->finish(); + stop(); + } + + private: + int cnt_; + ActorId<LinkTokenSlave> child_; +}; + +TEST(Actors, link_token) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + ConcurrentScheduler scheduler; + scheduler.init(0); + auto cnt = 100000; + scheduler.create_actor_unsafe<LinkTokenMasterActor>(0, "A", cnt).release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); +} + +TEST(Actors, promise) { + int value = -1; + Promise<int> p1 = PromiseCreator::lambda([&](int x) { value = x; }); + p1.set_error(Status::Error("Test error")); + ASSERT_EQ(0, value); + Promise<int32> p2 = PromiseCreator::lambda([&](Result<int32> x) { value = 1; }); + p2.set_error(Status::Error("Test error")); + ASSERT_EQ(1, value); +} + +class LaterSlave : public Actor { + public: + explicit LaterSlave(ActorShared<> parent) : parent_(std::move(parent)) { + } + + private: + ActorShared<> parent_; + + void hangup() override { + sb << "A"; + send_closure(actor_id(this), &LaterSlave::finish); + } + void finish() { + sb << "B"; + stop(); + } +}; + +class LaterMasterActor : public Actor { + int cnt_ = 3; + std::vector<ActorOwn<LaterSlave>> children_; + void start_up() override { + for (int i = 0; i < cnt_; i++) { + children_.push_back(create_actor<LaterSlave>("B", actor_shared())); + } + yield(); + } + void loop() override { + children_.clear(); + } + void hangup_shared() override { + if (!--cnt_) { + Scheduler::instance()->finish(); + stop(); + } + } +}; + +TEST(Actors, later) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + sb.clear(); + ConcurrentScheduler scheduler; + scheduler.init(0); + scheduler.create_actor_unsafe<LaterMasterActor>(0, "A").release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); + ASSERT_STREQ(sb.as_cslice().c_str(), "AAABBB"); +} + +class MultiPromise2 : public Actor { + public: + void start_up() override { + auto promise = PromiseCreator::lambda([](Result<Unit> result) { + result.ensure(); + Scheduler::instance()->finish(); + }); + + MultiPromiseActorSafe multi_promise; + multi_promise.add_promise(std::move(promise)); + for (int i = 0; i < 10; i++) { + create_actor<SleepActor>("Sleep", 0.1, multi_promise.get_promise()).release(); + } + } +}; + +class MultiPromise1 : public Actor { + public: + void start_up() override { + auto promise = PromiseCreator::lambda([](Result<Unit> result) { + CHECK(result.is_error()); + create_actor<MultiPromise2>("B").release(); + }); + MultiPromiseActorSafe multi_promise; + multi_promise.add_promise(std::move(promise)); + } +}; + +TEST(Actors, MultiPromise) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + sb.clear(); + ConcurrentScheduler scheduler; + scheduler.init(0); + scheduler.create_actor_unsafe<MultiPromise1>(0, "A").release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); +} + +class FastPromise : public Actor { + public: + void start_up() override { + PromiseFuture<int> pf; + auto promise = pf.move_promise(); + auto future = pf.move_future(); + promise.set_value(123); + CHECK(future.move_as_ok() == 123); + Scheduler::instance()->finish(); + } +}; + +TEST(Actors, FastPromise) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + sb.clear(); + ConcurrentScheduler scheduler; + scheduler.init(0); + scheduler.create_actor_unsafe<FastPromise>(0, "A").release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); +} + +class StopInTeardown : public Actor { + void loop() override { + stop(); + } + void tear_down() override { + stop(); + Scheduler::instance()->finish(); + } +}; + +TEST(Actors, stop_in_teardown) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + sb.clear(); + ConcurrentScheduler scheduler; + scheduler.init(0); + scheduler.create_actor_unsafe<StopInTeardown>(0, "A").release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); +} + +class AlwaysWaitForMailbox : public Actor { + public: + void start_up() override { + always_wait_for_mailbox(); + create_actor<SleepActor>("Sleep", 0.1, PromiseCreator::lambda([actor_id = actor_id(this), ptr = this](Unit) { + send_closure(actor_id, &AlwaysWaitForMailbox::g); + send_closure(actor_id, &AlwaysWaitForMailbox::g); + CHECK(!ptr->was_f_); + })) + .release(); + } + + void f() { + was_f_ = true; + Scheduler::instance()->finish(); + } + void g() { + send_closure(actor_id(this), &AlwaysWaitForMailbox::f); + } + + private: + Timeout timeout_; + bool was_f_{false}; +}; + +TEST(Actors, always_wait_for_mailbox) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + ConcurrentScheduler scheduler; + scheduler.init(0); + scheduler.create_actor_unsafe<AlwaysWaitForMailbox>(0, "A").release(); + scheduler.start(); + while (scheduler.run_main(10)) { + } + scheduler.finish(); +} diff --git a/libs/tdlib/td/tdactor/test/actors_workers.cpp b/libs/tdlib/td/tdactor/test/actors_workers.cpp new file mode 100644 index 0000000000..b97a258a44 --- /dev/null +++ b/libs/tdlib/td/tdactor/test/actors_workers.cpp @@ -0,0 +1,156 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/utils/tests.h" + +#include "td/actor/actor.h" + +#include "td/utils/logging.h" + +REGISTER_TESTS(actors_workers); + +namespace { + +using namespace td; + +class PowerWorker final : public Actor { + public: + class Callback { + public: + Callback() = default; + Callback(const Callback &) = delete; + Callback &operator=(const Callback &) = delete; + Callback(Callback &&) = delete; + Callback &operator=(Callback &&) = delete; + virtual ~Callback() = default; + virtual void on_ready(int query, int res) = 0; + virtual void on_closed() = 0; + }; + void set_callback(unique_ptr<Callback> callback) { + callback_ = std::move(callback); + } + void task(uint32 x, uint32 p) { + uint32 res = 1; + for (uint32 i = 0; i < p; i++) { + res *= x; + } + callback_->on_ready(x, res); + } + void close() { + callback_->on_closed(); + stop(); + } + + private: + std::unique_ptr<Callback> callback_; +}; + +class Manager final : public Actor { + public: + Manager(int queries_n, int query_size, std::vector<ActorId<PowerWorker>> workers) + : workers_(std::move(workers)), left_query_(queries_n), query_size_(query_size) { + } + + class Callback : public PowerWorker::Callback { + public: + Callback(ActorId<Manager> actor_id, int worker_id) : actor_id_(actor_id), worker_id_(worker_id) { + } + void on_ready(int query, int result) override { + send_closure(actor_id_, &Manager::on_ready, worker_id_, query, result); + } + void on_closed() override { + send_closure_later(actor_id_, &Manager::on_closed, worker_id_); + } + + private: + ActorId<Manager> actor_id_; + int worker_id_; + }; + + void start_up() override { + ref_cnt_ = static_cast<int>(workers_.size()); + int i = 0; + for (auto &worker : workers_) { + ref_cnt_++; + send_closure_later(worker, &PowerWorker::set_callback, make_unique<Callback>(actor_id(this), i)); + i++; + send_closure_later(worker, &PowerWorker::task, 3, query_size_); + left_query_--; + } + } + + void on_ready(int worker_id, int query, int res) { + ref_cnt_--; + if (left_query_ == 0) { + send_closure(workers_[worker_id], &PowerWorker::close); + } else { + ref_cnt_++; + send_closure(workers_[worker_id], &PowerWorker::task, 3, query_size_); + left_query_--; + } + } + + void on_closed(int worker_id) { + ref_cnt_--; + if (ref_cnt_ == 0) { + Scheduler::instance()->finish(); + stop(); + } + } + + private: + std::vector<ActorId<PowerWorker>> workers_; + int left_query_; + int ref_cnt_; + int query_size_; +}; + +void test_workers(int threads_n, int workers_n, int queries_n, int query_size) { + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); + + ConcurrentScheduler sched; + sched.init(threads_n); + + std::vector<ActorId<PowerWorker>> workers; + for (int i = 0; i < workers_n; i++) { + int thread_id = threads_n ? i % (threads_n - 1) + 2 : 0; + workers.push_back(sched.create_actor_unsafe<PowerWorker>(thread_id, "worker" + to_string(i)).release()); + } + sched.create_actor_unsafe<Manager>(threads_n ? 1 : 0, "manager", queries_n, query_size, std::move(workers)).release(); + + sched.start(); + while (sched.run_main(10)) { + // empty + } + sched.finish(); + + // sched.test_one_thread_run(); +} +} // namespace + +TEST(Actors, workers_big_query_one_thread) { + test_workers(0, 10, 1000, 300000); +} + +TEST(Actors, workers_big_query_two_threads) { + test_workers(2, 10, 1000, 300000); +} + +TEST(Actors, workers_big_query_nine_threads) { + test_workers(9, 10, 1000, 300000); +} + +TEST(Actors, workers_small_query_one_thread) { + test_workers(0, 10, 1000000, 1); +} + +TEST(Actors, workers_small_query_two_threads) { + test_workers(2, 10, 1000000, 1); +} + +TEST(Actors, workers_small_query_nine_threads) { + test_workers(9, 10, 1000000, 1); +} |