Diffstat (limited to 'libs/tdlib/td/tdutils/td/utils/buffer.h')
-rw-r--r-- | libs/tdlib/td/tdutils/td/utils/buffer.h | 708
1 file changed, 708 insertions, 0 deletions
diff --git a/libs/tdlib/td/tdutils/td/utils/buffer.h b/libs/tdlib/td/tdutils/td/utils/buffer.h
new file mode 100644
index 0000000000..aa4ef8db26
--- /dev/null
+++ b/libs/tdlib/td/tdutils/td/utils/buffer.h
@@ -0,0 +1,708 @@
+//
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+#pragma once
+
+#include "td/utils/common.h"
+#include "td/utils/logging.h"
+#include "td/utils/port/thread_local.h"
+#include "td/utils/Slice.h"
+
+#include <atomic>
+#include <cstring>
+#include <limits>
+
+namespace td {
+
+struct BufferRaw {
+  size_t data_size_;
+
+  // Constant after the first reader is created.
+  // May be changed by the writer before that,
+  // so the writer may do prepends as long as no reader has been created.
+  size_t begin_;
+
+  // Written by writer.
+  // Read by reader.
+  std::atomic<size_t> end_;
+
+  mutable std::atomic<int32> ref_cnt_;
+  std::atomic<bool> has_writer_;
+  bool was_reader_;
+
+  alignas(4) char data_[1];
+};
+
+class BufferAllocator {
+ public:
+  class DeleteWriterPtr {
+   public:
+    void operator()(BufferRaw *ptr) {
+      ptr->has_writer_.store(false, std::memory_order_release);
+      dec_ref_cnt(ptr);
+    }
+  };
+  class DeleteReaderPtr {
+   public:
+    void operator()(BufferRaw *ptr) {
+      dec_ref_cnt(ptr);
+    }
+  };
+
+  using WriterPtr = std::unique_ptr<BufferRaw, DeleteWriterPtr>;
+  using ReaderPtr = std::unique_ptr<BufferRaw, DeleteReaderPtr>;
+
+  static WriterPtr create_writer(size_t size);
+
+  static WriterPtr create_writer(size_t size, size_t prepend, size_t append);
+
+  static ReaderPtr create_reader(size_t size);
+
+  static ReaderPtr create_reader(const WriterPtr &raw);
+
+  static ReaderPtr create_reader(const ReaderPtr &raw);
+
+  static size_t get_buffer_mem();
+
+  static void clear_thread_local();
+
+ private:
+  static ReaderPtr create_reader_fast(size_t size);
+
+  static WriterPtr create_writer_exact(size_t size);
+
+  struct BufferRawDeleter {
+    void operator()(BufferRaw *ptr) {
+      dec_ref_cnt(ptr);
+    }
+  };
+  struct BufferRawTls {
+    std::unique_ptr<BufferRaw, BufferRawDeleter> buffer_raw;
+  };
+
+  static TD_THREAD_LOCAL BufferRawTls *buffer_raw_tls;
+
+  static void dec_ref_cnt(BufferRaw *ptr);
+
+  static BufferRaw *create_buffer_raw(size_t size);
+
+  static std::atomic<size_t> buffer_mem;
+};
+
+using BufferWriterPtr = BufferAllocator::WriterPtr;
+using BufferReaderPtr = BufferAllocator::ReaderPtr;
+
+class BufferSlice {
+ public:
+  BufferSlice() = default;
+  explicit BufferSlice(BufferReaderPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
+    if (is_null()) {
+      return;
+    }
+    begin_ = buffer_->begin_;
+    sync_with_writer();
+  }
+  BufferSlice(BufferReaderPtr buffer_ptr, size_t begin, size_t end)
+      : buffer_(std::move(buffer_ptr)), begin_(begin), end_(end) {
+  }
+
+  explicit BufferSlice(size_t size) : buffer_(BufferAllocator::create_reader(size)) {
+    end_ = buffer_->end_.load(std::memory_order_relaxed);
+    begin_ = end_ - ((size + 7) & -8);
+    end_ = begin_ + size;
+  }
+
+  explicit BufferSlice(Slice slice) : BufferSlice(slice.size()) {
+    std::memcpy(as_slice().begin(), slice.begin(), slice.size());
+  }
+
+  BufferSlice(const char *ptr, size_t size) : BufferSlice(Slice(ptr, size)) {
+  }
+
+  BufferSlice clone() const {
+    if (is_null()) {
+      return BufferSlice(BufferReaderPtr(), begin_, end_);
+    }
+    return BufferSlice(BufferAllocator::create_reader(buffer_), begin_, end_);
+  }
+
+  BufferSlice copy() const {
+    if (is_null()) {
+      return BufferSlice(BufferReaderPtr(), begin_, end_);
+    }
+    return BufferSlice(as_slice());
+  }
+
+  Slice as_slice() const {
+    if (is_null()) {
+      return Slice();
+    }
+    return Slice(buffer_->data_ + begin_, size());
+  }
+
+  MutableSlice as_slice() {
+    if (is_null()) {
+      return MutableSlice();
+    }
+    return MutableSlice(buffer_->data_ + begin_, size());
+  }
+
+  Slice prepare_read() const {
+    return as_slice();
+  }
+
+  Slice after(size_t offset) const {
+    auto full = as_slice();
+    full.remove_prefix(offset);
+    return full;
+  }
+
+  bool confirm_read(size_t size) {
+    begin_ += size;
+    CHECK(begin_ <= end_);
+    return begin_ == end_;
+  }
+
+  void truncate(size_t limit) {
+    if (size() > limit) {
+      end_ = begin_ + limit;
+    }
+  }
+
+  BufferSlice from_slice(Slice slice) const {
+    auto res = BufferSlice(BufferAllocator::create_reader(buffer_));
+    res.begin_ = slice.begin() - buffer_->data_;
+    res.end_ = slice.end() - buffer_->data_;
+    CHECK(buffer_->begin_ <= res.begin_);
+    CHECK(res.begin_ <= res.end_);
+    CHECK(res.end_ <= buffer_->end_.load(std::memory_order_relaxed));
+    return res;
+  }
+
+  // like in std::string
+  char *data() {
+    return as_slice().data();
+  }
+  const char *data() const {
+    return as_slice().data();
+  }
+  char operator[](size_t at) const {
+    return as_slice()[at];
+  }
+
+  bool empty() const {
+    return size() == 0;
+  }
+
+  bool is_null() const {
+    return !buffer_;
+  }
+
+  size_t size() const {
+    return end_ - begin_;
+  }
+
+  // set end_ to the writer's end_
+  size_t sync_with_writer() {
+    CHECK(!is_null());
+    auto old_end = end_;
+    end_ = buffer_->end_.load(std::memory_order_acquire);
+    return end_ - old_end;
+  }
+  bool is_writer_alive() const {
+    CHECK(!is_null());
+    return buffer_->has_writer_.load(std::memory_order_acquire);
+  }
+
+ private:
+  BufferReaderPtr buffer_;
+  size_t begin_ = 0;
+  size_t end_ = 0;
+};
+
+template <class StorerT>
+void store(const BufferSlice &buffer_slice, StorerT &storer) {
+  storer.store_string(buffer_slice);
+}
+
+template <class ParserT>
+void parse(BufferSlice &buffer_slice, ParserT &parser) {
+  buffer_slice = parser.template fetch_string<BufferSlice>();
+}
+
+class BufferWriter {
+ public:
+  BufferWriter() = default;
+  explicit BufferWriter(size_t size) : BufferWriter(BufferAllocator::create_writer(size)) {
+  }
+  BufferWriter(size_t size, size_t prepend, size_t append)
+      : BufferWriter(BufferAllocator::create_writer(size, prepend, append)) {
+  }
+  explicit BufferWriter(BufferWriterPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
+  }
+
+  BufferSlice as_buffer_slice() const {
+    return BufferSlice(BufferAllocator::create_reader(buffer_));
+  }
+  bool is_null() const {
+    return !buffer_;
+  }
+  bool empty() const {
+    return size() == 0;
+  }
+  size_t size() const {
+    if (is_null()) {
+      return 0;
+    }
+    return buffer_->end_.load(std::memory_order_relaxed) - buffer_->begin_;
+  }
+  MutableSlice as_slice() {
+    auto end = buffer_->end_.load(std::memory_order_relaxed);
+    return MutableSlice(buffer_->data_ + buffer_->begin_, buffer_->data_ + end);
+  }
+
+  MutableSlice prepare_prepend() {
+    if (is_null()) {
+      return MutableSlice();
+    }
+    CHECK(!buffer_->was_reader_);
+    return MutableSlice(buffer_->data_, buffer_->begin_);
+  }
+  MutableSlice prepare_append() {
+    if (is_null()) {
+      return MutableSlice();
+    }
+    auto end = buffer_->end_.load(std::memory_order_relaxed);
+    return MutableSlice(buffer_->data_ + end, buffer_->data_size_ - end);
+  }
+  void confirm_append(size_t size) {
+    if (is_null()) {
+      CHECK(size == 0);
+      return;
+    }
+    auto new_end = buffer_->end_.load(std::memory_order_relaxed) + size;
+    CHECK(new_end <= buffer_->data_size_);
+    buffer_->end_.store(new_end, std::memory_order_release);
+  }
+  void confirm_prepend(size_t size) {
+    if (is_null()) {
+      CHECK(size == 0);
+      return;
+    }
+    CHECK(buffer_->begin_ >= size);
+    buffer_->begin_ -= size;
+  }
+
+ private:
+  BufferWriterPtr buffer_;
+};
+
+struct ChainBufferNode {
+  friend struct DeleteWriterPtr;
+  struct DeleteWriterPtr {
+    void operator()(ChainBufferNode *ptr) {
+      ptr->has_writer_.store(false, std::memory_order_release);
+      dec_ref_cnt(ptr);
+    }
+  };
+  friend struct DeleteReaderPtr;
+  struct DeleteReaderPtr {
+    void operator()(ChainBufferNode *ptr) {
+      dec_ref_cnt(ptr);
+    }
+  };
+  using WriterPtr = std::unique_ptr<ChainBufferNode, DeleteWriterPtr>;
+  using ReaderPtr = std::unique_ptr<ChainBufferNode, DeleteReaderPtr>;
+
+  static WriterPtr make_writer_ptr(ChainBufferNode *ptr) {
+    ptr->ref_cnt_.store(1, std::memory_order_relaxed);
+    ptr->has_writer_.store(true, std::memory_order_relaxed);
+    return WriterPtr(ptr);
+  }
+  static ReaderPtr make_reader_ptr(ChainBufferNode *ptr) {
+    ptr->ref_cnt_.fetch_add(1, std::memory_order_acq_rel);
+    return ReaderPtr(ptr);
+  }
+
+  bool has_writer() {
+    return has_writer_.load(std::memory_order_acquire);
+  }
+
+  bool unique() {
+    return ref_cnt_.load(std::memory_order_acquire) == 1;
+  }
+
+  ChainBufferNode(BufferSlice slice, bool sync_flag) : slice_(std::move(slice)), sync_flag_(sync_flag) {
+  }
+
+  // reader
+  // There are two options
+  // 1. Fixed slice of Buffer
+  // 2. Slice with non-fixed right end
+  // In each case slice_ is const. Reader should read it and use sync_with_writer on its own copy.
+  const BufferSlice slice_;
+  const bool sync_flag_{false};  // should we call slice_.sync_with_writer or not
+
+  // writer
+  ReaderPtr next_{nullptr};
+
+ private:
+  std::atomic<int> ref_cnt_{0};
+  std::atomic<bool> has_writer_{false};
+
+  static void clear_nonrecursive(ReaderPtr ptr) {
+    while (ptr && ptr->unique()) {
+      ptr = std::move(ptr->next_);
+    }
+  }
+  static void dec_ref_cnt(ChainBufferNode *ptr) {
+    int left = --ptr->ref_cnt_;
+    if (left == 0) {
+      clear_nonrecursive(std::move(ptr->next_));
+      // TODO(refact): move memory management into allocator (?)
+      delete ptr;
+    }
+  }
+};
+
+using ChainBufferNodeWriterPtr = ChainBufferNode::WriterPtr;
+using ChainBufferNodeReaderPtr = ChainBufferNode::ReaderPtr;
+
+class ChainBufferNodeAllocator {
+ public:
+  static ChainBufferNodeWriterPtr create(BufferSlice slice, bool sync_flag) {
+    auto *ptr = new ChainBufferNode(std::move(slice), sync_flag);
+    return ChainBufferNode::make_writer_ptr(ptr);
+  }
+  static ChainBufferNodeReaderPtr clone(const ChainBufferNodeReaderPtr &ptr) {
+    if (!ptr) {
+      return ChainBufferNodeReaderPtr();
+    }
+    return ChainBufferNode::make_reader_ptr(ptr.get());
+  }
+  static ChainBufferNodeReaderPtr clone(ChainBufferNodeWriterPtr &ptr) {
+    if (!ptr) {
+      return ChainBufferNodeReaderPtr();
+    }
+    return ChainBufferNode::make_reader_ptr(ptr.get());
+  }
+};
+
+class ChainBufferIterator {
+ public:
+  ChainBufferIterator() = default;
+  explicit ChainBufferIterator(ChainBufferNodeReaderPtr head) : head_(std::move(head)) {
+    load_head();
+  }
+  ChainBufferIterator clone() const {
+    return ChainBufferIterator(ChainBufferNodeAllocator::clone(head_), reader_.clone(), need_sync_, offset_);
+  }
+
+  size_t offset() const {
+    return offset_;
+  }
+
+  void clear() {
+    *this = ChainBufferIterator();
+  }
+
+  Slice prepare_read() {
+    if (!head_) {
+      return Slice();
+    }
+    while (true) {
+      auto res = reader_.prepare_read();
+      if (!res.empty()) {
+        return res;
+      }
+      auto has_writer = head_->has_writer();
+      if (need_sync_) {
+        reader_.sync_with_writer();
+        res = reader_.prepare_read();
+        if (!res.empty()) {
+          return res;
+        }
+      }
+      if (has_writer) {
+        return Slice();
+      }
+      head_ = ChainBufferNodeAllocator::clone(head_->next_);
+      if (!head_) {
+        return Slice();
+      }
+      load_head();
+    }
+  }
+
+  // returns only head
+  BufferSlice read_as_buffer_slice(size_t limit) {
+    prepare_read();
+    auto res = reader_.clone();
+    res.truncate(limit);
+    confirm_read(res.size());
+    return res;
+  }
+
+  const BufferSlice &head() const {
+    return reader_;
+  }
+
+  void confirm_read(size_t size) {
+    offset_ += size;
+    reader_.confirm_read(size);
+  }
+
+  void advance_till_end() {
+    while (true) {
+      auto ready = prepare_read();
+      if (ready.empty()) {
+        break;
+      }
+      confirm_read(ready.size());
+    }
+  }
+
+  size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
+    size_t skipped = 0;
+    while (offset != 0) {
+      auto ready = prepare_read();
+      if (ready.empty()) {
+        break;
+      }
+
+      // read no more than offset
+      ready.truncate(offset);
+      offset -= ready.size();
+      skipped += ready.size();
+
+      // copy to dest if possible
+      auto to_dest_size = min(ready.size(), dest.size());
+      if (to_dest_size != 0) {
+        std::memcpy(dest.data(), ready.data(), to_dest_size);
+        dest.remove_prefix(to_dest_size);
+      }
+
+      confirm_read(ready.size());
+    }
+    return skipped;
+  }
+
+ private:
+  ChainBufferNodeReaderPtr head_;
+  BufferSlice reader_;      // copy of head_->slice_
+  bool need_sync_ = false;  // copy of head_->sync_flag_
+  size_t offset_ = 0;       // position in the union of all nodes
+
+  ChainBufferIterator(ChainBufferNodeReaderPtr head, BufferSlice reader, bool need_sync, size_t offset)
+      : head_(std::move(head)), reader_(std::move(reader)), need_sync_(need_sync), offset_(offset) {
+  }
+  void load_head() {
+    reader_ = head_->slice_.clone();
+    need_sync_ = head_->sync_flag_;
+  }
+};
+
+class ChainBufferReader {
+ public:
+  ChainBufferReader() = default;
+  explicit ChainBufferReader(ChainBufferNodeReaderPtr head)
+      : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
+    end_.advance_till_end();
+  }
+  ChainBufferReader(ChainBufferIterator begin, ChainBufferIterator end, bool sync_flag)
+      : begin_(std::move(begin)), end_(std::move(end)), sync_flag_(sync_flag) {
+  }
+  ChainBufferReader(ChainBufferNodeReaderPtr head, size_t size)
+      : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
+    auto advanced = end_.advance(size);
+    CHECK(advanced == size);
+  }
+  ChainBufferReader(ChainBufferReader &&) = default;
+  ChainBufferReader &operator=(ChainBufferReader &&) = default;
+  ChainBufferReader(const ChainBufferReader &) = delete;
+  ChainBufferReader &operator=(const ChainBufferReader &) = delete;
+  ~ChainBufferReader() = default;
+
+  ChainBufferReader clone() {
+    return ChainBufferReader(begin_.clone(), end_.clone(), sync_flag_);
+  }
+
+  Slice prepare_read() {
+    auto res = begin_.prepare_read();
+    res.truncate(size());
+    return res;
+  }
+
+  void confirm_read(size_t size) {
+    CHECK(size <= this->size());
+    begin_.confirm_read(size);
+  }
+
+  size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
+    CHECK(offset <= size());
+    return begin_.advance(offset, dest);
+  }
+
+  size_t size() const {
+    return end_.offset() - begin_.offset();
+  }
+  bool empty() const {
+    return size() == 0;
+  }
+
+  void sync_with_writer() {
+    if (sync_flag_) {
+      end_.advance_till_end();
+    }
+  }
+  void advance_end(size_t size) {
+    end_.advance(size);
+  }
+  const ChainBufferIterator &begin() {
+    return begin_;
+  }
+  const ChainBufferIterator &end() {
+    return end_;
+  }
+
+  // Returns [begin_, tail.begin_)
+  // *this = tail
+  ChainBufferReader cut_head(ChainBufferIterator pos) {
+    auto tmp = begin_.clone();
+    begin_ = pos.clone();
+    return ChainBufferReader(std::move(tmp), std::move(pos), false);
+  }
+
+  ChainBufferReader cut_head(size_t offset) {
+    CHECK(offset <= size()) << offset << " " << size();
+    auto it = begin_.clone();
+    it.advance(offset);
+    return cut_head(std::move(it));
+  }
+
+  BufferSlice move_as_buffer_slice() {
+    BufferSlice res;
+    if (begin_.head().size() >= size()) {
+      res = begin_.read_as_buffer_slice(size());
+    } else {
+      auto save_size = size();
+      res = BufferSlice{save_size};
+      advance(save_size, res.as_slice());
+    }
+    *this = ChainBufferReader();
+    return res;
+  }
+
+  BufferSlice read_as_buffer_slice(size_t limit = std::numeric_limits<size_t>::max()) {
+    return begin_.read_as_buffer_slice(min(limit, size()));
+  }
+
+ private:
+  ChainBufferIterator begin_;  // use it for prepare_read; fix the result with size()
+  ChainBufferIterator end_;    // keep end_ as far as we can; use it for size()
+  bool sync_flag_ = true;      // auto sync of end_
+
+  // 1. We have a fixed size. Then end_ is useless.
+  // 2. No fixed size. One has to sync end_ with end_.advance_till_end() in order to calculate size.
+};
+
+class ChainBufferWriter {
+ public:
+  ChainBufferWriter() {
+    init();
+  }
+
+  // legacy
+  static ChainBufferWriter create_empty(size_t size = 0) {
+    return ChainBufferWriter();
+  }
+
+  void init(size_t size = 0) {
+    writer_ = BufferWriter(size);
+    tail_ = ChainBufferNodeAllocator::create(writer_.as_buffer_slice(), true);
+    head_ = ChainBufferNodeAllocator::clone(tail_);
+  }
+
+  MutableSlice prepare_append(size_t hint = 0) {
+    CHECK(!empty());
+    auto res = prepare_append_inplace();
+    if (res.empty()) {
+      return prepare_append_alloc(hint);
+    }
+    return res;
+  }
+  MutableSlice prepare_append_inplace() {
+    CHECK(!empty());
+    return writer_.prepare_append();
+  }
+  MutableSlice prepare_append_alloc(size_t hint = 0) {
+    CHECK(!empty());
+    if (hint < (1 << 10)) {
+      hint = 1 << 12;
+    }
+    BufferWriter new_writer(hint);
+    auto new_tail = ChainBufferNodeAllocator::create(new_writer.as_buffer_slice(), true);
+    tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
+    writer_ = std::move(new_writer);
+    tail_ = std::move(new_tail);  // release tail_
+    return writer_.prepare_append();
+  }
+  void confirm_append(size_t size) {
+    CHECK(!empty());
+    writer_.confirm_append(size);
+  }
+
+  void append(Slice slice) {
+    while (!slice.empty()) {
+      auto ready = prepare_append(slice.size());
+      auto shift = min(ready.size(), slice.size());
+      std::memcpy(ready.data(), slice.data(), shift);
+      confirm_append(shift);
+      slice.remove_prefix(shift);
+    }
+  }
+
+  void append(BufferSlice slice) {
+    auto ready = prepare_append_inplace();
+    // TODO(perf): we have to store some stats in ChainBufferWriter
+    // for better append logic
+    if (slice.size() < (1 << 8) || ready.size() >= slice.size()) {
+      return append(slice.as_slice());
+    }
+
+    auto new_tail = ChainBufferNodeAllocator::create(std::move(slice), false);
+    tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
+    writer_ = BufferWriter();
+    tail_ = std::move(new_tail);  // release tail_
+  }
+
+  void append(ChainBufferReader &&reader) {
+    while (!reader.empty()) {
+      append(reader.read_as_buffer_slice());
+    }
+  }
+  void append(ChainBufferReader &reader) {
+    while (!reader.empty()) {
+      append(reader.read_as_buffer_slice());
+    }
+  }
+
+  ChainBufferReader extract_reader() {
+    CHECK(head_);
+    return ChainBufferReader(std::move(head_));
+  }
+
+ private:
+  bool empty() const {
+    return !tail_;
+  }
+
+  ChainBufferNodeReaderPtr head_;
+  ChainBufferNodeWriterPtr tail_;
+  BufferWriter writer_;
+};
+
+} // namespace td
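For orientation, a minimal usage sketch (not part of the diff) of the single-buffer pair BufferWriter/BufferSlice, inferred from the declarations above: the writer fills free space and publishes the new end_ with confirm_append(), while a reader created via as_buffer_slice() picks up the newly published bytes with sync_with_writer(). Error handling and threading concerns are omitted.

#include "td/utils/buffer.h"
#include <cstring>

void buffer_writer_reader_sketch() {
  td::BufferWriter writer(1024);                        // writer side of one BufferRaw
  td::BufferSlice reader = writer.as_buffer_slice();    // reader over the same buffer

  // Append: take free space, fill it, then publish the new end_.
  td::MutableSlice dst = writer.prepare_append();
  std::memcpy(dst.data(), "hello", 5);
  writer.confirm_append(5);

  // Read: pull the writer's end_ into this slice, then consume the bytes.
  size_t added = reader.sync_with_writer();             // number of newly visible bytes
  td::Slice ready = reader.as_slice();                  // "hello"
  reader.confirm_read(ready.size());                    // advances begin_ up to end_
  (void)added;
}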
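The three-argument BufferWriter constructor reserves slack on both sides of the payload, which is what makes prepending a header cheap. A hedged sketch, assuming the three-argument create_writer reserves at least prepend + size + append bytes and that, per the BufferRaw comments, prepends are only legal before the first reader is created; the 4-byte header tag is a hypothetical example value.

#include "td/utils/buffer.h"
#include <cstring>

td::BufferSlice frame_with_header_sketch(td::Slice payload) {
  // Reserve 4 bytes in front of the payload and nothing behind it.
  td::BufferWriter writer(payload.size(), 4 /* prepend */, 0 /* append */);

  td::MutableSlice body = writer.prepare_append();
  std::memcpy(body.data(), payload.data(), payload.size());
  writer.confirm_append(payload.size());

  // Prepend must happen before any reader exists (the was_reader_ CHECK).
  const char header[4] = {'T', 'D', 0, 1};               // hypothetical tag
  td::MutableSlice front = writer.prepare_prepend();     // covers [data_, begin_)
  std::memcpy(front.data() + front.size() - sizeof(header), header, sizeof(header));
  writer.confirm_prepend(sizeof(header));                // moves begin_ back by 4

  return writer.as_buffer_slice();                       // header followed by payload
}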
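The chained classes compose the same way at a larger scale: ChainBufferWriter owns the tail of a linked list of ChainBufferNode slices, and the reader extracted from it walks that list. A sketch assuming only what the declarations state, namely that extract_reader() can be called once per writer and that sync_with_writer() on the reader makes later appends visible.

#include "td/utils/buffer.h"
#include <string>

void chain_buffer_sketch() {
  td::ChainBufferWriter writer;
  writer.append(td::Slice("first chunk "));

  // The reader is extracted once; the writer keeps appending afterwards.
  td::ChainBufferReader reader = writer.extract_reader();
  writer.append(td::BufferSlice(td::Slice("second chunk")));

  reader.sync_with_writer();                              // advance end_ over the new data
  std::string result(reader.size(), '\0');
  reader.advance(result.size(), td::MutableSlice(&result[0], result.size()));
  // result == "first chunk second chunk"
}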
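ChainBufferReader::cut_head() is what makes length-prefixed framing convenient: it splits the first N bytes off into an independent reader while *this keeps the rest of the stream. A hypothetical helper, again not from the commit:

#include "td/utils/buffer.h"

// Take the first message_size bytes out of the stream as a contiguous
// BufferSlice, leaving the remainder in *stream; returns a null slice
// when not enough data is buffered yet.
td::BufferSlice take_message_sketch(td::ChainBufferReader &stream, size_t message_size) {
  if (stream.size() < message_size) {
    return td::BufferSlice();
  }
  td::ChainBufferReader head = stream.cut_head(message_size);  // [old begin_, old begin_ + message_size)
  return head.move_as_buffer_slice();                          // head slice itself, or a copy if it spans nodes
}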