summaryrefslogtreecommitdiff
path: root/protocols/Telegram/tdlib/td/tdutils/td/utils/OrderedEventsProcessor.h
blob: 8b3474ab576ab3f0730e1014a3af7f492781aea9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2022
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once

#include "td/utils/common.h"
#include "td/utils/logging.h"

#include <utility>

namespace td {

// Delivers events to a callback strictly in increasing SeqNo order, buffering every event
// that arrives before all of its predecessors have been delivered.
//
// Internal invariants:
//   - begin_ is the next sequence number to be delivered;
//   - end_ is one past the highest sequence number seen so far (end_ >= begin_ between calls);
//   - data_array_[i] is the slot for sequence number offset_ + i; the bool of a slot is true
//     only while the slot holds a buffered event that has not been delivered yet.
template <class DataT>
class OrderedEventsProcessor {
 public:
  using SeqNo = uint64;

  // Creates a processor which expects the first event to have sequence number 1.
  OrderedEventsProcessor() = default;

  // Creates a processor which expects the first event to have sequence number `offset`.
  explicit OrderedEventsProcessor(SeqNo offset) : offset_(offset), begin_(offset_), end_(offset_) {
  }

  // Passes every still-buffered event to function(DataT &&) in increasing sequence number order
  // and then resets the processor to the default-constructed state. Note that numbering restarts
  // from 1 afterwards, regardless of the offset the processor was originally constructed with.
  template <class FunctionT>
  void clear(FunctionT &&function) {
    for (auto &it : data_array_) {
      if (it.second) {
        function(std::move(it.first));
      }
    }
    *this = OrderedEventsProcessor();
  }

  // Drops all buffered events and resets the processor to the default-constructed state
  // (numbering restarts from 1).
  void clear() {
    *this = OrderedEventsProcessor();
  }

  // Registers the event `data` with sequence number `seq_no`.
  //
  // If seq_no is the next expected number, function(seq_no, data) is called immediately, followed
  // by function(n, std::move(buffered)) for every consecutive buffered event n = seq_no + 1, ...
  // Otherwise the event is stored until all its predecessors are delivered; adding a buffered
  // seq_no a second time silently overwrites the previously stored data.
  //
  // seq_no must not be smaller than the next expected number; this is enforced by LOG_CHECK.
  template <class FromDataT, class FunctionT>
  void add(SeqNo seq_no, FromDataT &&data, FunctionT &&function) {
    LOG_CHECK(seq_no >= begin_) << seq_no << ">=" << begin_;  // or ignore?

    if (seq_no == begin_) {  // run now
      begin_++;
      function(seq_no, std::forward<FromDataT>(data));

      // flush the run of consecutive events that were waiting for this one
      while (begin_ < end_) {
        auto &data_flag = data_array_[static_cast<size_t>(begin_ - offset_)];
        if (!data_flag.second) {
          break;  // gap: the event with sequence number begin_ has not arrived yet
        }
        function(begin_, std::move(data_flag.first));
        data_flag.second = false;
        begin_++;
      }
      if (begin_ > end_) {
        // nothing was buffered, so the event just delivered is the highest one seen
        end_ = begin_;
      }
      if (begin_ == end_) {
        // no buffered events are left, i.e. every slot flag is false,
        // so the slots can be reused starting from index 0
        offset_ = begin_;
      }

      // try_compactify: drop the already delivered prefix once it is longer than 5 slots
      // and occupies more than half of the array
      auto begin_pos = static_cast<size_t>(begin_ - offset_);
      if (begin_pos > 5 && begin_pos * 2 > data_array_.size()) {
        data_array_.erase(data_array_.begin(), data_array_.begin() + begin_pos);
        offset_ = begin_;
      }
    } else {
      // the event is ahead of the expected sequence number: store it in its slot
      auto pos = static_cast<size_t>(seq_no - offset_);
      auto need_size = pos + 1;
      if (data_array_.size() < need_size) {
        data_array_.resize(need_size);
      }
      data_array_[pos].first = std::forward<FromDataT>(data);
      data_array_[pos].second = true;
      if (end_ < seq_no + 1) {
        end_ = seq_no + 1;
      }
    }
  }

  // Returns true if there is at least one buffered event waiting for a predecessor.
  bool has_events() const {
    return begin_ != end_;
  }

  // Returns the highest sequence number seen so far (delivered or buffered), computed as end_ - 1.
  // Before the first add() this is the initial sequence number minus one.
  SeqNo max_unfinished_seq_no() const {
    return end_ - 1;
  }

  // Returns the sequence number of the last delivered event, computed as begin_ - 1.
  // Before the first delivery this is the initial sequence number minus one.
  SeqNo max_finished_seq_no() const {
    return begin_ - 1;
  }

 private:
  SeqNo offset_ = 1;  // sequence number which corresponds to data_array_[0]
  SeqNo begin_ = 1;   // next sequence number to be delivered
  SeqNo end_ = 1;     // one past the highest sequence number seen so far
  std::vector<std::pair<DataT, bool>> data_array_;  // (event, "is buffered and not yet delivered") slots
};

}  // namespace td