diff options
Diffstat (limited to 'protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp')
-rw-r--r-- | protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp | 499 |
1 files changed, 363 insertions, 136 deletions
diff --git a/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp b/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp index 866c714ca4..a7a89c70a3 100644 --- a/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp +++ b/protocols/Telegram/tdlib/td/td/telegram/files/FileDownloader.cpp @@ -1,5 +1,5 @@ // -// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -10,14 +10,14 @@ #include "td/telegram/files/FileLoaderUtils.h" #include "td/telegram/files/FileType.h" #include "td/telegram/Global.h" -#include "td/telegram/net/DcId.h" +#include "td/telegram/net/NetQueryDispatcher.h" #include "td/telegram/SecureStorage.h" +#include "td/telegram/telegram_api.h" #include "td/telegram/UniqueId.h" #include "td/utils/as.h" #include "td/utils/base64.h" #include "td/utils/buffer.h" -#include "td/utils/common.h" #include "td/utils/crypto.h" #include "td/utils/format.h" #include "td/utils/logging.h" @@ -43,111 +43,21 @@ FileDownloader::FileDownloader(const FullRemoteFileLocation &remote, const Local , callback_(std::move(callback)) , is_small_(is_small) , need_search_file_(need_search_file) + , ordered_flag_(encryption_key_.is_secret()) , offset_(offset) , limit_(limit) { - if (encryption_key.is_secret()) { - set_ordered_flag(true); - } if (!encryption_key.empty()) { CHECK(offset_ == 0); } } -Result<FileLoader::FileInfo> FileDownloader::init() { - SCOPE_EXIT { - try_release_fd(); - }; - if (local_.type() == LocalFileLocation::Type::Full) { - return Status::Error("File is already downloaded"); - } - if (encryption_key_.is_secure() && !encryption_key_.has_value_hash()) { - LOG(ERROR) << "Can't download Secure file with unknown value_hash"; - } - if (remote_.file_type_ == FileType::SecureEncrypted) { - size_ = 0; - } - int32 part_size = 0; - Bitmask bitmask{Bitmask::Ones{}, 0}; - if (local_.type() == LocalFileLocation::Type::Partial) { - const auto &partial = local_.partial(); - path_ = partial.path_; - auto result_fd = FileFd::open(path_, FileFd::Write | FileFd::Read); - // TODO: check timestamps.. - if (result_fd.is_ok()) { - if ((!encryption_key_.is_secret() || partial.iv_.size() == 32) && partial.part_size_ >= 0 && - partial.part_size_ <= (1 << 20) && (partial.part_size_ & (partial.part_size_ - 1)) == 0) { - bitmask = Bitmask(Bitmask::Decode{}, partial.ready_bitmask_); - if (encryption_key_.is_secret()) { - encryption_key_.mutable_iv() = as<UInt256>(partial.iv_.data()); - next_part_ = narrow_cast<int32>(bitmask.get_ready_parts(0)); - } - fd_ = result_fd.move_as_ok(); - part_size = static_cast<int32>(partial.part_size_); - } else { - LOG(ERROR) << "Have invalid " << partial; - } - } - } - if (need_search_file_ && fd_.empty() && size_ > 0 && encryption_key_.empty() && !remote_.is_web()) { - [&] { - TRY_RESULT(path, search_file(remote_.file_type_, name_, size_)); - TRY_RESULT(fd, FileFd::open(path, FileFd::Read)); - LOG(INFO) << "Check hash of local file " << path; - path_ = std::move(path); - fd_ = std::move(fd); - need_check_ = true; - only_check_ = true; - part_size = 128 * (1 << 10); - bitmask = Bitmask{Bitmask::Ones{}, (size_ + part_size - 1) / part_size}; - return Status::OK(); - }(); - } - - FileInfo res; - res.size = size_; - res.is_size_final = true; - res.part_size = part_size; - res.ready_parts = bitmask.as_vector(); - res.use_part_count_limit = false; - res.only_check = only_check_; - auto file_type = get_main_file_type(remote_.file_type_); - res.need_delay = - !is_small_ && (file_type == FileType::VideoNote || file_type == FileType::Document || - file_type == FileType::VoiceNote || file_type == FileType::Audio || file_type == FileType::Video || - file_type == FileType::Animation || (file_type == FileType::Encrypted && size_ > (1 << 20))); - res.offset = offset_; - res.limit = limit_; - return res; -} - -Status FileDownloader::on_ok(int64 size) { - std::string path; - fd_.close(); - if (encryption_key_.is_secure()) { - TRY_RESULT(file_path, open_temp_file(remote_.file_type_)); - string tmp_path; - std::tie(std::ignore, tmp_path) = std::move(file_path); - TRY_STATUS(secure_storage::decrypt_file(encryption_key_.secret(), encryption_key_.value_hash(), path_, tmp_path)); - unlink(path_).ignore(); - path_ = std::move(tmp_path); - TRY_RESULT(path_stat, stat(path_)); - size = path_stat.size_; - } - if (only_check_) { - path = path_; - } else { - TRY_RESULT_ASSIGN(path, create_from_temp(remote_.file_type_, path_, name_)); - } - callback_->on_ok(FullLocalFileLocation(remote_.file_type_, std::move(path), 0), size, !only_check_); - return Status::OK(); -} - void FileDownloader::on_error(Status status) { fd_.close(); + stop_flag_ = true; callback_->on_error(std::move(status)); } -Result<bool> FileDownloader::should_restart_part(Part part, NetQueryPtr &net_query) { +Result<bool> FileDownloader::should_restart_part(Part part, const NetQueryPtr &net_query) { // Check if we should use CDN or reupload file to CDN if (net_query->is_error()) { @@ -220,17 +130,17 @@ Result<bool> FileDownloader::should_restart_part(Part part, NetQueryPtr &net_que return false; } -Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32 part_count, int64 streaming_offset) { +Result<NetQueryPtr> FileDownloader::start_part(Part part, int32 part_count, int64 streaming_offset) { if (encryption_key_.is_secret()) { part.size = (part.size + 15) & ~15; // fix for last part } // auto size = part.size; //// sometimes we can ask more than server has, just to check size - // if (size < get_part_size()) { - // size = min(size + 16, get_part_size()); + // if (size < parts_manager_.get_part_size()) { + // size = min(size + 16, parts_manager_.get_part_size()); // LOG(INFO) << "Ask " << size << " instead of " << part.size; //} - auto size = get_part_size(); + auto size = parts_manager_.get_part_size(); CHECK(part.size <= size); callback_->on_start_download(); @@ -250,12 +160,12 @@ Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32 net_query = remote_.is_web() ? G()->net_query_creator().create( - unique_id, + unique_id, nullptr, telegram_api::upload_getWebFile(remote_.as_input_web_file_location(), narrow_cast<int32>(part.offset), narrow_cast<int32>(size)), {}, dc_id, net_query_type, NetQuery::AuthFlag::On) : G()->net_query_creator().create( - unique_id, + unique_id, nullptr, telegram_api::upload_getFile(flags, false /*ignored*/, false /*ignored*/, remote_.as_input_file_location(), part.offset, narrow_cast<int32>(size)), {}, dc_id, net_query_type, NetQuery::AuthFlag::On); @@ -269,17 +179,17 @@ Result<std::pair<NetQueryPtr, bool>> FileDownloader::start_part(Part part, int32 cdn_part_file_token_generation_[part.id] = cdn_file_token_generation_; net_query = G()->net_query_creator().create(UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::CDN)), - query, {}, cdn_dc_id_, net_query_type, NetQuery::AuthFlag::Off); + nullptr, query, {}, cdn_dc_id_, net_query_type, NetQuery::AuthFlag::Off); } else { auto query = telegram_api::upload_reuploadCdnFile(BufferSlice(cdn_file_token_), BufferSlice(it->second)); net_query = G()->net_query_creator().create( - UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReuploadCDN)), query, {}, + UniqueId::next(UniqueId::Type::Default, static_cast<uint8>(QueryType::ReuploadCDN)), nullptr, query, {}, remote_.get_dc_id(), net_query_type, NetQuery::AuthFlag::On); cdn_part_reupload_token_.erase(it); } } net_query->file_type_ = narrow_cast<int32>(remote_.file_type_); - return std::make_pair(std::move(net_query), false); + return std::move(net_query); } Status FileDownloader::check_net_query(NetQueryPtr &net_query) { @@ -304,10 +214,10 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) { switch (query_type) { case QueryType::Default: { if (remote_.is_web()) { - TRY_RESULT(file, fetch_result<telegram_api::upload_getWebFile>(net_query->ok())); + TRY_RESULT(file, fetch_result<telegram_api::upload_getWebFile>(std::move(net_query))); bytes = std::move(file->bytes_); } else { - TRY_RESULT(file_base, fetch_result<telegram_api::upload_getFile>(net_query->ok())); + TRY_RESULT(file_base, fetch_result<telegram_api::upload_getFile>(std::move(net_query))); CHECK(file_base->get_id() == telegram_api::upload_file::ID); auto file = move_tl_object_as<telegram_api::upload_file>(file_base); LOG(DEBUG) << "Receive part " << part.id << ": " << to_string(file); @@ -316,7 +226,7 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) { break; } case QueryType::CDN: { - TRY_RESULT(file_base, fetch_result<telegram_api::upload_getCdnFile>(net_query->ok())); + TRY_RESULT(file_base, fetch_result<telegram_api::upload_getCdnFile>(std::move(net_query))); CHECK(file_base->get_id() == telegram_api::upload_cdnFile::ID); auto file = move_tl_object_as<telegram_api::upload_cdnFile>(file_base); LOG(DEBUG) << "Receive part " << part.id << " from CDN: " << to_string(file); @@ -375,37 +285,37 @@ Result<size_t> FileDownloader::process_part(Part part, NetQueryPtr net_query) { return written; } -void FileDownloader::on_progress(Progress progress) { - if (progress.is_ready) { +void FileDownloader::on_progress() { + if (parts_manager_.ready()) { // do not send partial location. will lead to wrong local_size return; } - if (progress.ready_size == 0 || path_.empty()) { + auto ready_size = parts_manager_.get_ready_size(); + if (ready_size == 0 || path_.empty()) { return; } + auto part_size = static_cast<int32>(parts_manager_.get_part_size()); + auto size = parts_manager_.get_size_or_zero(); if (encryption_key_.empty() || encryption_key_.is_secure()) { callback_->on_partial_download( - PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_, "", std::move(progress.ready_bitmask)}, - progress.ready_size, progress.size); + PartialLocalFileLocation{remote_.file_type_, part_size, path_, "", parts_manager_.get_bitmask(), ready_size}, + size); } else if (encryption_key_.is_secret()) { UInt256 iv; - if (progress.ready_part_count == next_part_) { + auto ready_part_count = parts_manager_.get_ready_prefix_count(); + if (ready_part_count == next_part_) { iv = encryption_key_.mutable_iv(); } else { - LOG(FATAL) << tag("ready_part_count", progress.ready_part_count) << tag("next_part", next_part_); + LOG(FATAL) << tag("ready_part_count", ready_part_count) << tag("next_part", next_part_); } - callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, progress.part_size, path_, - as_slice(iv).str(), std::move(progress.ready_bitmask)}, - progress.ready_size, progress.size); + callback_->on_partial_download(PartialLocalFileLocation{remote_.file_type_, part_size, path_, as_slice(iv).str(), + parts_manager_.get_bitmask(), ready_size}, + size); } else { UNREACHABLE(); } } -FileLoader::Callback *FileDownloader::get_callback() { - return static_cast<FileLoader::Callback *>(callback_.get()); -} - Status FileDownloader::process_check_query(NetQueryPtr net_query) { has_hash_query_ = false; TRY_STATUS(check_net_query(net_query)); @@ -414,15 +324,15 @@ Status FileDownloader::process_check_query(NetQueryPtr net_query) { return Status::OK(); } -Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_size, int64 ready_prefix_size, - bool is_ready) { +Status FileDownloader::check_loop(int64 checked_prefix_size, int64 ready_prefix_size, bool is_ready) { if (!need_check_) { - return CheckInfo{}; + return Status::OK(); } SCOPE_EXIT { try_release_fd(); }; - CheckInfo info; + bool is_changed = false; + vector<NetQueryPtr> queries; while (checked_prefix_size < ready_prefix_size) { //LOG(ERROR) << "NEED TO CHECK: " << checked_prefix_size << "->" << ready_prefix_size - checked_prefix_size; HashInfo search_info; @@ -459,7 +369,7 @@ Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_si } checked_prefix_size = end_offset; - info.changed = true; + is_changed = true; continue; } if (!has_hash_query_) { @@ -467,15 +377,26 @@ Result<FileLoader::CheckInfo> FileDownloader::check_loop(int64 checked_prefix_si auto query = telegram_api::upload_getFileHashes(remote_.as_input_file_location(), checked_prefix_size); auto net_query_type = is_small_ ? NetQuery::Type::DownloadSmall : NetQuery::Type::Download; auto net_query = G()->net_query_creator().create(query, {}, remote_.get_dc_id(), net_query_type); - info.queries.push_back(std::move(net_query)); + queries.push_back(std::move(net_query)); break; } // Should fail? break; } - info.need_check = need_check_; - info.checked_prefix_size = checked_prefix_size; - return std::move(info); + + if (is_changed) { + on_progress(); + } + for (auto &query : queries) { + G()->net_query_dispatcher().dispatch_with_callback( + std::move(query), actor_shared(this, UniqueId::next(UniqueId::Type::Default, COMMON_QUERY_KEY))); + } + if (need_check_) { + parts_manager_.set_need_check(); + parts_manager_.set_checked_prefix_size(checked_prefix_size); + } + + return Status::OK(); } void FileDownloader::add_hash_info(const std::vector<telegram_api::object_ptr<telegram_api::fileHash>> &hashes) { @@ -489,11 +410,6 @@ void FileDownloader::add_hash_info(const std::vector<telegram_api::object_ptr<te } } -void FileDownloader::keep_fd_flag(bool keep_fd) { - keep_fd_ = keep_fd; - try_release_fd(); -} - void FileDownloader::try_release_fd() { if (!keep_fd_ && !fd_.empty()) { fd_.close(); @@ -511,4 +427,315 @@ Status FileDownloader::acquire_fd() { return Status::OK(); } +void FileDownloader::set_resource_manager(ActorShared<ResourceManager> resource_manager) { + resource_manager_ = std::move(resource_manager); + send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_); +} + +void FileDownloader::update_priority(int8 priority) { + send_closure(resource_manager_, &ResourceManager::update_priority, priority); +} + +void FileDownloader::update_resources(const ResourceState &other) { + resource_state_.update_slave(other); + VLOG(file_loader) << "Update resources " << resource_state_; + loop(); +} + +void FileDownloader::hangup() { + if (delay_dispatcher_.empty()) { + stop(); + } else { + delay_dispatcher_.reset(); + } +} + +void FileDownloader::hangup_shared() { + if (get_link_token() == 1) { + stop(); + } +} + +void FileDownloader::update_downloaded_part(int64 offset, int64 limit, int64 max_resource_limit) { + if (parts_manager_.get_streaming_offset() != offset) { + auto begin_part_id = parts_manager_.set_streaming_offset(offset, limit); + auto new_end_part_id = limit <= 0 ? parts_manager_.get_part_count() + : narrow_cast<int32>((offset + limit - 1) / parts_manager_.get_part_size()) + 1; + auto max_parts = narrow_cast<int32>(max_resource_limit / parts_manager_.get_part_size()); + auto end_part_id = begin_part_id + td::min(max_parts, new_end_part_id - begin_part_id); + VLOG(file_loader) << "Protect parts " << begin_part_id << " ... " << end_part_id - 1; + for (auto &it : part_map_) { + if (!it.second.second.empty() && !(begin_part_id <= it.second.first.id && it.second.first.id < end_part_id)) { + VLOG(file_loader) << "Cancel part " << it.second.first.id; + it.second.second.reset(); // cancel_query(it.second.second); + } + } + } else { + parts_manager_.set_streaming_limit(limit); + } + update_estimated_limit(); + loop(); +} + +void FileDownloader::start_up() { + if (local_.type() == LocalFileLocation::Type::Full) { + return on_error(Status::Error("File is already downloaded")); + } + if (encryption_key_.is_secure() && !encryption_key_.has_value_hash()) { + LOG(ERROR) << "Can't download Secure file with unknown value_hash"; + } + if (remote_.file_type_ == FileType::SecureEncrypted) { + size_ = 0; + } + int32 part_size = 0; + Bitmask bitmask{Bitmask::Ones{}, 0}; + if (local_.type() == LocalFileLocation::Type::Partial) { + const auto &partial = local_.partial(); + path_ = partial.path_; + auto result_fd = FileFd::open(path_, FileFd::Write | FileFd::Read); + // TODO: check timestamps.. + if (result_fd.is_ok()) { + if ((!encryption_key_.is_secret() || partial.iv_.size() == 32) && partial.part_size_ >= 0 && + partial.part_size_ <= (1 << 20) && (partial.part_size_ & (partial.part_size_ - 1)) == 0) { + bitmask = Bitmask(Bitmask::Decode{}, partial.ready_bitmask_); + if (encryption_key_.is_secret()) { + encryption_key_.mutable_iv() = as<UInt256>(partial.iv_.data()); + next_part_ = narrow_cast<int32>(bitmask.get_ready_parts(0)); + } + fd_ = result_fd.move_as_ok(); + part_size = static_cast<int32>(partial.part_size_); + } else { + LOG(ERROR) << "Have invalid " << partial; + } + } + } + if (need_search_file_ && fd_.empty() && size_ > 0 && encryption_key_.empty() && !remote_.is_web()) { + auto r_path = search_file(remote_.file_type_, name_, size_); + if (r_path.is_ok()) { + auto r_fd = FileFd::open(r_path.ok(), FileFd::Read); + if (r_fd.is_ok()) { + path_ = r_path.move_as_ok(); + fd_ = r_fd.move_as_ok(); + need_check_ = true; + only_check_ = true; + part_size = 128 * (1 << 10); + bitmask = Bitmask{Bitmask::Ones{}, (size_ + part_size - 1) / part_size}; + LOG(INFO) << "Check hash of local file " << path_; + } + } + } + try_release_fd(); + + auto ready_parts = bitmask.as_vector(); + auto status = parts_manager_.init(size_, size_, true, part_size, ready_parts, false, false); + LOG(DEBUG) << "Start downloading a file of size " << size_ << ", part size " << part_size << " and " + << ready_parts.size() << " ready parts: " << status; + if (status.is_error()) { + return on_error(std::move(status)); + } + if (only_check_) { + parts_manager_.set_checked_prefix_size(0); + } + parts_manager_.set_streaming_offset(offset_, limit_); + if (ordered_flag_) { + ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count()); + } + auto file_type = get_main_file_type(remote_.file_type_); + if (!is_small_ && + (file_type == FileType::VideoNote || file_type == FileType::Document || file_type == FileType::VoiceNote || + file_type == FileType::Audio || file_type == FileType::Video || file_type == FileType::Animation || + file_type == FileType::VideoStory || (file_type == FileType::Encrypted && size_ > (1 << 20)))) { + delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003, actor_shared(this, 1)); + next_delay_ = 0.05; + } + resource_state_.set_unit_size(parts_manager_.get_part_size()); + update_estimated_limit(); + on_progress(); + yield(); +} + +void FileDownloader::loop() { + if (stop_flag_) { + return; + } + auto status = do_loop(); + if (status.is_error()) { + if (status.code() == -1) { + return; + } + return on_error(std::move(status)); + } +} + +Status FileDownloader::do_loop() { + TRY_STATUS(check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(), + parts_manager_.unchecked_ready())); + + if (parts_manager_.may_finish()) { + TRY_STATUS(parts_manager_.finish()); + fd_.close(); + auto size = parts_manager_.get_size(); + if (encryption_key_.is_secure()) { + TRY_RESULT(file_path, open_temp_file(remote_.file_type_)); + string tmp_path; + std::tie(std::ignore, tmp_path) = std::move(file_path); + TRY_STATUS(secure_storage::decrypt_file(encryption_key_.secret(), encryption_key_.value_hash(), path_, tmp_path)); + unlink(path_).ignore(); + path_ = std::move(tmp_path); + TRY_RESULT(path_stat, stat(path_)); + size = path_stat.size_; + } + string path; + if (only_check_) { + path = path_; + } else { + TRY_RESULT_ASSIGN(path, create_from_temp(remote_.file_type_, path_, name_)); + } + callback_->on_ok(FullLocalFileLocation(remote_.file_type_, std::move(path), 0), size, !only_check_); + + LOG(INFO) << "Bad download order rate: " + << (debug_total_parts_ == 0 ? 0.0 : 100.0 * debug_bad_part_order_ / debug_total_parts_) << "% " + << debug_bad_part_order_ << "/" << debug_total_parts_ << " " << format::as_array(debug_bad_parts_); + stop_flag_ = true; + return Status::OK(); + } + + while (true) { + if (resource_state_.unused() < narrow_cast<int64>(parts_manager_.get_part_size())) { + VLOG(file_loader) << "Receive only " << resource_state_.unused() << " resource"; + break; + } + TRY_RESULT(part, parts_manager_.start_part()); + if (part.size == 0) { + break; + } + VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size); + resource_state_.start_use(static_cast<int64>(part.size)); + + TRY_RESULT(query, start_part(part, parts_manager_.get_part_count(), parts_manager_.get_streaming_offset())); + uint64 unique_id = UniqueId::next(); + part_map_[unique_id] = std::make_pair(part, query->cancel_slot_.get_signal_new()); + + auto callback = actor_shared(this, unique_id); + if (delay_dispatcher_.empty()) { + G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback)); + } else { + query->debug("sent to DelayDispatcher"); + send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback_and_delay, std::move(query), + std::move(callback), next_delay_); + next_delay_ = max(next_delay_ * 0.8, 0.003); + } + } + return Status::OK(); +} + +void FileDownloader::tear_down() { + for (auto &it : part_map_) { + it.second.second.reset(); // cancel_query(it.second.second); + } + ordered_parts_.clear([](auto &&part) { part.second->clear(); }); + if (!delay_dispatcher_.empty()) { + send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent); + } +} + +void FileDownloader::update_estimated_limit() { + if (stop_flag_) { + return; + } + auto estimated_extra = parts_manager_.get_estimated_extra(); + resource_state_.update_estimated_limit(estimated_extra); + VLOG(file_loader) << "Update estimated limit " << estimated_extra; + if (!resource_manager_.empty()) { + keep_fd_ = narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size(); + try_release_fd(); + send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_); + } +} + +void FileDownloader::on_result(NetQueryPtr query) { + if (stop_flag_) { + return; + } + auto unique_id = get_link_token(); + if (UniqueId::extract_key(unique_id) == COMMON_QUERY_KEY) { + auto status = process_check_query(std::move(query)); + if (status.is_error()) { + on_error(std::move(status)); + } else { + loop(); + } + return; + } + auto it = part_map_.find(unique_id); + if (it == part_map_.end()) { + LOG(WARNING) << "Receive result for unknown part"; + return; + } + + Part part = it->second.first; + it->second.second.release(); + CHECK(query->is_ready()); + part_map_.erase(it); + + bool next = false; + auto status = [&] { + TRY_RESULT(should_restart, should_restart_part(part, query)); + if (query->is_error() && query->error().code() == NetQuery::Error::Canceled) { + should_restart = true; + } + if (should_restart) { + VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size); + resource_state_.stop_use(static_cast<int64>(part.size)); + parts_manager_.on_part_failed(part.id); + } else { + next = true; + } + return Status::OK(); + }(); + if (status.is_error()) { + return on_error(std::move(status)); + } + + if (next) { + if (ordered_flag_) { + auto seq_no = part.id; + ordered_parts_.add( + seq_no, std::make_pair(part, std::move(query)), + [this](uint64 seq_no, std::pair<Part, NetQueryPtr> &&p) { on_part_query(p.first, std::move(p.second)); }); + } else { + on_part_query(part, std::move(query)); + } + } + update_estimated_limit(); + loop(); +} + +void FileDownloader::on_part_query(Part part, NetQueryPtr query) { + if (stop_flag_) { + // important for secret files + return; + } + auto status = try_on_part_query(part, std::move(query)); + if (status.is_error()) { + on_error(std::move(status)); + } +} + +Status FileDownloader::try_on_part_query(Part part, NetQueryPtr query) { + TRY_RESULT(size, process_part(part, std::move(query))); + VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size); + resource_state_.stop_use(static_cast<int64>(part.size)); + auto old_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count(); + TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size)); + auto new_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count(); + debug_total_parts_++; + if (old_ready_prefix_count == new_ready_prefix_count) { + debug_bad_parts_.push_back(part.id); + debug_bad_part_order_++; + } + on_progress(); + return Status::OK(); +} + } // namespace td |