summaryrefslogtreecommitdiff
path: root/libs/tdlib/td/tdutils/td/utils/buffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'libs/tdlib/td/tdutils/td/utils/buffer.h')
-rw-r--r--libs/tdlib/td/tdutils/td/utils/buffer.h708
1 files changed, 0 insertions, 708 deletions
diff --git a/libs/tdlib/td/tdutils/td/utils/buffer.h b/libs/tdlib/td/tdutils/td/utils/buffer.h
deleted file mode 100644
index aa4ef8db26..0000000000
--- a/libs/tdlib/td/tdutils/td/utils/buffer.h
+++ /dev/null
@@ -1,708 +0,0 @@
-//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
-//
-// Distributed under the Boost Software License, Version 1.0. (See accompanying
-// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-#pragma once
-
-#include "td/utils/common.h"
-#include "td/utils/logging.h"
-#include "td/utils/port/thread_local.h"
-#include "td/utils/Slice.h"
-
-#include <atomic>
-#include <cstring>
-#include <limits>
-
-namespace td {
-
-struct BufferRaw {
- size_t data_size_;
-
- // Constant after first reader is created.
- // May be change by writer before it.
- // So writer may do prepends till there is no reader created.
- size_t begin_;
-
- // Write by writer.
- // Read by reader.
- std::atomic<size_t> end_;
-
- mutable std::atomic<int32> ref_cnt_;
- std::atomic<bool> has_writer_;
- bool was_reader_;
-
- alignas(4) char data_[1];
-};
-
-class BufferAllocator {
- public:
- class DeleteWriterPtr {
- public:
- void operator()(BufferRaw *ptr) {
- ptr->has_writer_.store(false, std::memory_order_release);
- dec_ref_cnt(ptr);
- }
- };
- class DeleteReaderPtr {
- public:
- void operator()(BufferRaw *ptr) {
- dec_ref_cnt(ptr);
- }
- };
-
- using WriterPtr = std::unique_ptr<BufferRaw, DeleteWriterPtr>;
- using ReaderPtr = std::unique_ptr<BufferRaw, DeleteReaderPtr>;
-
- static WriterPtr create_writer(size_t size);
-
- static WriterPtr create_writer(size_t size, size_t prepend, size_t append);
-
- static ReaderPtr create_reader(size_t size);
-
- static ReaderPtr create_reader(const WriterPtr &raw);
-
- static ReaderPtr create_reader(const ReaderPtr &raw);
-
- static size_t get_buffer_mem();
-
- static void clear_thread_local();
-
- private:
- static ReaderPtr create_reader_fast(size_t size);
-
- static WriterPtr create_writer_exact(size_t size);
-
- struct BufferRawDeleter {
- void operator()(BufferRaw *ptr) {
- dec_ref_cnt(ptr);
- }
- };
- struct BufferRawTls {
- std::unique_ptr<BufferRaw, BufferRawDeleter> buffer_raw;
- };
-
- static TD_THREAD_LOCAL BufferRawTls *buffer_raw_tls;
-
- static void dec_ref_cnt(BufferRaw *ptr);
-
- static BufferRaw *create_buffer_raw(size_t size);
-
- static std::atomic<size_t> buffer_mem;
-};
-
-using BufferWriterPtr = BufferAllocator::WriterPtr;
-using BufferReaderPtr = BufferAllocator::ReaderPtr;
-
-class BufferSlice {
- public:
- BufferSlice() = default;
- explicit BufferSlice(BufferReaderPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
- if (is_null()) {
- return;
- }
- begin_ = buffer_->begin_;
- sync_with_writer();
- }
- BufferSlice(BufferReaderPtr buffer_ptr, size_t begin, size_t end)
- : buffer_(std::move(buffer_ptr)), begin_(begin), end_(end) {
- }
-
- explicit BufferSlice(size_t size) : buffer_(BufferAllocator::create_reader(size)) {
- end_ = buffer_->end_.load(std::memory_order_relaxed);
- begin_ = end_ - ((size + 7) & -8);
- end_ = begin_ + size;
- }
-
- explicit BufferSlice(Slice slice) : BufferSlice(slice.size()) {
- std::memcpy(as_slice().begin(), slice.begin(), slice.size());
- }
-
- BufferSlice(const char *ptr, size_t size) : BufferSlice(Slice(ptr, size)) {
- }
-
- BufferSlice clone() const {
- if (is_null()) {
- return BufferSlice(BufferReaderPtr(), begin_, end_);
- }
- return BufferSlice(BufferAllocator::create_reader(buffer_), begin_, end_);
- }
-
- BufferSlice copy() const {
- if (is_null()) {
- return BufferSlice(BufferReaderPtr(), begin_, end_);
- }
- return BufferSlice(as_slice());
- }
-
- Slice as_slice() const {
- if (is_null()) {
- return Slice();
- }
- return Slice(buffer_->data_ + begin_, size());
- }
-
- MutableSlice as_slice() {
- if (is_null()) {
- return MutableSlice();
- }
- return MutableSlice(buffer_->data_ + begin_, size());
- }
-
- Slice prepare_read() const {
- return as_slice();
- }
-
- Slice after(size_t offset) const {
- auto full = as_slice();
- full.remove_prefix(offset);
- return full;
- }
-
- bool confirm_read(size_t size) {
- begin_ += size;
- CHECK(begin_ <= end_);
- return begin_ == end_;
- }
-
- void truncate(size_t limit) {
- if (size() > limit) {
- end_ = begin_ + limit;
- }
- }
-
- BufferSlice from_slice(Slice slice) const {
- auto res = BufferSlice(BufferAllocator::create_reader(buffer_));
- res.begin_ = slice.begin() - buffer_->data_;
- res.end_ = slice.end() - buffer_->data_;
- CHECK(buffer_->begin_ <= res.begin_);
- CHECK(res.begin_ <= res.end_);
- CHECK(res.end_ <= buffer_->end_.load(std::memory_order_relaxed));
- return res;
- }
-
- // like in std::string
- char *data() {
- return as_slice().data();
- }
- const char *data() const {
- return as_slice().data();
- }
- char operator[](size_t at) const {
- return as_slice()[at];
- }
-
- bool empty() const {
- return size() == 0;
- }
-
- bool is_null() const {
- return !buffer_;
- }
-
- size_t size() const {
- return end_ - begin_;
- }
-
- // set end_ into writer's end_
- size_t sync_with_writer() {
- CHECK(!is_null());
- auto old_end = end_;
- end_ = buffer_->end_.load(std::memory_order_acquire);
- return end_ - old_end;
- }
- bool is_writer_alive() const {
- CHECK(!is_null());
- return buffer_->has_writer_.load(std::memory_order_acquire);
- }
-
- private:
- BufferReaderPtr buffer_;
- size_t begin_ = 0;
- size_t end_ = 0;
-};
-
-template <class StorerT>
-void store(const BufferSlice &buffer_slice, StorerT &storer) {
- storer.store_string(buffer_slice);
-}
-
-template <class ParserT>
-void parse(BufferSlice &buffer_slice, ParserT &parser) {
- buffer_slice = parser.template fetch_string<BufferSlice>();
-}
-
-class BufferWriter {
- public:
- BufferWriter() = default;
- explicit BufferWriter(size_t size) : BufferWriter(BufferAllocator::create_writer(size)) {
- }
- BufferWriter(size_t size, size_t prepend, size_t append)
- : BufferWriter(BufferAllocator::create_writer(size, prepend, append)) {
- }
- explicit BufferWriter(BufferWriterPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
- }
-
- BufferSlice as_buffer_slice() const {
- return BufferSlice(BufferAllocator::create_reader(buffer_));
- }
- bool is_null() const {
- return !buffer_;
- }
- bool empty() const {
- return size() == 0;
- }
- size_t size() const {
- if (is_null()) {
- return 0;
- }
- return buffer_->end_.load(std::memory_order_relaxed) - buffer_->begin_;
- }
- MutableSlice as_slice() {
- auto end = buffer_->end_.load(std::memory_order_relaxed);
- return MutableSlice(buffer_->data_ + buffer_->begin_, buffer_->data_ + end);
- }
-
- MutableSlice prepare_prepend() {
- if (is_null()) {
- return MutableSlice();
- }
- CHECK(!buffer_->was_reader_);
- return MutableSlice(buffer_->data_, buffer_->begin_);
- }
- MutableSlice prepare_append() {
- if (is_null()) {
- return MutableSlice();
- }
- auto end = buffer_->end_.load(std::memory_order_relaxed);
- return MutableSlice(buffer_->data_ + end, buffer_->data_size_ - end);
- }
- void confirm_append(size_t size) {
- if (is_null()) {
- CHECK(size == 0);
- return;
- }
- auto new_end = buffer_->end_.load(std::memory_order_relaxed) + size;
- CHECK(new_end <= buffer_->data_size_);
- buffer_->end_.store(new_end, std::memory_order_release);
- }
- void confirm_prepend(size_t size) {
- if (is_null()) {
- CHECK(size == 0);
- return;
- }
- CHECK(buffer_->begin_ >= size);
- buffer_->begin_ -= size;
- }
-
- private:
- BufferWriterPtr buffer_;
-};
-
-struct ChainBufferNode {
- friend struct DeleteWriterPtr;
- struct DeleteWriterPtr {
- void operator()(ChainBufferNode *ptr) {
- ptr->has_writer_.store(false, std::memory_order_release);
- dec_ref_cnt(ptr);
- }
- };
- friend struct DeleteReaderPtr;
- struct DeleteReaderPtr {
- void operator()(ChainBufferNode *ptr) {
- dec_ref_cnt(ptr);
- }
- };
- using WriterPtr = std::unique_ptr<ChainBufferNode, DeleteWriterPtr>;
- using ReaderPtr = std::unique_ptr<ChainBufferNode, DeleteReaderPtr>;
-
- static WriterPtr make_writer_ptr(ChainBufferNode *ptr) {
- ptr->ref_cnt_.store(1, std::memory_order_relaxed);
- ptr->has_writer_.store(true, std::memory_order_relaxed);
- return WriterPtr(ptr);
- }
- static ReaderPtr make_reader_ptr(ChainBufferNode *ptr) {
- ptr->ref_cnt_.fetch_add(1, std::memory_order_acq_rel);
- return ReaderPtr(ptr);
- }
-
- bool has_writer() {
- return has_writer_.load(std::memory_order_acquire);
- }
-
- bool unique() {
- return ref_cnt_.load(std::memory_order_acquire) == 1;
- }
-
- ChainBufferNode(BufferSlice slice, bool sync_flag) : slice_(std::move(slice)), sync_flag_(sync_flag) {
- }
-
- // reader
- // There are two options
- // 1. Fixed slice of Buffer
- // 2. Slice with non-fixed right end
- // In each case slice_ is const. Reader should read it and use sync_with_writer on its own copy.
- const BufferSlice slice_;
- const bool sync_flag_{false}; // should we call slice_.sync_with_writer or not.
-
- // writer
- ReaderPtr next_{nullptr};
-
- private:
- std::atomic<int> ref_cnt_{0};
- std::atomic<bool> has_writer_{false};
-
- static void clear_nonrecursive(ReaderPtr ptr) {
- while (ptr && ptr->unique()) {
- ptr = std::move(ptr->next_);
- }
- }
- static void dec_ref_cnt(ChainBufferNode *ptr) {
- int left = --ptr->ref_cnt_;
- if (left == 0) {
- clear_nonrecursive(std::move(ptr->next_));
- // TODO(refact): move memory management into allocator (?)
- delete ptr;
- }
- }
-};
-
-using ChainBufferNodeWriterPtr = ChainBufferNode::WriterPtr;
-using ChainBufferNodeReaderPtr = ChainBufferNode::ReaderPtr;
-
-class ChainBufferNodeAllocator {
- public:
- static ChainBufferNodeWriterPtr create(BufferSlice slice, bool sync_flag) {
- auto *ptr = new ChainBufferNode(std::move(slice), sync_flag);
- return ChainBufferNode::make_writer_ptr(ptr);
- }
- static ChainBufferNodeReaderPtr clone(const ChainBufferNodeReaderPtr &ptr) {
- if (!ptr) {
- return ChainBufferNodeReaderPtr();
- }
- return ChainBufferNode::make_reader_ptr(ptr.get());
- }
- static ChainBufferNodeReaderPtr clone(ChainBufferNodeWriterPtr &ptr) {
- if (!ptr) {
- return ChainBufferNodeReaderPtr();
- }
- return ChainBufferNode::make_reader_ptr(ptr.get());
- }
-};
-
-class ChainBufferIterator {
- public:
- ChainBufferIterator() = default;
- explicit ChainBufferIterator(ChainBufferNodeReaderPtr head) : head_(std::move(head)) {
- load_head();
- }
- ChainBufferIterator clone() const {
- return ChainBufferIterator(ChainBufferNodeAllocator::clone(head_), reader_.clone(), need_sync_, offset_);
- }
-
- size_t offset() const {
- return offset_;
- }
-
- void clear() {
- *this = ChainBufferIterator();
- }
-
- Slice prepare_read() {
- if (!head_) {
- return Slice();
- }
- while (true) {
- auto res = reader_.prepare_read();
- if (!res.empty()) {
- return res;
- }
- auto has_writer = head_->has_writer();
- if (need_sync_) {
- reader_.sync_with_writer();
- res = reader_.prepare_read();
- if (!res.empty()) {
- return res;
- }
- }
- if (has_writer) {
- return Slice();
- }
- head_ = ChainBufferNodeAllocator::clone(head_->next_);
- if (!head_) {
- return Slice();
- }
- load_head();
- }
- }
-
- // returns only head
- BufferSlice read_as_buffer_slice(size_t limit) {
- prepare_read();
- auto res = reader_.clone();
- res.truncate(limit);
- confirm_read(res.size());
- return res;
- }
-
- const BufferSlice &head() const {
- return reader_;
- }
-
- void confirm_read(size_t size) {
- offset_ += size;
- reader_.confirm_read(size);
- }
-
- void advance_till_end() {
- while (true) {
- auto ready = prepare_read();
- if (ready.empty()) {
- break;
- }
- confirm_read(ready.size());
- }
- }
-
- size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
- size_t skipped = 0;
- while (offset != 0) {
- auto ready = prepare_read();
- if (ready.empty()) {
- break;
- }
-
- // read no more than offset
- ready.truncate(offset);
- offset -= ready.size();
- skipped += ready.size();
-
- // copy to dest if possible
- auto to_dest_size = min(ready.size(), dest.size());
- if (to_dest_size != 0) {
- std::memcpy(dest.data(), ready.data(), to_dest_size);
- dest.remove_prefix(to_dest_size);
- }
-
- confirm_read(ready.size());
- }
- return skipped;
- }
-
- private:
- ChainBufferNodeReaderPtr head_;
- BufferSlice reader_; // copy of head_->slice_
- bool need_sync_ = false; // copy of head_->sync_flag_
- size_t offset_ = 0; // position in the union of all nodes
-
- ChainBufferIterator(ChainBufferNodeReaderPtr head, BufferSlice reader, bool need_sync, size_t offset)
- : head_(std::move(head)), reader_(std::move(reader)), need_sync_(need_sync), offset_(offset) {
- }
- void load_head() {
- reader_ = head_->slice_.clone();
- need_sync_ = head_->sync_flag_;
- }
-};
-
-class ChainBufferReader {
- public:
- ChainBufferReader() = default;
- explicit ChainBufferReader(ChainBufferNodeReaderPtr head)
- : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
- end_.advance_till_end();
- }
- ChainBufferReader(ChainBufferIterator begin, ChainBufferIterator end, bool sync_flag)
- : begin_(std::move(begin)), end_(std::move(end)), sync_flag_(sync_flag) {
- }
- ChainBufferReader(ChainBufferNodeReaderPtr head, size_t size)
- : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
- auto advanced = end_.advance(size);
- CHECK(advanced == size);
- }
- ChainBufferReader(ChainBufferReader &&) = default;
- ChainBufferReader &operator=(ChainBufferReader &&) = default;
- ChainBufferReader(const ChainBufferReader &) = delete;
- ChainBufferReader &operator=(const ChainBufferReader &) = delete;
- ~ChainBufferReader() = default;
-
- ChainBufferReader clone() {
- return ChainBufferReader(begin_.clone(), end_.clone(), sync_flag_);
- }
-
- Slice prepare_read() {
- auto res = begin_.prepare_read();
- res.truncate(size());
- return res;
- }
-
- void confirm_read(size_t size) {
- CHECK(size <= this->size());
- begin_.confirm_read(size);
- }
-
- size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
- CHECK(offset <= size());
- return begin_.advance(offset, dest);
- }
-
- size_t size() const {
- return end_.offset() - begin_.offset();
- }
- bool empty() const {
- return size() == 0;
- }
-
- void sync_with_writer() {
- if (sync_flag_) {
- end_.advance_till_end();
- }
- }
- void advance_end(size_t size) {
- end_.advance(size);
- }
- const ChainBufferIterator &begin() {
- return begin_;
- }
- const ChainBufferIterator &end() {
- return end_;
- }
-
- // Return [begin_, tail.begin_)
- // *this = tail
- ChainBufferReader cut_head(ChainBufferIterator pos) {
- auto tmp = begin_.clone();
- begin_ = pos.clone();
- return ChainBufferReader(std::move(tmp), std::move(pos), false);
- }
-
- ChainBufferReader cut_head(size_t offset) {
- CHECK(offset <= size()) << offset << " " << size();
- auto it = begin_.clone();
- it.advance(offset);
- return cut_head(std::move(it));
- }
-
- BufferSlice move_as_buffer_slice() {
- BufferSlice res;
- if (begin_.head().size() >= size()) {
- res = begin_.read_as_buffer_slice(size());
- } else {
- auto save_size = size();
- res = BufferSlice{save_size};
- advance(save_size, res.as_slice());
- }
- *this = ChainBufferReader();
- return res;
- }
-
- BufferSlice read_as_buffer_slice(size_t limit = std::numeric_limits<size_t>::max()) {
- return begin_.read_as_buffer_slice(min(limit, size()));
- }
-
- private:
- ChainBufferIterator begin_; // use it for prepare_read. Fix result with size()
- ChainBufferIterator end_; // keep end as far as we can. use it for size()
- bool sync_flag_ = true; // auto sync of end_
-
- // 1. We have fixed size. Than end_ is useless.
- // 2. No fixed size. One has to sync end_ with end_.advance_till_end() in order to calculate size.
-};
-
-class ChainBufferWriter {
- public:
- ChainBufferWriter() {
- init();
- }
-
- // legacy
- static ChainBufferWriter create_empty(size_t size = 0) {
- return ChainBufferWriter();
- }
-
- void init(size_t size = 0) {
- writer_ = BufferWriter(size);
- tail_ = ChainBufferNodeAllocator::create(writer_.as_buffer_slice(), true);
- head_ = ChainBufferNodeAllocator::clone(tail_);
- }
-
- MutableSlice prepare_append(size_t hint = 0) {
- CHECK(!empty());
- auto res = prepare_append_inplace();
- if (res.empty()) {
- return prepare_append_alloc(hint);
- }
- return res;
- }
- MutableSlice prepare_append_inplace() {
- CHECK(!empty());
- return writer_.prepare_append();
- }
- MutableSlice prepare_append_alloc(size_t hint = 0) {
- CHECK(!empty());
- if (hint < (1 << 10)) {
- hint = 1 << 12;
- }
- BufferWriter new_writer(hint);
- auto new_tail = ChainBufferNodeAllocator::create(new_writer.as_buffer_slice(), true);
- tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
- writer_ = std::move(new_writer);
- tail_ = std::move(new_tail); // release tail_
- return writer_.prepare_append();
- }
- void confirm_append(size_t size) {
- CHECK(!empty());
- writer_.confirm_append(size);
- }
-
- void append(Slice slice) {
- while (!slice.empty()) {
- auto ready = prepare_append(slice.size());
- auto shift = min(ready.size(), slice.size());
- std::memcpy(ready.data(), slice.data(), shift);
- confirm_append(shift);
- slice.remove_prefix(shift);
- }
- }
-
- void append(BufferSlice slice) {
- auto ready = prepare_append_inplace();
- // TODO(perf): we have to store some stats in ChainBufferWriter
- // for better append logic
- if (slice.size() < (1 << 8) || ready.size() >= slice.size()) {
- return append(slice.as_slice());
- }
-
- auto new_tail = ChainBufferNodeAllocator::create(std::move(slice), false);
- tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
- writer_ = BufferWriter();
- tail_ = std::move(new_tail); // release tail_
- }
-
- void append(ChainBufferReader &&reader) {
- while (!reader.empty()) {
- append(reader.read_as_buffer_slice());
- }
- }
- void append(ChainBufferReader &reader) {
- while (!reader.empty()) {
- append(reader.read_as_buffer_slice());
- }
- }
-
- ChainBufferReader extract_reader() {
- CHECK(head_);
- return ChainBufferReader(std::move(head_));
- }
-
- private:
- bool empty() const {
- return !tail_;
- }
-
- ChainBufferNodeReaderPtr head_;
- ChainBufferNodeWriterPtr tail_;
- BufferWriter writer_;
-};
-
-} // namespace td