diff options
Diffstat (limited to 'libs/tdlib/td/benchmark/bench_queue.cpp')
-rw-r--r-- | libs/tdlib/td/benchmark/bench_queue.cpp | 943 |
1 files changed, 0 insertions, 943 deletions
diff --git a/libs/tdlib/td/benchmark/bench_queue.cpp b/libs/tdlib/td/benchmark/bench_queue.cpp deleted file mode 100644 index 13288e6cd7..0000000000 --- a/libs/tdlib/td/benchmark/bench_queue.cpp +++ /dev/null @@ -1,943 +0,0 @@ -// -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -#include "td/utils/benchmark.h" -#include "td/utils/common.h" -#include "td/utils/logging.h" -#include "td/utils/MpscPollableQueue.h" -#include "td/utils/queue.h" - -// TODO: check system calls -// TODO: all return values must be checked - -#include <atomic> -#include <cstdio> -#include <cstdlib> -#include <vector> - -#include <pthread.h> -#include <sched.h> -#include <semaphore.h> -#include <sys/syscall.h> -#include <unistd.h> - -#if TD_LINUX -#include <sys/eventfd.h> -#endif - -using std::atomic; -using std::vector; - -using td::int32; -using td::uint32; - -#define MODE std::memory_order_relaxed - -// void set_affinity(int mask) { -// int err, syscallres; -// pid_t pid = gettid(); -// syscallres = syscall(__NR_sched_setaffinity, pid, sizeof(mask), &mask); -// if (syscallres) { -// err = errno; -// perror("oppa"); -//} -//} - -// TODO: warnings and asserts. There should be no warnings or debug output in production. -using qvalue_t = int; - -// Just for testing, not production -class PipeQueue { - int input; - int output; - - public: - void init() { - int new_pipe[2]; - pipe(new_pipe); - output = new_pipe[0]; - input = new_pipe[1]; - } - - void put(qvalue_t value) { - write(input, &value, sizeof(value)); - } - - qvalue_t get() { - qvalue_t res; - read(output, &res, sizeof(res)); - return res; - } - - void destroy() { - close(input); - close(output); - } -}; - -class Backoff { - int cnt; - - public: - Backoff() : cnt(0) { - } - - bool next() { - cnt++; - if (cnt < 50) { - return true; - } else { - sched_yield(); - return cnt < 500; - } - } -}; - -class VarQueue { - atomic<qvalue_t> data; - - public: - void init() { - data.store(-1, MODE); - } - - void put(qvalue_t value) { - data.store(value, MODE); - } - - qvalue_t try_get() { - __sync_synchronize(); // TODO: it is wrong place for barrier, but it results in fastest queue - qvalue_t res = data.load(MODE); - return res; - } - - void acquire() { - data.store(-1, MODE); - } - - qvalue_t get() { - qvalue_t res; - Backoff backoff; - - do { - res = try_get(); - } while (res == -1 && (backoff.next(), true)); - acquire(); - - return res; - } - - void destroy() { - } -}; - -class SemQueue { - sem_t sem; - VarQueue q; - - public: - void init() { - q.init(); - sem_init(&sem, 0, 0); - } - - void put(qvalue_t value) { - q.put(value); - sem_post(&sem); - } - - qvalue_t get() { - sem_wait(&sem); - qvalue_t res = q.get(); - return res; - } - - void destroy() { - q.destroy(); - sem_destroy(&sem); - } - - // HACK for benchmark - void reader_flush() { - } - - void writer_flush() { - } - - void writer_put(qvalue_t value) { - put(value); - } - - int reader_wait() { - return 1; - } - - qvalue_t reader_get_unsafe() { - return get(); - } -}; - -#if TD_LINUX -class EventfdQueue { - int fd; - VarQueue q; - - public: - void init() { - q.init(); - fd = eventfd(0, 0); - } - void put(qvalue_t value) { - q.put(value); - td::int64 x = 1; - write(fd, &x, sizeof(x)); - } - qvalue_t get() { - td::int64 x; - read(fd, &x, sizeof(x)); - return q.get(); - } - void destroy() { - q.destroy(); - close(fd); - } -}; -#endif - -const int queue_buf_size = 1 << 10; - -class BufferQueue { - struct node { - qvalue_t val; - char pad[64 - sizeof(atomic<qvalue_t>)]; - }; - node q[queue_buf_size]; - - struct Position { - atomic<uint32> i; - char pad[64 - sizeof(atomic<uint32>)]; - - uint32 local_read_i; - uint32 local_write_i; - char pad2[64 - sizeof(uint32) * 2]; - - void init() { - i = 0; - local_read_i = 0; - local_write_i = 0; - } - }; - - Position writer; - Position reader; - - public: - void init() { - writer.init(); - reader.init(); - } - - bool reader_empty() { - return reader.local_write_i == reader.local_read_i; - } - - bool writer_empty() { - return writer.local_write_i == writer.local_read_i + queue_buf_size; - } - - int reader_ready() { - return static_cast<int>(reader.local_write_i - reader.local_read_i); - } - - int writer_ready() { - return static_cast<int>(writer.local_read_i + queue_buf_size - writer.local_write_i); - } - - qvalue_t get_unsafe() { - return q[reader.local_read_i++ & (queue_buf_size - 1)].val; - } - - void flush_reader() { - reader.i.store(reader.local_read_i, std::memory_order_release); - } - - int update_reader() { - reader.local_write_i = writer.i.load(std::memory_order_acquire); - return reader_ready(); - } - - void put_unsafe(qvalue_t val) { - q[writer.local_write_i++ & (queue_buf_size - 1)].val = val; - } - - void flush_writer() { - writer.i.store(writer.local_write_i, std::memory_order_release); - } - - int update_writer() { - writer.local_read_i = reader.i.load(std::memory_order_acquire); - return writer_ready(); - } - - int wait_reader() { - Backoff backoff; - int res = 0; - while (res == 0) { - backoff.next(); - res = update_reader(); - } - return res; - } - - qvalue_t get_noflush() { - if (!reader_empty()) { - return get_unsafe(); - } - - Backoff backoff; - while (true) { - backoff.next(); - if (update_reader()) { - return get_unsafe(); - } - } - } - - qvalue_t get() { - qvalue_t res = get_noflush(); - flush_reader(); - return res; - } - - void put_noflush(qvalue_t val) { - if (!writer_empty()) { - put_unsafe(val); - return; - } - if (!update_writer()) { - std::fprintf(stderr, "put strong failed\n"); - std::exit(0); - } - put_unsafe(val); - } - - void put(qvalue_t val) { - put_noflush(val); - flush_writer(); - } - - void destroy() { - } -}; - -#if TD_LINUX -class BufferedFdQueue { - int fd; - atomic<int> wait_flag; - BufferQueue q; - char pad[64]; - - public: - void init() { - q.init(); - fd = eventfd(0, 0); - (void)pad[0]; - } - void put(qvalue_t value) { - q.put(value); - td::int64 x = 1; - __sync_synchronize(); - if (wait_flag.load(MODE)) { - write(fd, &x, sizeof(x)); - } - } - void put_noflush(qvalue_t value) { - q.put_noflush(value); - } - void flush_writer() { - q.flush_writer(); - td::int64 x = 1; - __sync_synchronize(); - if (wait_flag.load(MODE)) { - write(fd, &x, sizeof(x)); - } - } - void flush_reader() { - q.flush_reader(); - } - - qvalue_t get_unsafe_flush() { - qvalue_t res = q.get_unsafe(); - q.flush_reader(); - return res; - } - - qvalue_t get_unsafe() { - return q.get_unsafe(); - } - - int wait_reader() { - int res = 0; - Backoff backoff; - while (res == 0 && backoff.next()) { - res = q.update_reader(); - } - if (res != 0) { - return res; - } - - td::int64 x; - wait_flag.store(1, MODE); - __sync_synchronize(); - while (!(res = q.update_reader())) { - read(fd, &x, sizeof(x)); - __sync_synchronize(); - } - wait_flag.store(0, MODE); - return res; - } - - qvalue_t get() { - if (!q.reader_empty()) { - return get_unsafe_flush(); - } - - Backoff backoff; - while (backoff.next()) { - if (q.update_reader()) { - return get_unsafe_flush(); - } - } - - td::int64 x; - wait_flag.store(1, MODE); - __sync_synchronize(); - while (!q.update_reader()) { - read(fd, &x, sizeof(x)); - __sync_synchronize(); - } - wait_flag.store(0, MODE); - return get_unsafe_flush(); - } - void destroy() { - q.destroy(); - close(fd); - } -}; - -class FdQueue { - int fd; - atomic<int> wait_flag; - VarQueue q; - char pad[64]; - - public: - void init() { - q.init(); - fd = eventfd(0, 0); - (void)pad[0]; - } - void put(qvalue_t value) { - q.put(value); - td::int64 x = 1; - __sync_synchronize(); - if (wait_flag.load(MODE)) { - write(fd, &x, sizeof(x)); - } - } - qvalue_t get() { - // td::int64 x; - // read(fd, &x, sizeof(x)); - // return q.get(); - - Backoff backoff; - qvalue_t res = -1; - do { - res = q.try_get(); - } while (res == -1 && backoff.next()); - if (res != -1) { - q.acquire(); - return res; - } - - td::int64 x; - wait_flag.store(1, MODE); - __sync_synchronize(); - // std::fprintf(stderr, "!\n"); - // while (res == -1 && read(fd, &x, sizeof(x))) { - // res = q.try_get(); - //} - do { - __sync_synchronize(); - res = q.try_get(); - } while (res == -1 && read(fd, &x, sizeof(x))); - q.acquire(); - wait_flag.store(0, MODE); - return res; - } - void destroy() { - q.destroy(); - close(fd); - } -}; -#endif - -class SemBackoffQueue { - sem_t sem; - VarQueue q; - - public: - void init() { - q.init(); - sem_init(&sem, 0, 0); - } - - void put(qvalue_t value) { - q.put(value); - sem_post(&sem); - } - - qvalue_t get() { - Backoff backoff; - int sem_flag = -1; - do { - sem_flag = sem_trywait(&sem); - } while (sem_flag != 0 && backoff.next()); - if (sem_flag != 0) { - sem_wait(&sem); - } - return q.get(); - } - - void destroy() { - q.destroy(); - sem_destroy(&sem); - } -}; - -class SemCheatQueue { - sem_t sem; - VarQueue q; - - public: - void init() { - q.init(); - sem_init(&sem, 0, 0); - } - - void put(qvalue_t value) { - q.put(value); - sem_post(&sem); - } - - qvalue_t get() { - Backoff backoff; - qvalue_t res = -1; - do { - res = q.try_get(); - } while (res == -1 && backoff.next()); - sem_wait(&sem); - if (res != -1) { - q.acquire(); - return res; - } - return q.get(); - } - - void destroy() { - q.destroy(); - sem_destroy(&sem); - } -}; - -template <class QueueT> -class QueueBenchmark2 : public td::Benchmark { - QueueT client, server; - int connections_n, queries_n; - - int server_active_connections; - int client_active_connections; - vector<td::int64> server_conn; - vector<td::int64> client_conn; - - public: - explicit QueueBenchmark2(int connections_n = 1) : connections_n(connections_n) { - } - - std::string get_description() const override { - return "QueueBenchmark2"; - } - - void start_up() override { - client.init(); - server.init(); - } - - void tear_down() override { - client.destroy(); - server.destroy(); - } - - void server_process(qvalue_t value) { - int no = value & 0x00FFFFFF; - int co = static_cast<int>(static_cast<unsigned int>(value) >> 24); - // std::fprintf(stderr, "-->%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != server_conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(server_conn[co] - 1)); - std::fprintf(stderr, "Server BUG\n"); - while (true) { - } - } - // std::fprintf(stderr, "no = %d/%d\n", no, queries_n); - // std::fprintf(stderr, "answer: %d %d\n", no, co); - - client.writer_put(value); - client.writer_flush(); - if (no + 1 >= queries_n) { - server_active_connections--; - } - } - - void *server_run(void *) { - server_conn = vector<td::int64>(connections_n); - server_active_connections = connections_n; - - while (server_active_connections > 0) { - int cnt = server.reader_wait(); - if (cnt == 0) { - std::fprintf(stderr, "ERROR!\n"); - std::exit(0); - } - while (cnt-- > 0) { - server_process(server.reader_get_unsafe()); - server.reader_flush(); - } - // client.writer_flush(); - server.reader_flush(); - } - return nullptr; - } - - void client_process(qvalue_t value) { - int no = value & 0x00FFFFFF; - int co = static_cast<int>(static_cast<unsigned int>(value) >> 24); - // std::fprintf(stderr, "<--%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != client_conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(client_conn[co] - 1)); - std::fprintf(stderr, "BUG\n"); - while (true) { - } - std::exit(0); - } - if (no + 1 < queries_n) { - // std::fprintf(stderr, "query: %d %d\n", no + 1, co); - server.writer_put(value + 1); - server.writer_flush(); - } else { - client_active_connections--; - } - } - - void *client_run(void *) { - client_conn = vector<td::int64>(connections_n); - client_active_connections = connections_n; - if (queries_n >= (1 << 24)) { - std::fprintf(stderr, "Too big queries_n\n"); - std::exit(0); - } - - for (int i = 0; i < connections_n; i++) { - server.writer_put(static_cast<qvalue_t>(i) << 24); - } - server.writer_flush(); - - while (client_active_connections > 0) { - int cnt = client.reader_wait(); - if (cnt == 0) { - std::fprintf(stderr, "ERROR!\n"); - std::exit(0); - } - while (cnt-- > 0) { - client_process(client.reader_get_unsafe()); - client.reader_flush(); - } - // server.writer_flush(); - client.reader_flush(); - } - // system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"); - return nullptr; - } - - static void *client_run_gateway(void *arg) { - return static_cast<QueueBenchmark2 *>(arg)->client_run(nullptr); - } - - static void *server_run_gateway(void *arg) { - return static_cast<QueueBenchmark2 *>(arg)->server_run(nullptr); - } - - void run(int n) override { - pthread_t client_thread_id; - pthread_t server_thread_id; - - queries_n = (n + connections_n - 1) / connections_n; - - pthread_create(&client_thread_id, nullptr, client_run_gateway, this); - pthread_create(&server_thread_id, nullptr, server_run_gateway, this); - - pthread_join(client_thread_id, nullptr); - pthread_join(server_thread_id, nullptr); - } -}; - -template <class QueueT> -class QueueBenchmark : public td::Benchmark { - QueueT client, server; - const int connections_n; - int queries_n; - - public: - explicit QueueBenchmark(int connections_n = 1) : connections_n(connections_n) { - } - - std::string get_description() const override { - return "QueueBenchmark"; - } - - void start_up() override { - client.init(); - server.init(); - } - - void tear_down() override { - client.destroy(); - server.destroy(); - } - - void *server_run(void *) { - vector<td::int64> conn(connections_n); - int active_connections = connections_n; - while (active_connections > 0) { - qvalue_t value = server.get(); - int no = value & 0x00FFFFFF; - int co = static_cast<int>(value >> 24); - // std::fprintf(stderr, "-->%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1)); - std::fprintf(stderr, "Server BUG\n"); - while (true) { - } - } - // std::fprintf(stderr, "no = %d/%d\n", no, queries_n); - client.put(value); - if (no + 1 >= queries_n) { - active_connections--; - } - } - return nullptr; - } - - void *client_run(void *) { - vector<td::int64> conn(connections_n); - if (queries_n >= (1 << 24)) { - std::fprintf(stderr, "Too big queries_n\n"); - std::exit(0); - } - for (int i = 0; i < connections_n; i++) { - server.put(static_cast<qvalue_t>(i) << 24); - } - int active_connections = connections_n; - while (active_connections > 0) { - qvalue_t value = client.get(); - int no = value & 0x00FFFFFF; - int co = static_cast<int>(value >> 24); - // std::fprintf(stderr, "<--%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1)); - std::fprintf(stderr, "BUG\n"); - while (true) { - } - std::exit(0); - } - if (no + 1 < queries_n) { - server.put(value + 1); - } else { - active_connections--; - } - } - // system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"); - return nullptr; - } - - void *client_run2(void *) { - vector<td::int64> conn(connections_n); - if (queries_n >= (1 << 24)) { - std::fprintf(stderr, "Too big queries_n\n"); - std::exit(0); - } - for (int it = 0; it < queries_n; it++) { - for (int i = 0; i < connections_n; i++) { - server.put((static_cast<td::int64>(i) << 24) + it); - } - for (int i = 0; i < connections_n; i++) { - qvalue_t value = client.get(); - int no = value & 0x00FFFFFF; - int co = static_cast<int>(value >> 24); - // std::fprintf(stderr, "<--%d %d\n", co, no); - if (co < 0 || co >= connections_n || no != conn[co]++) { - std::fprintf(stderr, "%d %d\n", co, no); - std::fprintf(stderr, "expected %d %lld\n", co, static_cast<long long>(conn[co] - 1)); - std::fprintf(stderr, "BUG\n"); - while (true) { - } - std::exit(0); - } - } - } - // system("cat /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq"); - return nullptr; - } - - static void *client_run_gateway(void *arg) { - return static_cast<QueueBenchmark *>(arg)->client_run(nullptr); - } - - static void *server_run_gateway(void *arg) { - return static_cast<QueueBenchmark *>(arg)->server_run(nullptr); - } - - void run(int n) override { - pthread_t client_thread_id; - pthread_t server_thread_id; - - queries_n = (n + connections_n - 1) / connections_n; - - pthread_create(&client_thread_id, nullptr, client_run_gateway, this); - pthread_create(&server_thread_id, nullptr, server_run_gateway, this); - - pthread_join(client_thread_id, nullptr); - pthread_join(server_thread_id, nullptr); - } -}; - -template <class QueueT> -class RingBenchmark : public td::Benchmark { - enum { QN = 504 }; - - struct Thread { - int int_id; - pthread_t id; - QueueT queue; - Thread *next; - char pad[64]; - - void *run() { - qvalue_t value; - // std::fprintf(stderr, "start %d\n", int_id); - do { - int cnt = queue.reader_wait(); - CHECK(cnt == 1); - value = queue.reader_get_unsafe(); - queue.reader_flush(); - - next->queue.writer_put(value - 1); - next->queue.writer_flush(); - } while (value >= QN); - return nullptr; - } - }; - - Thread q[QN]; - - public: - static void *run_gateway(void *arg) { - return static_cast<Thread *>(arg)->run(); - } - - void start_up() override { - for (int i = 0; i < QN; i++) { - q[i].int_id = i; - q[i].queue.init(); - q[i].next = &q[(i + 1) % QN]; - } - } - - void tear_down() override { - for (int i = 0; i < QN; i++) { - q[i].queue.destroy(); - } - } - - void run(int n) override { - for (int i = 0; i < QN; i++) { - pthread_create(&q[i].id, nullptr, run_gateway, &q[i]); - } - - std::fprintf(stderr, "run %d\n", n); - if (n < 1000) { - n = 1000; - } - q[0].queue.writer_put(n); - q[0].queue.writer_flush(); - - for (int i = 0; i < QN; i++) { - pthread_join(q[i].id, nullptr); - } - } -}; - -int main() { - SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG)); -#define BENCH_Q2(Q, N) \ - std::fprintf(stderr, "!%s %d:\t", #Q, N); \ - td::bench(QueueBenchmark2<Q>(N)); -#define BENCH_Q(Q, N) \ - std::fprintf(stderr, "%s %d:\t", #Q, N); \ - td::bench(QueueBenchmark<Q>(N)); - -#define BENCH_R(Q) \ - std::fprintf(stderr, "%s:\t", #Q); \ - td::bench(RingBenchmark<Q>()); - // TODO: yield makes it extremely slow. Yet some backoff may be necessary. - // BENCH_R(SemQueue); - // BENCH_R(td::PollQueue<qvalue_t>); - - BENCH_Q2(td::PollQueue<qvalue_t>, 1); - BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 1); - BENCH_Q2(td::PollQueue<qvalue_t>, 100); - BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 100); - BENCH_Q2(td::PollQueue<qvalue_t>, 10); - BENCH_Q2(td::MpscPollableQueue<qvalue_t>, 10); - - BENCH_Q(VarQueue, 1); - // BENCH_Q(FdQueue, 1); - // BENCH_Q(BufferedFdQueue, 1); - BENCH_Q(PipeQueue, 1); - BENCH_Q(SemCheatQueue, 1); - BENCH_Q(SemQueue, 1); - - // BENCH_Q2(td::PollQueue<qvalue_t>, 100); - // BENCH_Q2(td::PollQueue<qvalue_t>, 10); - // BENCH_Q2(td::PollQueue<qvalue_t>, 4); - // BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 100); - - // BENCH_Q2(td::InfBackoffQueue<qvalue_t>, 1); - // BENCH_Q(SemCheatQueue, 1); - - // BENCH_Q(BufferedFdQueue, 100); - // BENCH_Q(BufferedFdQueue, 10); - - // BENCH_Q(BufferQueue, 4); - // BENCH_Q(BufferQueue, 100); - // BENCH_Q(BufferQueue, 10); - // BENCH_Q(BufferQueue, 1); - - return 0; -} |