diff options
author | George Hazan <ghazan@miranda.im> | 2022-11-30 17:48:47 +0300 |
---|---|---|
committer | George Hazan <ghazan@miranda.im> | 2022-11-30 17:48:47 +0300 |
commit | 0ece30dc7c0e34b4c5911969b8fa99c33c6d023c (patch) | |
tree | 671325d3fec09b999411e4e3ab84ef8259261818 /protocols/Telegram/tdlib/td/benchmark/bench_queue.cpp | |
parent | 46c53ffc6809c67e4607e99951a2846c382b63b2 (diff) |
Telegram: update for TDLIB
Diffstat (limited to 'protocols/Telegram/tdlib/td/benchmark/bench_queue.cpp')
-rw-r--r-- | protocols/Telegram/tdlib/td/benchmark/bench_queue.cpp | 400 |
1 files changed, 195 insertions, 205 deletions
diff --git a/protocols/Telegram/tdlib/td/benchmark/bench_queue.cpp b/protocols/Telegram/tdlib/td/benchmark/bench_queue.cpp index 13288e6cd7..6f7cf20bc9 100644 --- a/protocols/Telegram/tdlib/td/benchmark/bench_queue.cpp +++ b/protocols/Telegram/tdlib/td/benchmark/bench_queue.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -8,47 +8,56 @@ #include "td/utils/common.h" #include "td/utils/logging.h" #include "td/utils/MpscPollableQueue.h" +#include "td/utils/port/sleep.h" +#include "td/utils/port/thread.h" #include "td/utils/queue.h" +#include "td/utils/Random.h" // TODO: check system calls // TODO: all return values must be checked #include <atomic> -#include <cstdio> -#include <cstdlib> -#include <vector> +#if TD_PORT_POSIX #include <pthread.h> #include <sched.h> #include <semaphore.h> #include <sys/syscall.h> #include <unistd.h> +#endif #if TD_LINUX #include <sys/eventfd.h> #endif -using std::atomic; -using std::vector; - -using td::int32; -using td::uint32; - #define MODE std::memory_order_relaxed // void set_affinity(int mask) { -// int err, syscallres; -// pid_t pid = gettid(); -// syscallres = syscall(__NR_sched_setaffinity, pid, sizeof(mask), &mask); -// if (syscallres) { -// err = errno; -// perror("oppa"); -//} -//} - -// TODO: warnings and asserts. There should be no warnings or debug output in production. +// pid_t pid = gettid(); +// int syscallres = syscall(__NR_sched_setaffinity, pid, sizeof(mask), &mask); +// if (syscallres) { +// perror("Failed to set affinity"); +// } +// } + using qvalue_t = int; +class Backoff { + int cnt = 0; + + public: + bool next() { + cnt++; + if (cnt < 50) { + return true; + } else { + td::usleep_for(1); + return cnt < 500; + } + } +}; + +#if TD_PORT_POSIX // Just for testing, not production class PipeQueue { int input; @@ -57,18 +66,21 @@ class PipeQueue { public: void init() { int new_pipe[2]; - pipe(new_pipe); + int res = pipe(new_pipe); + CHECK(res == 0); output = new_pipe[0]; input = new_pipe[1]; } void put(qvalue_t value) { - write(input, &value, sizeof(value)); + auto len = write(input, &value, sizeof(value)); + CHECK(len == sizeof(value)); } qvalue_t get() { qvalue_t res; - read(output, &res, sizeof(res)); + auto len = read(output, &res, sizeof(res)); + CHECK(len == sizeof(res)); return res; } @@ -78,26 +90,8 @@ class PipeQueue { } }; -class Backoff { - int cnt; - - public: - Backoff() : cnt(0) { - } - - bool next() { - cnt++; - if (cnt < 50) { - return true; - } else { - sched_yield(); - return cnt < 500; - } - } -}; - class VarQueue { - atomic<qvalue_t> data; + std::atomic<qvalue_t> data{0}; public: void init() { @@ -179,6 +173,7 @@ class SemQueue { return get(); } }; +#endif #if TD_LINUX class EventfdQueue { @@ -193,11 +188,14 @@ class EventfdQueue { void put(qvalue_t value) { q.put(value); td::int64 x = 1; - write(fd, &x, sizeof(x)); + auto len = write(fd, &x, sizeof(x)); + CHECK(len == sizeof(x)); } qvalue_t get() { td::int64 x; - read(fd, &x, sizeof(x)); + auto len = read(fd, &x, sizeof(x)); + CHECK(len == sizeof(x)); + CHECK(x == 1); return q.get(); } void destroy() { @@ -212,17 +210,17 @@ const int queue_buf_size = 1 << 10; class BufferQueue { struct node { qvalue_t val; - char pad[64 - sizeof(atomic<qvalue_t>)]; + char pad[64 - sizeof(std::atomic<qvalue_t>)]; }; node q[queue_buf_size]; struct Position { - atomic<uint32> i; - char pad[64 - sizeof(atomic<uint32>)]; + std::atomic<td::uint32> i{0}; + char pad[64 - sizeof(std::atomic<td::uint32>)]; - uint32 local_read_i; - uint32 local_write_i; - char pad2[64 - sizeof(uint32) * 2]; + td::uint32 local_read_i; + td::uint32 local_write_i; + char pad2[64 - sizeof(td::uint32) * 2]; void init() { i = 0; @@ -318,8 +316,7 @@ class BufferQueue { return; } if (!update_writer()) { - std::fprintf(stderr, "put strong failed\n"); - std::exit(0); + LOG(FATAL) << "Put strong failed"; } put_unsafe(val); } @@ -336,7 +333,7 @@ class BufferQueue { #if TD_LINUX class BufferedFdQueue { int fd; - atomic<int> wait_flag; + std::atomic<int> wait_flag{0}; BufferQueue q; char pad[64]; @@ -351,7 +348,8 @@ class BufferedFdQueue { td::int64 x = 1; __sync_synchronize(); if (wait_flag.load(MODE)) { - write(fd, &x, sizeof(x)); + auto len = write(fd, &x, sizeof(x)); + CHECK(len == sizeof(x)); } } void put_noflush(qvalue_t value) { @@ -362,7 +360,8 @@ class BufferedFdQueue { td::int64 x = 1; __sync_synchronize(); if (wait_flag.load(MODE)) { - write(fd, &x, sizeof(x)); + auto len = write(fd, &x, sizeof(x)); + CHECK(len == sizeof(x)); } } void flush_reader() { @@ -393,7 +392,8 @@ class BufferedFdQueue { wait_flag.store(1, MODE); __sync_synchronize(); while (!(res = q.update_reader())) { - read(fd, &x, sizeof(x)); + auto len = read(fd, &x, sizeof(x)); + CHECK(len == sizeof(x)); __sync_synchronize(); } wait_flag.store(0, MODE); @@ -416,7 +416,8 @@ class BufferedFdQueue { wait_flag.store(1, MODE); __sync_synchronize(); while (!q.update_reader()) { - read(fd, &x, sizeof(x)); + auto len = read(fd, &x, sizeof(x)); + CHECK(len == sizeof(x)); __sync_synchronize(); } wait_flag.store(0, MODE); @@ -430,7 +431,7 @@ class BufferedFdQueue { class FdQueue { int fd; - atomic<int> wait_flag; + std::atomic<int> wait_flag{0}; VarQueue q; char pad[64]; @@ -445,12 +446,14 @@ class FdQueue { td::int64 x = 1; __sync_synchronize(); if (wait_flag.load(MODE)) { - write(fd, &x, sizeof(x)); + auto len = write(fd, &x, sizeof(x)); + CHECK(len == sizeof(x)); } } qvalue_t get() { // td::int64 x; - // read(fd, &x, sizeof(x)); + // auto len = read(fd, &x, sizeof(x)); + // CHECK(len == sizeof(x)); // return q.get(); Backoff backoff; @@ -466,14 +469,13 @@ class FdQueue { td::int64 x; wait_flag.store(1, MODE); __sync_synchronize(); - // std::fprintf(stderr, "!\n"); - // while (res == -1 && read(fd, &x, sizeof(x))) { - // res = q.try_get(); - //} + // while (res == -1 && read(fd, &x, sizeof(x)) == sizeof(x)) { + // res = q.try_get(); + // } do { __sync_synchronize(); res = q.try_get(); - } while (res == -1 && read(fd, &x, sizeof(x))); + } while (res == -1 && read(fd, &x, sizeof(x)) == sizeof(x)); q.acquire(); wait_flag.store(0, MODE); return res; @@ -485,6 +487,7 @@ class FdQueue { }; #endif +#if TD_PORT_POSIX class SemBackoffQueue { sem_t sem; VarQueue q; @@ -554,46 +557,40 @@ class SemCheatQueue { }; template <class QueueT> -class QueueBenchmark2 : public td::Benchmark { +class QueueBenchmark2 final : public td::Benchmark { QueueT client, server; int connections_n, queries_n; int server_active_connections; int client_active_connections; - vector<td::int64> server_conn; - vector<td::int64> client_conn; + td::vector<td::int64> server_conn; + td::vector<td::int64> client_conn; + + td::string name; public: - explicit QueueBenchmark2(int connections_n = 1) : connections_n(connections_n) { + QueueBenchmark2(int connections_n, td::string name) : connections_n(connections_n), name(std::move(name)) { } - std::string get_description() const override { - return "QueueBenchmark2"; + td::string get_description() const final { + return name; } - void start_up() override { + void start_up() final { client.init(); server.init(); } - void tear_down() override { + void tear_down() final { client.destroy(); server.destroy(); } void server_process(qvalue_t value) { int no = value & 0x00FFFFFF; - int co = static_cast<int>(static_cast<unsigned int>(value) >> 24); - // std::fprintf(stderr, "-->%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != server_conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(server_conn[co] - 1)); - std::fprintf(stderr, "Server BUG\n"); - while (true) { - } - } - // std::fprintf(stderr, "no = %d/%d\n", no, queries_n); - // std::fprintf(stderr, "answer: %d %d\n", no, co); + auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24); + CHECK(co >= 0 && co < connections_n); + CHECK(no == server_conn[co]++); client.writer_put(value); client.writer_flush(); @@ -603,15 +600,12 @@ class QueueBenchmark2 : public td::Benchmark { } void *server_run(void *) { - server_conn = vector<td::int64>(connections_n); + server_conn = td::vector<td::int64>(connections_n); server_active_connections = connections_n; while (server_active_connections > 0) { int cnt = server.reader_wait(); - if (cnt == 0) { - std::fprintf(stderr, "ERROR!\n"); - std::exit(0); - } + CHECK(cnt != 0); while (cnt-- > 0) { server_process(server.reader_get_unsafe()); server.reader_flush(); @@ -624,18 +618,10 @@ class QueueBenchmark2 : public td::Benchmark { void client_process(qvalue_t value) { int no = value & 0x00FFFFFF; - int co = static_cast<int>(static_cast<unsigned int>(value) >> 24); - // std::fprintf(stderr, "<--%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != client_conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(client_conn[co] - 1)); - std::fprintf(stderr, "BUG\n"); - while (true) { - } - std::exit(0); - } + auto co = static_cast<int>(static_cast<td::uint32>(value) >> 24); + CHECK(co >= 0 && co < connections_n); + CHECK(no == client_conn[co]++); if (no + 1 < queries_n) { - // std::fprintf(stderr, "query: %d %d\n", no + 1, co); server.writer_put(value + 1); server.writer_flush(); } else { @@ -644,12 +630,9 @@ class QueueBenchmark2 : public td::Benchmark { } void *client_run(void *) { - client_conn = vector<td::int64>(connections_n); + client_conn = td::vector<td::int64>(connections_n); client_active_connections = connections_n; - if (queries_n >= (1 << 24)) { - std::fprintf(stderr, "Too big queries_n\n"); - std::exit(0); - } + CHECK(queries_n < (1 << 24)); for (int i = 0; i < connections_n; i++) { server.writer_put(static_cast<qvalue_t>(i) << 24); @@ -658,10 +641,7 @@ class QueueBenchmark2 : public td::Benchmark { while (client_active_connections > 0) { int cnt = client.reader_wait(); - if (cnt == 0) { - std::fprintf(stderr, "ERROR!\n"); - std::exit(0); - } + CHECK(cnt != 0); while (cnt-- > 0) { client_process(client.reader_get_unsafe()); client.reader_flush(); @@ -681,7 +661,7 @@ class QueueBenchmark2 : public td::Benchmark { return static_cast<QueueBenchmark2 *>(arg)->server_run(nullptr); } - void run(int n) override { + void run(int n) final { pthread_t client_thread_id; pthread_t server_thread_id; @@ -696,45 +676,40 @@ class QueueBenchmark2 : public td::Benchmark { }; template <class QueueT> -class QueueBenchmark : public td::Benchmark { +class QueueBenchmark final : public td::Benchmark { QueueT client, server; const int connections_n; int queries_n; + td::string name; + public: - explicit QueueBenchmark(int connections_n = 1) : connections_n(connections_n) { + QueueBenchmark(int connections_n, td::string name) : connections_n(connections_n), name(std::move(name)) { } - std::string get_description() const override { - return "QueueBenchmark"; + td::string get_description() const final { + return name; } - void start_up() override { + void start_up() final { client.init(); server.init(); } - void tear_down() override { + void tear_down() final { client.destroy(); server.destroy(); } void *server_run(void *) { - vector<td::int64> conn(connections_n); + td::vector<td::int64> conn(connections_n); int active_connections = connections_n; while (active_connections > 0) { qvalue_t value = server.get(); int no = value & 0x00FFFFFF; - int co = static_cast<int>(value >> 24); - // std::fprintf(stderr, "-->%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1)); - std::fprintf(stderr, "Server BUG\n"); - while (true) { - } - } - // std::fprintf(stderr, "no = %d/%d\n", no, queries_n); + auto co = static_cast<int>(value >> 24); + CHECK(co >= 0 && co < connections_n); + CHECK(no == conn[co]++); client.put(value); if (no + 1 >= queries_n) { active_connections--; @@ -744,11 +719,8 @@ class QueueBenchmark : public td::Benchmark { } void *client_run(void *) { - vector<td::int64> conn(connections_n); - if (queries_n >= (1 << 24)) { - std::fprintf(stderr, "Too big queries_n\n"); - std::exit(0); - } + td::vector<td::int64> conn(connections_n); + CHECK(queries_n < (1 << 24)); for (int i = 0; i < connections_n; i++) { server.put(static_cast<qvalue_t>(i) << 24); } @@ -756,16 +728,9 @@ class QueueBenchmark : public td::Benchmark { while (active_connections > 0) { qvalue_t value = client.get(); int no = value & 0x00FFFFFF; - int co = static_cast<int>(value >> 24); - // std::fprintf(stderr, "<--%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1)); - std::fprintf(stderr, "BUG\n"); - while (true) { - } - std::exit(0); - } + auto co = static_cast<int>(value >> 24); + CHECK(co >= 0 && co < connections_n); + CHECK(no == conn[co]++); if (no + 1 < queries_n) { server.put(value + 1); } else { @@ -777,28 +742,18 @@ class QueueBenchmark : public td::Benchmark { } void *client_run2(void *) { - vector<td::int64> conn(connections_n); - if (queries_n >= (1 << 24)) { - std::fprintf(stderr, "Too big queries_n\n"); - std::exit(0); - } - for (int it = 0; it < queries_n; it++) { + td::vector<td::int64> conn(connections_n); + CHECK(queries_n < (1 << 24)); + for (int query = 0; query < queries_n; query++) { for (int i = 0; i < connections_n; i++) { - server.put((static_cast<td::int64>(i) << 24) + it); + server.put((static_cast<td::int64>(i) << 24) + query); } for (int i = 0; i < connections_n; i++) { qvalue_t value = client.get(); int no = value & 0x00FFFFFF; - int co = static_cast<int>(value >> 24); - // std::fprintf(stderr, "<--%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1)); - std::fprintf(stderr, "BUG\n"); - while (true) { - } - std::exit(0); - } + auto co = static_cast<int>(value >> 24); + CHECK(co >= 0 && co < connections_n); + CHECK(no == conn[co]++); } } // system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"); @@ -813,7 +768,7 @@ class QueueBenchmark : public td::Benchmark { return static_cast<QueueBenchmark *>(arg)->server_run(nullptr); } - void run(int n) override { + void run(int n) final { pthread_t client_thread_id; pthread_t server_thread_id; @@ -828,8 +783,8 @@ class QueueBenchmark : public td::Benchmark { }; template <class QueueT> -class RingBenchmark : public td::Benchmark { - enum { QN = 504 }; +class RingBenchmark final : public td::Benchmark { + static constexpr int QN = 504; struct Thread { int int_id; @@ -840,7 +795,6 @@ class RingBenchmark : public td::Benchmark { void *run() { qvalue_t value; - // std::fprintf(stderr, "start %d\n", int_id); do { int cnt = queue.reader_wait(); CHECK(cnt == 1); @@ -861,7 +815,7 @@ class RingBenchmark : public td::Benchmark { return static_cast<Thread *>(arg)->run(); } - void start_up() override { + void start_up() final { for (int i = 0; i < QN; i++) { q[i].int_id = i; q[i].queue.init(); @@ -869,18 +823,17 @@ class RingBenchmark : public td::Benchmark { } } - void tear_down() override { + void tear_down() final { for (int i = 0; i < QN; i++) { q[i].queue.destroy(); } } - void run(int n) override { + void run(int n) final { for (int i = 0; i < QN; i++) { pthread_create(&q[i].id, nullptr, run_gateway, &q[i]); } - std::fprintf(stderr, "run %d\n", n); if (n < 1000) { n = 1000; } @@ -892,52 +845,89 @@ class RingBenchmark : public td::Benchmark { } } }; +#endif + +/* +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED +static void test_queue() { + td::vector<td::thread> threads; + static constexpr size_t THREAD_COUNT = 100; + td::vector<td::MpscPollableQueue<int>> queues(THREAD_COUNT); + for (auto &q : queues) { + q.init(); + } + for (size_t i = 0; i < THREAD_COUNT; i++) { + threads.emplace_back([&q = queues[i]] { + while (true) { + auto got = q.reader_wait_nonblock(); + while (got-- > 0) { + q.reader_get_unsafe(); + } + q.reader_get_event_fd().wait(1000); + } + }); + } + + for (size_t iter = 0; iter < THREAD_COUNT; iter++) { + td::usleep_for(100); + for (int i = 0; i < 5; i++) { + queues[td::Random::fast(0, THREAD_COUNT - 1)].writer_put(1); + } + } + + for (size_t i = 0; i < THREAD_COUNT; i++) { + threads[i].join(); + } +} +#endif +*/ int main() { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); -#define BENCH_Q2(Q, N) \ - std::fprintf(stderr, "!%s %d:\t", #Q, N); \ - td::bench(QueueBenchmark2<Q>(N)); -#define BENCH_Q(Q, N) \ - std::fprintf(stderr, "%s %d:\t", #Q, N); \ - td::bench(QueueBenchmark<Q>(N)); - -#define BENCH_R(Q) \ - std::fprintf(stderr, "%s:\t", #Q); \ - td::bench(RingBenchmark<Q>()); +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + // test_queue(); +#endif + +#if TD_PORT_POSIX // TODO: yield makes it extremely slow. Yet some backoff may be necessary. - // BENCH_R(SemQueue); - // BENCH_R(td::PollQueue<qvalue_t>); + // td::bench(RingBenchmark<SemQueue>()); + // td::bench(RingBenchmark<td::PollQueue<qvalue_t>>()); - BENCH_Q2(td::PollQueue<qvalue_t>, 1); - BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 1); - BENCH_Q2(td::PollQueue<qvalue_t>, 100); - BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 100); - BENCH_Q2(td::PollQueue<qvalue_t>, 10); - BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 10); +#define BENCH_Q2(Q, N) td::bench(QueueBenchmark2<Q<qvalue_t>>(N, #Q "(" #N ")")) - BENCH_Q(VarQueue, 1); - // BENCH_Q(FdQueue, 1); - // BENCH_Q(BufferedFdQueue, 1); - BENCH_Q(PipeQueue, 1); - BENCH_Q(SemCheatQueue, 1); - BENCH_Q(SemQueue, 1); +#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED + BENCH_Q2(td::InfBackoffQueue, 1); + BENCH_Q2(td::MpscPollableQueue, 1); + BENCH_Q2(td::PollQueue, 1); + + BENCH_Q2(td::InfBackoffQueue, 10); + BENCH_Q2(td::MpscPollableQueue, 10); + BENCH_Q2(td::PollQueue, 10); - // BENCH_Q2(td::PollQueue<qvalue_t>, 100); - // BENCH_Q2(td::PollQueue<qvalue_t>, 10); - // BENCH_Q2(td::PollQueue<qvalue_t>, 4); - // BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 100); + BENCH_Q2(td::InfBackoffQueue, 100); + BENCH_Q2(td::MpscPollableQueue, 100); + BENCH_Q2(td::PollQueue, 100); - // BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 1); - // BENCH_Q(SemCheatQueue, 1); + BENCH_Q2(td::PollQueue, 4); + BENCH_Q2(td::PollQueue, 10); + BENCH_Q2(td::PollQueue, 100); +#endif - // BENCH_Q(BufferedFdQueue, 100); - // BENCH_Q(BufferedFdQueue, 10); +#define BENCH_Q(Q, N) td::bench(QueueBenchmark<Q>(N, #Q "(" #N ")")) - // BENCH_Q(BufferQueue, 4); - // BENCH_Q(BufferQueue, 100); - // BENCH_Q(BufferQueue, 10); - // BENCH_Q(BufferQueue, 1); +#if TD_LINUX + BENCH_Q(BufferQueue, 1); + BENCH_Q(BufferedFdQueue, 1); + BENCH_Q(FdQueue, 1); +#endif + BENCH_Q(PipeQueue, 1); + BENCH_Q(SemCheatQueue, 1); + BENCH_Q(SemQueue, 1); + BENCH_Q(VarQueue, 1); - return 0; +#if TD_LINUX + BENCH_Q(BufferQueue, 4); + BENCH_Q(BufferQueue, 10); + BENCH_Q(BufferQueue, 100); +#endif +#endif } |