summaryrefslogtreecommitdiff
path: root/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp')
-rw-r--r--protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp322
1 files changed, 183 insertions, 139 deletions
diff --git a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
index ffceacc595..628b74a94c 100644
--- a/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
+++ b/protocols/Telegram/tdlib/td/tdactor/test/actors_main.cpp
@@ -1,35 +1,32 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
-#include "td/utils/tests.h"
-
#include "td/actor/actor.h"
+#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
+#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/Random.h"
+#include "td/utils/ScopeGuard.h"
+#include "td/utils/tests.h"
#include <limits>
#include <map>
+#include <memory>
#include <utility>
-using namespace td;
-
-REGISTER_TESTS(actors_main);
-
-namespace {
-
template <class ContainerT>
static typename ContainerT::value_type &rand_elem(ContainerT &cont) {
CHECK(0 < cont.size() && cont.size() <= static_cast<size_t>(std::numeric_limits<int>::max()));
- return cont[Random::fast(0, static_cast<int>(cont.size()) - 1)];
+ return cont[td::Random::fast(0, static_cast<int>(cont.size()) - 1)];
}
-static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
- uint32 res = 1;
+static td::uint32 fast_pow_mod_uint32(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
while (p) {
if (p & 1) {
res *= x;
@@ -40,25 +37,25 @@ static uint32 fast_pow_mod_uint32(uint32 x, uint32 p) {
return res;
}
-static uint32 slow_pow_mod_uint32(uint32 x, uint32 p) {
- uint32 res = 1;
- for (uint32 i = 0; i < p; i++) {
+static td::uint32 slow_pow_mod_uint32(td::uint32 x, td::uint32 p) {
+ td::uint32 res = 1;
+ for (td::uint32 i = 0; i < p; i++) {
res *= x;
}
return res;
}
-struct Query {
- uint32 query_id;
- uint32 result;
- std::vector<int> todo;
- Query() = default;
- Query(const Query &) = delete;
- Query &operator=(const Query &) = delete;
- Query(Query &&) = default;
- Query &operator=(Query &&) = default;
- ~Query() {
- CHECK(todo.empty()) << "Query lost";
+struct ActorQuery {
+ td::uint32 query_id{};
+ td::uint32 result{};
+ td::vector<int> todo;
+ ActorQuery() = default;
+ ActorQuery(const ActorQuery &) = delete;
+ ActorQuery &operator=(const ActorQuery &) = delete;
+ ActorQuery(ActorQuery &&) = default;
+ ActorQuery &operator=(ActorQuery &&) = default;
+ ~ActorQuery() {
+ LOG_CHECK(todo.empty()) << "ActorQuery lost";
}
int next_pow() {
CHECK(!todo.empty());
@@ -71,25 +68,25 @@ struct Query {
}
};
-static uint32 fast_calc(Query &q) {
- uint32 result = q.result;
+static td::uint32 fast_calc(ActorQuery &q) {
+ td::uint32 result = q.result;
for (auto x : q.todo) {
result = fast_pow_mod_uint32(result, x);
}
return result;
}
-class Worker final : public Actor {
+class Worker final : public td::Actor {
public:
explicit Worker(int threads_n) : threads_n_(threads_n) {
}
- void query(PromiseActor<uint32> &&promise, uint32 x, uint32 p) {
- uint32 result = slow_pow_mod_uint32(x, p);
+ void query(td::PromiseActor<td::uint32> &&promise, td::uint32 x, td::uint32 p) {
+ td::uint32 result = slow_pow_mod_uint32(x, p);
promise.set_value(std::move(result));
(void)threads_n_;
- // if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
- // migrate(Random::fast(2, threads_n));
+ // if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
+ // migrate(td::Random::fast(2, threads_n));
//}
}
@@ -97,7 +94,7 @@ class Worker final : public Actor {
int threads_n_;
};
-class QueryActor final : public Actor {
+class QueryActor final : public td::Actor {
public:
class Callback {
public:
@@ -107,44 +104,46 @@ class QueryActor final : public Actor {
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
virtual ~Callback() = default;
- virtual void on_result(Query &&query) = 0;
+ virtual void on_result(ActorQuery &&query) = 0;
virtual void on_closed() = 0;
};
explicit QueryActor(int threads_n) : threads_n_(threads_n) {
}
- void set_callback(std::unique_ptr<Callback> callback) {
+ void set_callback(td::unique_ptr<Callback> callback) {
callback_ = std::move(callback);
}
- void set_workers(std::vector<ActorId<Worker>> workers) {
+ void set_workers(td::vector<td::ActorId<Worker>> workers) {
workers_ = std::move(workers);
}
- void query(Query &&query) {
- uint32 x = query.result;
- uint32 p = query.next_pow();
- if (Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
+ void query(ActorQuery &&query) {
+ td::uint32 x = query.result;
+ td::uint32 p = query.next_pow();
+ if (td::Random::fast(0, 3) && (p <= 1000 || workers_.empty())) {
query.result = slow_pow_mod_uint32(x, p);
callback_->on_result(std::move(query));
} else {
- auto future = send_promise(rand_elem(workers_), Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, x, p);
+ auto future = td::Random::fast(0, 3) == 0
+ ? td::send_promise<td::ActorSendType::Immediate>(rand_elem(workers_), &Worker::query, x, p)
+ : td::send_promise<td::ActorSendType::Later>(rand_elem(workers_), &Worker::query, x, p);
if (future.is_ready()) {
query.result = future.move_as_ok();
callback_->on_result(std::move(query));
} else {
- future.set_event(EventCreator::raw(actor_id(), query.query_id));
+ future.set_event(td::EventCreator::raw(actor_id(), query.query_id));
auto query_id = query.query_id;
- pending_.insert(std::make_pair(query_id, std::make_pair(std::move(future), std::move(query))));
+ pending_.emplace(query_id, std::make_pair(std::move(future), std::move(query)));
}
}
- if (threads_n_ > 1 && Random::fast(0, 9) == 0) {
- migrate(Random::fast(2, threads_n_));
+ if (threads_n_ > 1 && td::Random::fast(0, 9) == 0) {
+ migrate(td::Random::fast(2, threads_n_));
}
}
- void raw_event(const Event::Raw &event) override {
- uint32 id = event.u32;
+ void raw_event(const td::Event::Raw &event) final {
+ td::uint32 id = event.u32;
auto it = pending_.find(id);
auto future = std::move(it->second.first);
auto query = std::move(it->second.second);
@@ -159,44 +158,44 @@ class QueryActor final : public Actor {
stop();
}
- void on_start_migrate(int32 sched_id) override {
+ void on_start_migrate(td::int32 sched_id) final {
for (auto &it : pending_) {
start_migrate(it.second.first, sched_id);
}
}
- void on_finish_migrate() override {
+ void on_finish_migrate() final {
for (auto &it : pending_) {
finish_migrate(it.second.first);
}
}
private:
- unique_ptr<Callback> callback_;
- std::map<uint32, std::pair<FutureActor<uint32>, Query>> pending_;
- std::vector<ActorId<Worker>> workers_;
+ td::unique_ptr<Callback> callback_;
+ std::map<td::uint32, std::pair<td::FutureActor<td::uint32>, ActorQuery>> pending_;
+ td::vector<td::ActorId<Worker>> workers_;
int threads_n_;
};
-class MainQueryActor final : public Actor {
- class QueryActorCallback : public QueryActor::Callback {
+class MainQueryActor final : public td::Actor {
+ class QueryActorCallback final : public QueryActor::Callback {
public:
- void on_result(Query &&query) override {
+ void on_result(ActorQuery &&query) final {
if (query.ready()) {
send_closure(parent_id_, &MainQueryActor::on_result, std::move(query));
} else {
send_closure(next_solver_, &QueryActor::query, std::move(query));
}
}
- void on_closed() override {
+ void on_closed() final {
send_closure(parent_id_, &MainQueryActor::on_closed);
}
- QueryActorCallback(ActorId<MainQueryActor> parent_id, ActorId<QueryActor> next_solver)
+ QueryActorCallback(td::ActorId<MainQueryActor> parent_id, td::ActorId<QueryActor> next_solver)
: parent_id_(parent_id), next_solver_(next_solver) {
}
private:
- ActorId<MainQueryActor> parent_id_;
- ActorId<QueryActor> next_solver_;
+ td::ActorId<MainQueryActor> parent_id_;
+ td::ActorId<QueryActor> next_solver_;
};
const int ACTORS_CNT = 10;
@@ -206,39 +205,39 @@ class MainQueryActor final : public Actor {
explicit MainQueryActor(int threads_n) : threads_n_(threads_n) {
}
- void start_up() override {
+ void start_up() final {
actors_.resize(ACTORS_CNT);
for (auto &actor : actors_) {
- auto actor_ptr = make_unique<QueryActor>(threads_n_);
- actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0)
+ auto actor_ptr = td::make_unique<QueryActor>(threads_n_);
+ actor = register_actor("QueryActor", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
.release();
}
workers_.resize(WORKERS_CNT);
for (auto &worker : workers_) {
- auto actor_ptr = make_unique<Worker>(threads_n_);
- worker =
- register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ auto actor_ptr = td::make_unique<Worker>(threads_n_);
+ worker = register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0)
+ .release();
}
for (int i = 0; i < ACTORS_CNT; i++) {
ref_cnt_++;
send_closure(actors_[i], &QueryActor::set_callback,
- make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
+ td::make_unique<QueryActorCallback>(actor_id(this), actors_[(i + 1) % ACTORS_CNT]));
send_closure(actors_[i], &QueryActor::set_workers, workers_);
}
yield();
}
- void on_result(Query &&query) {
+ void on_result(ActorQuery &&query) {
CHECK(query.ready());
CHECK(query.result == expected_[query.query_id]);
in_cnt_++;
wakeup();
}
- Query create_query() {
- Query q;
+ ActorQuery create_query() {
+ ActorQuery q;
q.query_id = (query_id_ += 2);
q.result = q.query_id;
q.todo = {1, 1, 1, 1, 1, 1, 1, 1, 10000};
@@ -249,14 +248,14 @@ class MainQueryActor final : public Actor {
void on_closed() {
ref_cnt_--;
if (ref_cnt_ == 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
}
- void wakeup() override {
- int cnt = 100000;
+ void wakeup() final {
+ int cnt = 10000;
while (out_cnt_ < in_cnt_ + 100 && out_cnt_ < cnt) {
- if (Random::fast(0, 1)) {
+ if (td::Random::fast_bool()) {
send_closure(rand_elem(actors_), &QueryActor::query, create_query());
} else {
send_closure_later(rand_elem(actors_), &QueryActor::query, create_query());
@@ -273,9 +272,9 @@ class MainQueryActor final : public Actor {
}
private:
- std::map<uint32, uint32> expected_;
- std::vector<ActorId<QueryActor>> actors_;
- std::vector<ActorId<Worker>> workers_;
+ std::map<td::uint32, td::uint32> expected_;
+ td::vector<td::ActorId<QueryActor>> actors_;
+ td::vector<td::ActorId<Worker>> workers_;
int out_cnt_ = 0;
int in_cnt_ = 0;
int query_id_ = 1;
@@ -283,101 +282,104 @@ class MainQueryActor final : public Actor {
int threads_n_;
};
-class SimpleActor final : public Actor {
+class SimpleActor final : public td::Actor {
public:
- explicit SimpleActor(int32 threads_n) : threads_n_(threads_n) {
+ explicit SimpleActor(td::int32 threads_n) : threads_n_(threads_n) {
}
- void start_up() override {
- auto actor_ptr = make_unique<Worker>(threads_n_);
+ void start_up() final {
+ auto actor_ptr = td::make_unique<Worker>(threads_n_);
worker_ =
- register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? Random::fast(2, threads_n_) : 0).release();
+ register_actor("Worker", std::move(actor_ptr), threads_n_ > 1 ? td::Random::fast(2, threads_n_) : 0).release();
yield();
}
- void wakeup() override {
- if (q_ == 100000) {
- Scheduler::instance()->finish();
+ void wakeup() final {
+ if (q_ == 10000) {
+ td::Scheduler::instance()->finish();
stop();
return;
}
q_++;
- p_ = Random::fast(0, 1) ? 1 : 10000;
- auto future = send_promise(worker_, Random::fast(0, 3) == 0 ? 0 : Send::later, &Worker::query, q_, p_);
+ p_ = td::Random::fast_bool() ? 1 : 10000;
+ auto future = td::Random::fast(0, 3) == 0
+ ? td::send_promise<td::ActorSendType::Immediate>(worker_, &Worker::query, q_, p_)
+ : td::send_promise<td::ActorSendType::Later>(worker_, &Worker::query, q_, p_);
if (future.is_ready()) {
auto result = future.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));
yield();
} else {
- future.set_event(EventCreator::raw(actor_id(), nullptr));
+ future.set_event(td::EventCreator::raw(actor_id(), nullptr));
future_ = std::move(future);
}
- // if (threads_n_ > 1 && Random::fast(0, 2) == 0) {
- // migrate(Random::fast(1, threads_n));
+ // if (threads_n_ > 1 && td::Random::fast(0, 2) == 0) {
+ // migrate(td::Random::fast(1, threads_n));
//}
}
- void raw_event(const Event::Raw &event) override {
+ void raw_event(const td::Event::Raw &event) final {
auto result = future_.move_as_ok();
CHECK(result == fast_pow_mod_uint32(q_, p_));
yield();
}
- void on_start_migrate(int32 sched_id) override {
+ void on_start_migrate(td::int32 sched_id) final {
start_migrate(future_, sched_id);
}
- void on_finish_migrate() override {
+ void on_finish_migrate() final {
finish_migrate(future_);
}
private:
- int32 threads_n_;
- ActorId<Worker> worker_;
- FutureActor<uint32> future_;
- uint32 q_ = 1;
- uint32 p_;
+ td::int32 threads_n_;
+ td::ActorId<Worker> worker_;
+ td::FutureActor<td::uint32> future_;
+ td::uint32 q_ = 1;
+ td::uint32 p_ = 0;
};
-} // namespace
-class SendToDead : public Actor {
+class SendToDead final : public td::Actor {
public:
- class Parent : public Actor {
+ class Parent final : public td::Actor {
public:
- explicit Parent(ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
+ explicit Parent(td::ActorShared<> parent, int ttl = 3) : parent_(std::move(parent)), ttl_(ttl) {
}
- void start_up() override {
- set_timeout_in(Random::fast_uint32() % 3 * 0.001);
+ void start_up() final {
+ set_timeout_in(td::Random::fast_uint32() % 3 * 0.001);
if (ttl_ != 0) {
- child_ = create_actor_on_scheduler<Parent>(
- "Child", Random::fast_uint32() % Scheduler::instance()->sched_count(), actor_shared(), ttl_ - 1);
+ child_ = td::create_actor_on_scheduler<Parent>(
+ "Child", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), actor_shared(this),
+ ttl_ - 1);
}
}
- void timeout_expired() override {
+ void timeout_expired() final {
stop();
}
private:
- ActorOwn<Parent> child_;
- ActorShared<> parent_;
+ td::ActorOwn<Parent> child_;
+ td::ActorShared<> parent_;
int ttl_;
};
- void start_up() override {
+ void start_up() final {
for (int i = 0; i < 2000; i++) {
- create_actor_on_scheduler<Parent>("Parent", Random::fast_uint32() % Scheduler::instance()->sched_count(),
- create_reference(), 4)
+ td::create_actor_on_scheduler<Parent>(
+ "Parent", td::Random::fast_uint32() % td::Scheduler::instance()->sched_count(), create_reference(), 4)
.release();
}
}
- ActorShared<> create_reference() {
+ td::ActorShared<> create_reference() {
ref_cnt_++;
- return actor_shared();
+ return actor_shared(this);
}
- void hangup_shared() override {
+
+ void hangup_shared() final {
ref_cnt_--;
if (ref_cnt_ == 0) {
ttl_--;
if (ttl_ <= 0) {
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
stop();
} else {
start_up();
@@ -385,18 +387,17 @@ class SendToDead : public Actor {
}
}
- uint32 ttl_{50};
- uint32 ref_cnt_{0};
+ td::uint32 ttl_{50};
+ td::uint32 ref_cnt_{0};
};
TEST(Actors, send_to_dead) {
//TODO: fix CHECK(storage_count_.load() == 0)
return;
- ConcurrentScheduler sched;
int threads_n = 5;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<SendToDead>(0, "manager").release();
+ sched.create_actor_unsafe<SendToDead>(0, "SendToDead").release();
sched.start();
while (sched.run_main(10)) {
// empty
@@ -405,11 +406,8 @@ TEST(Actors, send_to_dead) {
}
TEST(Actors, main_simple) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
int threads_n = 3;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
sched.create_actor_unsafe<SimpleActor>(threads_n > 1 ? 1 : 0, "simple", threads_n).release();
sched.start();
@@ -420,13 +418,10 @@ TEST(Actors, main_simple) {
}
TEST(Actors, main) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
-
- ConcurrentScheduler sched;
int threads_n = 9;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "manager", threads_n).release();
+ sched.create_actor_unsafe<MainQueryActor>(threads_n > 1 ? 1 : 0, "MainQuery", threads_n).release();
sched.start();
while (sched.run_main(10)) {
// empty
@@ -434,27 +429,76 @@ TEST(Actors, main) {
sched.finish();
}
-class DoAfterStop : public Actor {
+class DoAfterStop final : public td::Actor {
public:
- void loop() override {
- ptr = std::make_unique<int>(10);
+ void loop() final {
+ ptr = td::make_unique<int>(10);
stop();
CHECK(*ptr == 10);
- Scheduler::instance()->finish();
+ td::Scheduler::instance()->finish();
}
private:
- std::unique_ptr<int> ptr;
+ td::unique_ptr<int> ptr;
};
TEST(Actors, do_after_stop) {
- SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
+ int threads_n = 0;
+ td::ConcurrentScheduler sched(threads_n, 0);
- ConcurrentScheduler sched;
+ sched.create_actor_unsafe<DoAfterStop>(0, "DoAfterStop").release();
+ sched.start();
+ while (sched.run_main(10)) {
+ // empty
+ }
+ sched.finish();
+}
+
+class XContext final : public td::ActorContext {
+ public:
+ td::int32 get_id() const final {
+ return 123456789;
+ }
+
+ void validate() {
+ CHECK(x == 1234);
+ }
+ ~XContext() final {
+ x = 0;
+ }
+ int x = 1234;
+};
+
+class WithXContext final : public td::Actor {
+ public:
+ void start_up() final {
+ auto old_context = set_context(std::make_shared<XContext>());
+ }
+ void f(td::unique_ptr<td::Guard> guard) {
+ }
+ void close() {
+ stop();
+ }
+};
+
+static void check_context() {
+ auto ptr = static_cast<XContext *>(td::Scheduler::context());
+ CHECK(ptr != nullptr);
+ ptr->validate();
+}
+
+TEST(Actors, context_during_destruction) {
int threads_n = 0;
- sched.init(threads_n);
+ td::ConcurrentScheduler sched(threads_n, 0);
- sched.create_actor_unsafe<DoAfterStop>(0, "manager").release();
+ {
+ auto guard = sched.get_main_guard();
+ auto with_context = td::create_actor<WithXContext>("WithXContext").release();
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
+ send_closure_later(with_context, &WithXContext::close);
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { check_context(); }));
+ send_closure(with_context, &WithXContext::f, td::create_lambda_guard([] { td::Scheduler::instance()->finish(); }));
+ }
sched.start();
while (sched.run_main(10)) {
// empty