summaryrefslogtreecommitdiff
path: root/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp')
-rw-r--r--protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp499
1 files changed, 363 insertions, 136 deletions
diff --git a/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp b/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp
index 866c714ca4..a7a89c70a3 100644
--- a/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp
+++ b/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp
@@ -1,5 +1,5 @@
//
-// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
+// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -10,14 +10,14 @@
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/files/FileType.h"
#include "td/telegram/Global.h"
-#include "td/telegram/net/DcId.h"
+#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/SecureStorage.h"
+#include "td/telegram/telegram_api.h"
#include "td/telegram/UniqueId.h"
#include "td/utils/as.h"
#include "td/utils/base64.h"
#include "td/utils/buffer.h"
-#include "td/utils/common.h"
#include "td/utils/crypto.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
@@ -43,111 +43,21 @@ FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const Local
, callback_(std::move(callback))
, is_small_(is_small)
, need_search_file_(need_search_file)
+ , ordered_flag_(encryption_key_.is_secret())
, offset_(offset)
, limit_(limit) {
- if (encryption_key.is_secret()) {
- set_ordered_flag(true);
- }
if (!encryption_key.empty()) {
CHECK(offset_ == 0);
}
}
-Result<FileLoader::FileInfo> FileDownloader::init() {
- SCOPE_EXIT {
- try_release_fd();
- };
- if (local_.type() == LocalFileLocation::Type::Full) {
- return Status::Error("File is already downloaded");
- }
- if (encryption_key_.is_secure() && !encryption_key_.has_value_hash()) {
- LOG(ERROR) << "Can't download Secure file with unknown value_hash";
- }
- if (remote_.file_type_ == FileType::SecureEncrypted) {
- size_ = 0;
- }
- int32 part_size = 0;
- Bitmask bitmask{Bitmask::Ones{}, 0};
- if (local_.type() == LocalFileLocation::Type::Partial) {
- const auto &partial = local_.partial();
- path_ = partial.path_;
- auto result_fd = FileFd::open(path_, FileFd::Write | FileFd::Read);
- // TODO: check timestamps..
- if (result_fd.is_ok()) {
- if ((!encryption_key_.is_secret() || partial.iv_.size() == 32) && partial.part_size_ >= 0 &&
- partial.part_size_ <= (1 << 20) && (partial.part_size_ & (partial.part_size_ - 1)) == 0) {
- bitmask = Bitmask(Bitmask::Decode{}, partial.ready_bitmask_);
- if (encryption_key_.is_secret()) {
- encryption_key_.mutable_iv() = as<UInt256>(partial.iv_.data());
- next_part_ = narrow_cast<int32>(bitmask.get_ready_parts(0));
- }
- fd_ = result_fd.move_as_ok();
- part_size = static_cast<int32>(partial.part_size_);
- } else {
- LOG(ERROR) << "Have invalid " << partial;
- }
- }
- }
- if (need_search_file_ && fd_.empty() && size_ > 0 && encryption_key_.empty() && !remote_.is_web()) {
- [&] {
- TRY_RESULT(path, search_file(remote_.file_type_, name_, size_));
- TRY_RESULT(fd, FileFd::open(path, FileFd::Read));
- LOG(INFO) << "Check hash of local file " << path;
- path_ = std::move(path);
- fd_ = std::move(fd);
- need_check_ = true;
- only_check_ = true;
- part_size = 128 * (1 << 10);
- bitmask = Bitmask{Bitmask::Ones{}, (size_ + part_size - 1) / part_size};
- return Status::OK();
- }();
- }
-
- FileInfo res;
- res.size = size_;
- res.is_size_final = true;
- res.part_size = part_size;
- res.ready_parts = bitmask.as_vector();
- res.use_part_count_limit = false;
- res.only_check = only_check_;
- auto file_type = get_main_file_type(remote_.file_type_);
- res.need_delay =
- !is_small_ && (file_type == FileType::VideoNote || file_type == FileType::Document ||
- file_type == FileType::VoiceNote || file_type == FileType::Audio || file_type == FileType::Video ||
- file_type == FileType::Animation || (file_type == FileType::Encrypted && size_ > (1 << 20)));
- res.offset = offset_;
- res.limit = limit_;
- return res;
-}
-
-Status FileDownloader::on_ok(int64 size) {
- std::string path;
- fd_.close();
- if (encryption_key_.is_secure()) {
- TRY_RESULT(file_path, open_temp_file(remote_.file_type_));
- string tmp_path;
- std::tie(std::ignore, tmp_path) = std::move(file_path);
- TRY_STATUS(secure_storage::decrypt_file(encryption_key_.secret(), encryption_key_.value_hash(), path_, tmp_path));
- unlink(path_).ignore();
- path_ = std::move(tmp_path);
- TRY_RESULT(path_stat, stat(path_));
- size = path_stat.size_;
- }
- if (only_check_) {
- path = path_;
- } else {
- TRY_RESULT_ASSIGN(path, create_from_temp(remote_.file_type_, path_, name_));
- }
- callback_->on_ok(FullLocalFileLocation(remote_.file_type_, std::move(path), 0), size, !only_check_);
- return Status::OK();
-}
-
void FileDownloader::on_error(Status status) {
fd_.close();
+ stop_flag_ = true;
callback_->on_error(std::move(status));
}
-Result<bool> FileDownloader::should_restart_part(Part part, NetQueryPtr &net_query) {
+Result<bool> FileDownloader::should_restart_part(Part part, const NetQueryPtr &net_query) {
// Check if we should use CDN or reupload file to CDN
if (net_query->is_error()) {
@@ -220,17 +130,17 @@ Result<bool> FileDownloader::should_restart_part(Part part, NetQueryPtr &net_que
return false;
}
-Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32 part_count, int64 streaming_offset) {
+Result<NetQueryPtr> FileDownloader::start_part(Part part, int32 part_count, int64 streaming_offset) {
if (encryption_key_.is_secret()) {
part.size = (part.size + 15) & ~15; // fix for last part
}
// auto size = part.size;
//// sometimes we can ask more than server has, just to check size
- // if (size < get_part_size()) {
- // size = min(size + 16, get_part_size());
+ // if (size < parts_manager_.get_part_size()) {
+ // size = min(size + 16, parts_manager_.get_part_size());
// LOG(INFO) << "Ask " << size << " instead of " << part.size;
//}
- auto size = get_part_size();
+ auto size = parts_manager_.get_part_size();
CHECK(part.size <= size);
callback_->on_start_download();
@@ -250,12 +160,12 @@ Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32
net_query =
remote_.is_web()
? G()->net_query_creator().create(
- unique_id,
+ unique_id, nullptr,
telegram_api::upload_getWebFile(remote_.as_input_web_file_location(), narrow_cast<int32>(part.offset),
narrow_cast<int32>(size)),
{}, dc_id, net_query_type, NetQuery::AuthFlag::On)
: G()->net_query_creator().create(
- unique_id,
+ unique_id, nullptr,
telegram_api::upload_getFile(flags, false /*ignored*/, false /*ignored*/,
remote_.as_input_file_location(), part.offset, narrow_cast<int32>(size)),
{}, dc_id, net_query_type, NetQuery::AuthFlag::On);
@@ -269,17 +179,17 @@ Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32
cdn_part_file_token_generation_[part.id] = cdn_file_token_generation_;
net_query =
G()->net_query_creator().create(UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::CDN)),
- query, {}, cdn_dc_id_, net_query_type, NetQuery::AuthFlag::Off);
+ nullptr, query, {}, cdn_dc_id_, net_query_type, NetQuery::AuthFlag::Off);
} else {
auto query = telegram_api::upload_reuploadCdnFile(BufferSlice(cdn_file_token_), BufferSlice(it->second));
net_query = G()->net_query_creator().create(
- UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReuploadCDN)), query, {},
+ UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReuploadCDN)), nullptr, query, {},
remote_.get_dc_id(), net_query_type, NetQuery::AuthFlag::On);
cdn_part_reupload_token_.erase(it);
}
}
net_query->file_type_ = narrow_cast<int32>(remote_.file_type_);
- return std::make_pair(std::move(net_query), false);
+ return std::move(net_query);
}
Status FileDownloader::check_net_query(NetQueryPtr &net_query) {
@@ -304,10 +214,10 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
switch (query_type) {
case QueryType::Default: {
if (remote_.is_web()) {
- TRY_RESULT(file, fetch_result<telegram_api::upload_getWebFile>(net_query->ok()));
+ TRY_RESULT(file, fetch_result<telegram_api::upload_getWebFile>(std::move(net_query)));
bytes = std::move(file->bytes_);
} else {
- TRY_RESULT(file_base, fetch_result<telegram_api::upload_getFile>(net_query->ok()));
+ TRY_RESULT(file_base, fetch_result<telegram_api::upload_getFile>(std::move(net_query)));
CHECK(file_base->get_id() == telegram_api::upload_file::ID);
auto file = move_tl_object_as<telegram_api::upload_file>(file_base);
LOG(DEBUG) << "Receive part " << part.id << ": " << to_string(file);
@@ -316,7 +226,7 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
break;
}
case QueryType::CDN: {
- TRY_RESULT(file_base, fetch_result<telegram_api::upload_getCdnFile>(net_query->ok()));
+ TRY_RESULT(file_base, fetch_result<telegram_api::upload_getCdnFile>(std::move(net_query)));
CHECK(file_base->get_id() == telegram_api::upload_cdnFile::ID);
auto file = move_tl_object_as<telegram_api::upload_cdnFile>(file_base);
LOG(DEBUG) << "Receive part " << part.id << " from CDN: " << to_string(file);
@@ -375,37 +285,37 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) {
return written;
}
-void FileDownloader::on_progress(Progress progress) {
- if (progress.is_ready) {
+void FileDownloader::on_progress() {
+ if (parts_manager_.ready()) {
// do not send partial location. will lead to wrong local_size
return;
}
- if (progress.ready_size == 0 || path_.empty()) {
+ auto ready_size = parts_manager_.get_ready_size();
+ if (ready_size == 0 || path_.empty()) {
return;
}
+ auto part_size = static_cast<int32>(parts_manager_.get_part_size());
+ auto size = parts_manager_.get_size_or_zero();
if (encryption_key_.empty() || encryption_key_.is_secure()) {
callback_->on_partial_download(
- PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_, "", std::move(progress.ready_bitmask)},
- progress.ready_size, progress.size);
+ PartialLocalFileLocation{remote_.file_type_, part_size, path_, "", parts_manager_.get_bitmask(), ready_size},
+ size);
} else if (encryption_key_.is_secret()) {
UInt256 iv;
- if (progress.ready_part_count == next_part_) {
+ auto ready_part_count = parts_manager_.get_ready_prefix_count();
+ if (ready_part_count == next_part_) {
iv = encryption_key_.mutable_iv();
} else {
- LOG(FATAL) << tag("ready_part_count", progress.ready_part_count) << tag("next_part", next_part_);
+ LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_);
}
- callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_,
- as_slice(iv).str(), std::move(progress.ready_bitmask)},
- progress.ready_size, progress.size);
+ callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, part_size, path_, as_slice(iv).str(),
+ parts_manager_.get_bitmask(), ready_size},
+ size);
} else {
UNREACHABLE();
}
}
-FileLoader::Callback *FileDownloader::get_callback() {
- return static_cast<FileLoader::Callback *>(callback_.get());
-}
-
Status FileDownloader::process_check_query(NetQueryPtr net_query) {
has_hash_query_ = false;
TRY_STATUS(check_net_query(net_query));
@@ -414,15 +324,15 @@ Status FileDownloader::process_check_query(NetQueryPtr net_query) {
return Status::OK();
}
-Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_size, int64 ready_prefix_size,
- bool is_ready) {
+Status FileDownloader::check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) {
if (!need_check_) {
- return CheckInfo{};
+ return Status::OK();
}
SCOPE_EXIT {
try_release_fd();
};
- CheckInfo info;
+ bool is_changed = false;
+ vector<NetQueryPtr> queries;
while (checked_prefix_size < ready_prefix_size) {
//LOG(ERROR) << "NEED TO CHECK: " << checked_prefix_size << "->" << ready_prefix_size - checked_prefix_size;
HashInfo search_info;
@@ -459,7 +369,7 @@ Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_si
}
checked_prefix_size = end_offset;
- info.changed = true;
+ is_changed = true;
continue;
}
if (!has_hash_query_) {
@@ -467,15 +377,26 @@ Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_si
auto query = telegram_api::upload_getFileHashes(remote_.as_input_file_location(), checked_prefix_size);
auto net_query_type = is_small_ ? NetQuery::Type::DownloadSmall : NetQuery::Type::Download;
auto net_query = G()->net_query_creator().create(query, {}, remote_.get_dc_id(), net_query_type);
- info.queries.push_back(std::move(net_query));
+ queries.push_back(std::move(net_query));
break;
}
// Should fail?
break;
}
- info.need_check = need_check_;
- info.checked_prefix_size = checked_prefix_size;
- return std::move(info);
+
+ if (is_changed) {
+ on_progress();
+ }
+ for (auto &query : queries) {
+ G()->net_query_dispatcher().dispatch_with_callback(
+ std::move(query), actor_shared(this, UniqueId::next(UniqueId::Type::Default, COMMON_QUERY_KEY)));
+ }
+ if (need_check_) {
+ parts_manager_.set_need_check();
+ parts_manager_.set_checked_prefix_size(checked_prefix_size);
+ }
+
+ return Status::OK();
}
void FileDownloader::add_hash_info(const std::vector<telegram_api::object_ptr<telegram_api::fileHash>> &hashes) {
@@ -489,11 +410,6 @@ void FileDownloader::add_hash_info(const std::vector<telegram_api::object_ptr<te
}
}
-void FileDownloader::keep_fd_flag(bool keep_fd) {
- keep_fd_ = keep_fd;
- try_release_fd();
-}
-
void FileDownloader::try_release_fd() {
if (!keep_fd_ && !fd_.empty()) {
fd_.close();
@@ -511,4 +427,315 @@ Status FileDownloader::acquire_fd() {
return Status::OK();
}
+void FileDownloader::set_resource_manager(ActorShared<ResourceManager> resource_manager) {
+ resource_manager_ = std::move(resource_manager);
+ send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
+}
+
+void FileDownloader::update_priority(int8 priority) {
+ send_closure(resource_manager_, &ResourceManager::update_priority, priority);
+}
+
+void FileDownloader::update_resources(const ResourceState &other) {
+ resource_state_.update_slave(other);
+ VLOG(file_loader) << "Update resources " << resource_state_;
+ loop();
+}
+
+void FileDownloader::hangup() {
+ if (delay_dispatcher_.empty()) {
+ stop();
+ } else {
+ delay_dispatcher_.reset();
+ }
+}
+
+void FileDownloader::hangup_shared() {
+ if (get_link_token() == 1) {
+ stop();
+ }
+}
+
+void FileDownloader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) {
+ if (parts_manager_.get_streaming_offset() != offset) {
+ auto begin_part_id = parts_manager_.set_streaming_offset(offset, limit);
+ auto new_end_part_id = limit <= 0 ? parts_manager_.get_part_count()
+ : narrow_cast<int32>((offset + limit - 1) / parts_manager_.get_part_size()) + 1;
+ auto max_parts = narrow_cast<int32>(max_resource_limit / parts_manager_.get_part_size());
+ auto end_part_id = begin_part_id + td::min(max_parts, new_end_part_id - begin_part_id);
+ VLOG(file_loader) << "Protect parts " << begin_part_id << " ... " << end_part_id - 1;
+ for (auto &it : part_map_) {
+ if (!it.second.second.empty() && !(begin_part_id <= it.second.first.id && it.second.first.id < end_part_id)) {
+ VLOG(file_loader) << "Cancel part " << it.second.first.id;
+ it.second.second.reset(); // cancel_query(it.second.second);
+ }
+ }
+ } else {
+ parts_manager_.set_streaming_limit(limit);
+ }
+ update_estimated_limit();
+ loop();
+}
+
+void FileDownloader::start_up() {
+ if (local_.type() == LocalFileLocation::Type::Full) {
+ return on_error(Status::Error("File is already downloaded"));
+ }
+ if (encryption_key_.is_secure() && !encryption_key_.has_value_hash()) {
+ LOG(ERROR) << "Can't download Secure file with unknown value_hash";
+ }
+ if (remote_.file_type_ == FileType::SecureEncrypted) {
+ size_ = 0;
+ }
+ int32 part_size = 0;
+ Bitmask bitmask{Bitmask::Ones{}, 0};
+ if (local_.type() == LocalFileLocation::Type::Partial) {
+ const auto &partial = local_.partial();
+ path_ = partial.path_;
+ auto result_fd = FileFd::open(path_, FileFd::Write | FileFd::Read);
+ // TODO: check timestamps..
+ if (result_fd.is_ok()) {
+ if ((!encryption_key_.is_secret() || partial.iv_.size() == 32) && partial.part_size_ >= 0 &&
+ partial.part_size_ <= (1 << 20) && (partial.part_size_ & (partial.part_size_ - 1)) == 0) {
+ bitmask = Bitmask(Bitmask::Decode{}, partial.ready_bitmask_);
+ if (encryption_key_.is_secret()) {
+ encryption_key_.mutable_iv() = as<UInt256>(partial.iv_.data());
+ next_part_ = narrow_cast<int32>(bitmask.get_ready_parts(0));
+ }
+ fd_ = result_fd.move_as_ok();
+ part_size = static_cast<int32>(partial.part_size_);
+ } else {
+ LOG(ERROR) << "Have invalid " << partial;
+ }
+ }
+ }
+ if (need_search_file_ && fd_.empty() && size_ > 0 && encryption_key_.empty() && !remote_.is_web()) {
+ auto r_path = search_file(remote_.file_type_, name_, size_);
+ if (r_path.is_ok()) {
+ auto r_fd = FileFd::open(r_path.ok(), FileFd::Read);
+ if (r_fd.is_ok()) {
+ path_ = r_path.move_as_ok();
+ fd_ = r_fd.move_as_ok();
+ need_check_ = true;
+ only_check_ = true;
+ part_size = 128 * (1 << 10);
+ bitmask = Bitmask{Bitmask::Ones{}, (size_ + part_size - 1) / part_size};
+ LOG(INFO) << "Check hash of local file " << path_;
+ }
+ }
+ }
+ try_release_fd();
+
+ auto ready_parts = bitmask.as_vector();
+ auto status = parts_manager_.init(size_, size_, true, part_size, ready_parts, false, false);
+ LOG(DEBUG) << "Start downloading a file of size " << size_ << ", part size " << part_size << " and "
+ << ready_parts.size() << " ready parts: " << status;
+ if (status.is_error()) {
+ return on_error(std::move(status));
+ }
+ if (only_check_) {
+ parts_manager_.set_checked_prefix_size(0);
+ }
+ parts_manager_.set_streaming_offset(offset_, limit_);
+ if (ordered_flag_) {
+ ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
+ }
+ auto file_type = get_main_file_type(remote_.file_type_);
+ if (!is_small_ &&
+ (file_type == FileType::VideoNote || file_type == FileType::Document || file_type == FileType::VoiceNote ||
+ file_type == FileType::Audio || file_type == FileType::Video || file_type == FileType::Animation ||
+ file_type == FileType::VideoStory || (file_type == FileType::Encrypted && size_ > (1 << 20)))) {
+ delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003, actor_shared(this, 1));
+ next_delay_ = 0.05;
+ }
+ resource_state_.set_unit_size(parts_manager_.get_part_size());
+ update_estimated_limit();
+ on_progress();
+ yield();
+}
+
+void FileDownloader::loop() {
+ if (stop_flag_) {
+ return;
+ }
+ auto status = do_loop();
+ if (status.is_error()) {
+ if (status.code() == -1) {
+ return;
+ }
+ return on_error(std::move(status));
+ }
+}
+
+Status FileDownloader::do_loop() {
+ TRY_STATUS(check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(),
+ parts_manager_.unchecked_ready()));
+
+ if (parts_manager_.may_finish()) {
+ TRY_STATUS(parts_manager_.finish());
+ fd_.close();
+ auto size = parts_manager_.get_size();
+ if (encryption_key_.is_secure()) {
+ TRY_RESULT(file_path, open_temp_file(remote_.file_type_));
+ string tmp_path;
+ std::tie(std::ignore, tmp_path) = std::move(file_path);
+ TRY_STATUS(secure_storage::decrypt_file(encryption_key_.secret(), encryption_key_.value_hash(), path_, tmp_path));
+ unlink(path_).ignore();
+ path_ = std::move(tmp_path);
+ TRY_RESULT(path_stat, stat(path_));
+ size = path_stat.size_;
+ }
+ string path;
+ if (only_check_) {
+ path = path_;
+ } else {
+ TRY_RESULT_ASSIGN(path, create_from_temp(remote_.file_type_, path_, name_));
+ }
+ callback_->on_ok(FullLocalFileLocation(remote_.file_type_, std::move(path), 0), size, !only_check_);
+
+ LOG(INFO) << "Bad download order rate: "
+ << (debug_total_parts_ == 0 ? 0.0 : 100.0 * debug_bad_part_order_ / debug_total_parts_) << "% "
+ << debug_bad_part_order_ << "/" << debug_total_parts_ << " " << format::as_array(debug_bad_parts_);
+ stop_flag_ = true;
+ return Status::OK();
+ }
+
+ while (true) {
+ if (resource_state_.unused() < narrow_cast<int64>(parts_manager_.get_part_size())) {
+ VLOG(file_loader) << "Receive only " << resource_state_.unused() << " resource";
+ break;
+ }
+ TRY_RESULT(part, parts_manager_.start_part());
+ if (part.size == 0) {
+ break;
+ }
+ VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size);
+ resource_state_.start_use(static_cast<int64>(part.size));
+
+ TRY_RESULT(query, start_part(part, parts_manager_.get_part_count(), parts_manager_.get_streaming_offset()));
+ uint64 unique_id = UniqueId::next();
+ part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new());
+
+ auto callback = actor_shared(this, unique_id);
+ if (delay_dispatcher_.empty()) {
+ G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback));
+ } else {
+ query->debug("sent to DelayDispatcher");
+ send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback_and_delay, std::move(query),
+ std::move(callback), next_delay_);
+ next_delay_ = max(next_delay_ * 0.8, 0.003);
+ }
+ }
+ return Status::OK();
+}
+
+void FileDownloader::tear_down() {
+ for (auto &it : part_map_) {
+ it.second.second.reset(); // cancel_query(it.second.second);
+ }
+ ordered_parts_.clear([](auto &&part) { part.second->clear(); });
+ if (!delay_dispatcher_.empty()) {
+ send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent);
+ }
+}
+
+void FileDownloader::update_estimated_limit() {
+ if (stop_flag_) {
+ return;
+ }
+ auto estimated_extra = parts_manager_.get_estimated_extra();
+ resource_state_.update_estimated_limit(estimated_extra);
+ VLOG(file_loader) << "Update estimated limit " << estimated_extra;
+ if (!resource_manager_.empty()) {
+ keep_fd_ = narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size();
+ try_release_fd();
+ send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
+ }
+}
+
+void FileDownloader::on_result(NetQueryPtr query) {
+ if (stop_flag_) {
+ return;
+ }
+ auto unique_id = get_link_token();
+ if (UniqueId::extract_key(unique_id) == COMMON_QUERY_KEY) {
+ auto status = process_check_query(std::move(query));
+ if (status.is_error()) {
+ on_error(std::move(status));
+ } else {
+ loop();
+ }
+ return;
+ }
+ auto it = part_map_.find(unique_id);
+ if (it == part_map_.end()) {
+ LOG(WARNING) << "Receive result for unknown part";
+ return;
+ }
+
+ Part part = it->second.first;
+ it->second.second.release();
+ CHECK(query->is_ready());
+ part_map_.erase(it);
+
+ bool next = false;
+ auto status = [&] {
+ TRY_RESULT(should_restart, should_restart_part(part, query));
+ if (query->is_error() && query->error().code() == NetQuery::Error::Canceled) {
+ should_restart = true;
+ }
+ if (should_restart) {
+ VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size);
+ resource_state_.stop_use(static_cast<int64>(part.size));
+ parts_manager_.on_part_failed(part.id);
+ } else {
+ next = true;
+ }
+ return Status::OK();
+ }();
+ if (status.is_error()) {
+ return on_error(std::move(status));
+ }
+
+ if (next) {
+ if (ordered_flag_) {
+ auto seq_no = part.id;
+ ordered_parts_.add(
+ seq_no, std::make_pair(part, std::move(query)),
+ [this](uint64 seq_no, std::pair<Part, NetQueryPtr> &&p) { on_part_query(p.first, std::move(p.second)); });
+ } else {
+ on_part_query(part, std::move(query));
+ }
+ }
+ update_estimated_limit();
+ loop();
+}
+
+void FileDownloader::on_part_query(Part part, NetQueryPtr query) {
+ if (stop_flag_) {
+ // important for secret files
+ return;
+ }
+ auto status = try_on_part_query(part, std::move(query));
+ if (status.is_error()) {
+ on_error(std::move(status));
+ }
+}
+
+Status FileDownloader::try_on_part_query(Part part, NetQueryPtr query) {
+ TRY_RESULT(size, process_part(part, std::move(query)));
+ VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size);
+ resource_state_.stop_use(static_cast<int64>(part.size));
+ auto old_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count();
+ TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size));
+ auto new_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count();
+ debug_total_parts_++;
+ if (old_ready_prefix_count == new_ready_prefix_count) {
+ debug_bad_parts_.push_back(part.id);
+ debug_bad_part_order_++;
+ }
+ on_progress();
+ return Status::OK();
+}
+
} // namespace td