diff options
author | pescuma <pescuma@c086bb3d-8645-0410-b8da-73a8550f86e7> | 2011-10-10 01:39:18 +0000 |
---|---|---|
committer | pescuma <pescuma@c086bb3d-8645-0410-b8da-73a8550f86e7> | 2011-10-10 01:39:18 +0000 |
commit | e9bf8a6e2d782dc480fb97cb59928c8cfe1dd777 (patch) | |
tree | 12bb8ebaab13ef1476ce9343482796c901ee0130 /Plugins/jingle/libjingle/talk/session/fileshare/fileshare.cc | |
parent | f574681d9b6fee0d05319af1831620e48cc8492f (diff) |
Moved files from BerliOS
git-svn-id: http://pescuma.googlecode.com/svn/trunk/Miranda@229 c086bb3d-8645-0410-b8da-73a8550f86e7
Diffstat (limited to 'Plugins/jingle/libjingle/talk/session/fileshare/fileshare.cc')
-rw-r--r-- | Plugins/jingle/libjingle/talk/session/fileshare/fileshare.cc | 1313 |
1 files changed, 1313 insertions, 0 deletions
diff --git a/Plugins/jingle/libjingle/talk/session/fileshare/fileshare.cc b/Plugins/jingle/libjingle/talk/session/fileshare/fileshare.cc new file mode 100644 index 0000000..863c352 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/session/fileshare/fileshare.cc @@ -0,0 +1,1313 @@ +/* + * libjingle + * Copyright 2004--2006, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/session/fileshare/fileshare.h" + +#include "talk/base/httpcommon-inl.h" + +#include "talk/base/fileutils.h" +#include "talk/base/streamutils.h" +#include "talk/base/event.h" +#include "talk/base/helpers.h" +#include "talk/base/httpclient.h" +#include "talk/base/httpserver.h" +#include "talk/base/pathutils.h" +#include "talk/base/socketstream.h" +#include "talk/base/stringdigest.h" +#include "talk/base/stringencode.h" +#include "talk/base/stringutils.h" +#include "talk/base/tarstream.h" +#include "talk/base/thread.h" +#include "talk/session/tunnel/pseudotcpchannel.h" +#include "talk/session/tunnel/tunnelsessionclient.h" + +/////////////////////////////////////////////////////////////////////////////// +// <description xmlns="http://www.google.com/session/share"> +// <manifest> +// <file size='341'> +// <name>foo.txt</name> +// </file> +// <file size='51321'> +// <name>foo.jpg</name> +// <image width='480' height='320'/> +// </file> +// <folder> +// <name>stuff</name> +// </folder> +// </manifest> +// <protocol> +// <http> +// <url name='source-path'>/temporary/23A53F01/</url> +// <url name='preview-path'>/temporary/90266EA1/</url> +// </http> +// <raw/> +// </protocol> +// </description> +// <p:transport xmns:p="p2p"/> +/////////////////////////////////////////////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////////// +// Constants and private functions +/////////////////////////////////////////////////////////////////////////////// + +const std::string NS_GOOGLE_SHARE("http://www.google.com/session/share"); + +namespace { + +const buzz::QName QN_SHARE_DESCRIPTION(true, NS_GOOGLE_SHARE, "description"); +const buzz::QName QN_SHARE_MANIFEST(true, NS_GOOGLE_SHARE, "manifest"); +const buzz::QName QN_SHARE_FOLDER(true, NS_GOOGLE_SHARE, "folder"); +const buzz::QName QN_SHARE_FILE(true, NS_GOOGLE_SHARE, "file"); +const buzz::QName QN_SHARE_NAME(true, NS_GOOGLE_SHARE, "name"); +const buzz::QName QN_SHARE_IMAGE(true, NS_GOOGLE_SHARE, "image"); +const buzz::QName QN_SHARE_PROTOCOL(true, NS_GOOGLE_SHARE, "protocol"); +const buzz::QName QN_SHARE_HTTP(true, NS_GOOGLE_SHARE, "http"); +const buzz::QName QN_SHARE_URL(true, NS_GOOGLE_SHARE, "url"); +const buzz::QName QN_SHARE_CHANNEL(true, NS_GOOGLE_SHARE, "channel"); +const buzz::QName QN_SHARE_COMPLETE(true, NS_GOOGLE_SHARE, "complete"); + +const buzz::QName QN_SIZE(true, buzz::STR_EMPTY, "size"); +const buzz::QName QN_WIDTH(true, buzz::STR_EMPTY, "width"); +const buzz::QName QN_HEIGHT(true, buzz::STR_EMPTY, "height"); + +const std::string kHttpSourcePath("source-path"); +const std::string kHttpPreviewPath("preview-path"); + +const size_t kMinImageSize = 16U; +const size_t kMaxImageSize = 0x8000U; // (32k) +const size_t kMaxPreviewSize = 1024; +// Wait 10 seconds to see if any new proxies get established +const uint32 kProxyWait = 10000; + +const int MSG_RETRY = 1; +const uint32 kFileTransferEnableRetryMs = 1000 * 60 * 4; // 4 minutes + +const std::string MIME_OCTET_STREAM("application/octet-stream"); + +enum { + MSG_PROXY_WAIT, +}; + +bool AllowedImageDimensions(size_t width, size_t height) { + return (width >= kMinImageSize) && (width <= kMaxImageSize) + && (height >= kMinImageSize) && (height <= kMaxImageSize); +} + +} // anon namespace + +namespace cricket { + +/////////////////////////////////////////////////////////////////////////////// +// FileShareManifest +/////////////////////////////////////////////////////////////////////////////// + +void +FileShareManifest::AddFile(const std::string& name, size_t size) { + Item i = { T_FILE, name, size }; + items_.push_back(i); +} + +void +FileShareManifest::AddImage(const std::string& name, size_t size, + size_t width, size_t height) { + Item i = { T_IMAGE, name, size, width, height }; + items_.push_back(i); +} + +void +FileShareManifest::AddFolder(const std::string& name, size_t size) { + Item i = { T_FOLDER, name, size }; + items_.push_back(i); +} + +size_t +FileShareManifest::GetItemCount(Type t) const { + size_t count = 0; + for (size_t i=0; i<items_.size(); ++i) { + if (items_[i].type == t) + ++count; + } + return count; +} + +/////////////////////////////////////////////////////////////////////////////// +// FileShareSession +/////////////////////////////////////////////////////////////////////////////// + +FileShareSession::FileShareSession(cricket::Session* session, const std::string &user_agent) + : session_(session), state_(FS_NONE), + is_closed_(false), + is_sender_(false), manifest_(NULL), pool_(this), http_client_(NULL), + http_server_(NULL), + transfer_connection_id_(talk_base::HTTP_INVALID_CONNECTION_ID), + counter_(NULL), item_transferring_(0), bytes_transferred_(0), + local_cancel_(false), local_listener_(NULL), remote_listener_(NULL), + next_channel_id_(1), user_agent_(user_agent) { + session_->SignalState.connect(this, &FileShareSession::OnSessionState); + session_->SignalInfoMessage.connect(this, + &FileShareSession::OnSessionInfoMessage); + session_->SignalChannelGone.connect(this, + &FileShareSession::OnSessionChannelGone); +} + +FileShareSession::~FileShareSession() { + ASSERT(FS_NONE != state_); + // If we haven't closed, do cleanup now. + if (!IsClosed()) { + if (!IsComplete()) { + state_ = FS_FAILURE; + } + DoClose(true); + } + if (session_) { + // Make sure we don't get future state changes on this session. + session_->SignalState.disconnect(this); + session_->SignalInfoMessage.disconnect(this); + session_ = NULL; + } + + for (TransactionList::const_iterator trans_it = transactions_.begin(); + trans_it != transactions_.end(); ++trans_it) { + (*trans_it)->response()->set_error(talk_base::HC_NOT_FOUND); + http_server_->Respond(*trans_it); + } + + delete http_client_; + delete http_server_; + delete manifest_; + delete local_listener_; + delete remote_listener_; +} + +bool +FileShareSession::IsComplete() const { + return (state_ >= FS_COMPLETE); +} + +bool +FileShareSession::IsClosed() const { + return is_closed_; +} + +FileShareState +FileShareSession::state() const { + return state_; +} + +bool +FileShareSession::is_sender() const { + ASSERT(FS_NONE != state_); + return is_sender_; +} + +const buzz::Jid& +FileShareSession::jid() const { + ASSERT(FS_NONE != state_); + return jid_; +} + +const FileShareManifest* +FileShareSession::manifest() const { + ASSERT(FS_NONE != state_); + return manifest_; +} + +const std::string& +FileShareSession::local_folder() const { + ASSERT(!local_folder_.empty()); + return local_folder_; +} + +void +FileShareSession::Share(const buzz::Jid& jid, FileShareManifest* manifest) { + ASSERT(FS_NONE == state_); + ASSERT(NULL != session_); + + http_server_ = new talk_base::HttpServer; + http_server_->SignalHttpRequest.connect(this, + &FileShareSession::OnHttpRequest); + http_server_->SignalHttpRequestComplete.connect(this, + &FileShareSession::OnHttpRequestComplete); + http_server_->SignalConnectionClosed.connect(this, + &FileShareSession::OnHttpConnectionClosed); + + FileShareDescription* desc = new FileShareDescription; + desc->supports_http = true; + desc->manifest = *manifest; + GenerateTemporaryPrefix(&desc->source_path); + GenerateTemporaryPrefix(&desc->preview_path); + session_->Initiate(jid.Str(), NULL, desc); + + delete manifest; +} + +void +FileShareSession::Accept() { + ASSERT(FS_OFFER == state_); + ASSERT(NULL != session_); + ASSERT(NULL != manifest_); + + ASSERT(!http_client_); + ASSERT(item_transferring_ == 0); + http_client_ = new talk_base::HttpClient(user_agent_, + &pool_); + http_client_->SignalHttpClientComplete.connect(this, + &FileShareSession::OnHttpClientComplete); + http_client_->SignalHttpClientClosed.connect(this, + &FileShareSession::OnHttpClientClosed); + + // The receiver now has a need for the http_server_, when previewing already + // downloaded content. + http_server_ = new talk_base::HttpServer; + http_server_->SignalHttpRequest.connect(this, + &FileShareSession::OnHttpRequest); + http_server_->SignalHttpRequestComplete.connect(this, + &FileShareSession::OnHttpRequestComplete); + http_server_->SignalConnectionClosed.connect(this, + &FileShareSession::OnHttpConnectionClosed); + + FileShareDescription* desc = new FileShareDescription; + desc->supports_http = description()->supports_http; + session_->Accept(desc); + + SetState(FS_TRANSFER, false); + NextDownload(); +} + +void +FileShareSession::Decline() { + ASSERT(FS_OFFER == state_); + ASSERT(NULL != session_); + local_cancel_ = true; + session_->Reject(); +} + +void +FileShareSession::Cancel() { + ASSERT(!IsComplete()); + ASSERT(NULL != session_); + local_cancel_ = true; + session_->Terminate(); +} + +bool +FileShareSession::GetItemUrl(size_t index, std::string* url) { + return GetItemBaseUrl(index, false, url); +} + +bool FileShareSession::GetImagePreviewUrl(size_t index, size_t width, + size_t height, std::string* url) { + if (!GetItemBaseUrl(index, true, url)) + return false; + + if (FileShareManifest::T_IMAGE != manifest_->item(index).type) { + ASSERT(false); + return false; + } + + char query[256]; + talk_base::sprintfn(query, ARRAY_SIZE(query), "?width=%u&height=%u", + width, height); + url->append(query); + return true; +} + +void FileShareSession::ResampleComplete(talk_base::StreamInterface *i, talk_base::HttpTransaction *trans, bool success) { + bool found = false; + for (TransactionList::const_iterator trans_it = transactions_.begin(); + trans_it != transactions_.end(); ++trans_it) { + if (*trans_it == trans) { + found = true; + break; + } + } + + if (!found) + return; + + transactions_.remove(trans); + + if (success) { + trans->response()->set_success(MIME_OCTET_STREAM, i); + http_server_->Respond(trans); + + } + trans->response()->set_error(talk_base::HC_NOT_FOUND); + http_server_->Respond(trans); +} + +bool FileShareSession::GetProgress(size_t& bytes) const { + bool known = true; + bytes = bytes_transferred_; + if (counter_) { + size_t current_size = manifest_->item(item_transferring_).size; + size_t current_pos = counter_->GetByteCount(); + if (current_size == FileShareManifest::SIZE_UNKNOWN) { + known = false; + } else if (current_pos > current_size) { + // Don't allow the size of a 'known' item to be reported as larger than + // it claimed to be. + ASSERT(false); + current_pos = current_size; + } + bytes += current_pos; + } + return known; +} + +bool FileShareSession::GetTotalSize(size_t& bytes) const { + bool known = true; + bytes = 0; + for (size_t i=0; i<manifest_->size(); ++i) { + if (manifest_->item(i).size == FileShareManifest::SIZE_UNKNOWN) { + // We make files of unknown length worth a single byte. + known = false; + bytes += 1; + } else { + bytes += manifest_->item(i).size; + } + } + return known; +} + +bool FileShareSession::GetCurrentItemName(std::string* name) { + if (FS_TRANSFER != state_) { + name->clear(); + return false; + } + ASSERT(item_transferring_ < manifest_->size()); + if (transfer_name_.empty()) { + const FileShareManifest::Item& item = manifest_->item(item_transferring_); + *name = item.name; + } else { + *name = transfer_name_; + } + return !name->empty(); +} + +// StreamPool Implementation + +talk_base::StreamInterface* FileShareSession::RequestConnectedStream( + const talk_base::SocketAddress& remote, int* err) { + ASSERT(remote.IPAsString() == jid_.Str()); + ASSERT(!IsClosed()); + ASSERT(NULL != session_); + if (!session_) { + if (err) + *err = -1; + return NULL; + } + + char channel_name[64]; + talk_base::sprintfn(channel_name, ARRAY_SIZE(channel_name), + "private-%u", next_channel_id_++); + if (err) + *err = 0; + return CreateChannel(channel_name); +} + +void FileShareSession::ReturnConnectedStream( + talk_base::StreamInterface* stream) { + talk_base::Thread::Current()->Dispose(stream); +} + +// MessageHandler Implementation + +void FileShareSession::OnMessage(talk_base::Message* msg) { + if (MSG_PROXY_WAIT == msg->message_id) { + LOG_F(LS_INFO) << "MSG_PROXY_WAIT"; + if (proxies_.empty() && IsComplete() && !IsClosed()) { + DoClose(true); + } + } +} + +// Session Signals + +void FileShareSession::OnSessionState(cricket::Session* session, + cricket::Session::State state) { + // Once we are complete, state changes are meaningless. + if (!IsComplete()) { + switch (state) { + case cricket::Session::STATE_SENTINITIATE: + case cricket::Session::STATE_RECEIVEDINITIATE: + OnInitiate(); + break; + case cricket::Session::STATE_SENTACCEPT: + case cricket::Session::STATE_RECEIVEDACCEPT: + case cricket::Session::STATE_INPROGRESS: + SetState(FS_TRANSFER, false); + break; + case cricket::Session::STATE_SENTREJECT: + case cricket::Session::STATE_SENTTERMINATE: + case cricket::Session::STATE_DEINIT: + if (local_cancel_) { + SetState(FS_LOCAL_CANCEL, false); + } else { + SetState(FS_REMOTE_CANCEL, false); + } + break; + case cricket::Session::STATE_RECEIVEDTERMINATE: + if (is_sender()) { + // If we are the sender, and the receiver downloaded the correct number + // of bytes, then we assume the transfer was successful. We've + // introduced support for explicit completion notification + // (QN_SHARE_COMPLETE), but it's not mandatory at this point, so we need + // this as a fallback. + size_t total_bytes; + GetTotalSize(total_bytes); + if (bytes_transferred_ >= total_bytes) { + SetState(FS_COMPLETE, false); + break; + } + } + // Fall through + case cricket::Session::STATE_RECEIVEDREJECT: + SetState(FS_REMOTE_CANCEL, false); + break; + case cricket::Session::STATE_INIT: + case cricket::Session::STATE_SENTMODIFY: + case cricket::Session::STATE_RECEIVEDMODIFY: + case cricket::Session::STATE_SENTREDIRECT: + default: + // These states should not occur. + ASSERT(false); + break; + } + } + + if (state == cricket::Session::STATE_DEINIT) { + if (!IsClosed()) { + DoClose(false); + } + session_ = NULL; + } +} + +void FileShareSession::OnSessionInfoMessage(cricket::Session* session, + const cricket::Session::XmlElements& els) { + if (IsClosed()) + return; + ASSERT(NULL != session_); + for (size_t i=0; i<els.size(); ++i) { + if (is_sender() && (els[i]->Name() == QN_SHARE_CHANNEL)) { + if (els[i]->HasAttr(buzz::QN_NAME)) { + cricket::PseudoTcpChannel* channel = + new cricket::PseudoTcpChannel(talk_base::Thread::Current(), session_); + VERIFY(channel->Connect(els[i]->Attr(buzz::QN_NAME))); + talk_base::StreamInterface* stream = channel->GetStream(); + http_server_->HandleConnection(stream); + } + } else if (is_sender() && (els[i]->Name() == QN_SHARE_COMPLETE)) { + // Normal file transfer has completed, but receiver may still be getting + // previews. + if (!IsComplete()) { + SetState(FS_COMPLETE, true); + } + } else { + LOG(LS_WARNING) << "Unknown FileShareSession info message: " + << els[i]->Name().Merged(); + } + } +} + +void FileShareSession::OnSessionChannelGone(cricket::Session* session, + const std::string& name) { + LOG_F(LS_WARNING) << "(" << name << ")"; + ASSERT(session == session_); + if (cricket::TransportChannel* channel = session->GetChannel(name)) { + session->DestroyChannel(channel); + } +} + +// HttpClient Signals + +void FileShareSession::OnHttpClientComplete(talk_base::HttpClient* http, + int err) { + LOG_F(LS_INFO) << "(" << err << ", " << http->response().scode << ")"; + ASSERT(http == http_client_); + ASSERT(NULL != session_); + + transfer_name_.clear(); + counter_ = NULL; // counter_ is deleted by HttpClient + http->response().document.reset(); + bool success = (err == 0) && (http->response().scode == talk_base::HC_OK); + + const FileShareManifest::Item& item = manifest_->item(item_transferring_); + talk_base::Pathname local_name; + local_name.SetFilename(item.name); + local_name.SetFolder(local_folder_); + + if (local_name.pathname() != transfer_path_) { + const bool is_folder = (item.type == FileShareManifest::T_FOLDER); + if (success && !talk_base::CreateUniqueFile(local_name, false)) { + LOG(LS_ERROR) << "Couldn't rename downloaded file: " + << local_name.pathname(); + success = false; + } + + talk_base::Pathname temp_name(transfer_path_); + if (is_folder) { + // The folder we want is a subdirectory of the transfer_path_. + temp_name.AppendFolder(item.name); + } + + if (!talk_base::Filesystem::MoveFile(temp_name.pathname(), local_name.pathname())) { + success = false; + LOG(LS_ERROR) << "Couldn't move downloaded file from '" + << temp_name.pathname() << "' to '" + << local_name.pathname(); + } + + if (success && is_folder) { + talk_base::Filesystem::DeleteFile(transfer_path_); + } + } + + if (!success) { + if (!talk_base::Filesystem::DeleteFile(transfer_path_)) { + LOG(LS_ERROR) << "Couldn't delete downloaded file: " << transfer_path_; + } + if (!IsComplete()) { + SetState(FS_FAILURE, false); + } + return; + } + + // We may have skipped over some items (if they are directories, or otherwise + // failed. resize ensures that we populate the skipped entries with empty + // strings. + stored_location_.resize(item_transferring_ + 1); + stored_location_[item_transferring_] = local_name.pathname(); + + // bytes_transferred_ represents the size of items which have completely + // transferred, and is added to the progress of the currently transferring + // items. + if (item.size == FileShareManifest::SIZE_UNKNOWN) { + bytes_transferred_ += 1; + } else { + bytes_transferred_ += item.size; + } + item_transferring_ += 1; + NextDownload(); +} + +void FileShareSession::OnHttpClientClosed(talk_base::HttpClient* http, + int err) { + LOG_F(LS_INFO) << "(" << err << ")"; +} + +// HttpServer Signals + +void FileShareSession::OnHttpRequest(talk_base::HttpServer* server, + talk_base::HttpTransaction* transaction) { + LOG_F(LS_INFO) << "(" << transaction->request()->path << ")"; + ASSERT(server == http_server_); + + std::string path, query; + size_t query_start = transaction->request()->path.find('?'); + if (query_start != std::string::npos) { + path = transaction->request()->path.substr(0, query_start); + query = transaction->request()->path.substr(query_start + 1); + } else { + path = transaction->request()->path; + } + + talk_base::Pathname remote_name(path); + bool preview = (preview_path_ == remote_name.folder()); + bool original = (source_path_ == remote_name.folder()); + + std::string requested_file(remote_name.filename()); + talk_base::transform(requested_file, requested_file.size(), requested_file, + talk_base::url_decode); + + size_t item_index; + const FileShareManifest::Item* item = NULL; + if (preview || original) { + for (size_t i=0; i<manifest_->size(); ++i) { + LOG(LS_INFO) << "++++ " << manifest_->item(i).name + " " << requested_file; + if (manifest_->item(i).name == requested_file) { + item_index = i; + item = &manifest_->item(item_index); + break; + } + } + } + + talk_base::StreamInterface* stream = NULL; + std::string mime_type(MIME_OCTET_STREAM); + + if (!item) { + // Fall through + } else if (preview) { + // Only image previews allowed + unsigned int width = 0, height = 0; + if ((item->type == FileShareManifest::T_IMAGE) + && !query.empty() + && (sscanf(query.c_str(), "width=%u&height=%u", + &width, &height) == 2)) { + width = talk_base::_max<unsigned int>(1, talk_base::_min(width, kMaxPreviewSize)); + height = talk_base::_max<unsigned int>(1, talk_base::_min(height, kMaxPreviewSize)); + std::string pathname; + if (is_sender_) { + talk_base::Pathname local_path; + local_path.SetFolder(local_folder_); + local_path.SetFilename(item->name); + pathname = local_path.pathname(); + } else if ((item_index < stored_location_.size()) + && !stored_location_[item_index].empty()) { + pathname = stored_location_[item_index]; + } + if (!pathname.empty()) { + transactions_.push_back(transaction); + SignalResampleImage(pathname, width, height, transaction); + } + } + } else if (item->type == FileShareManifest::T_FOLDER) { + talk_base::Pathname local_path; + local_path.SetFolder(local_folder_); + local_path.AppendFolder(item->name); + talk_base::TarStream* tar = new talk_base::TarStream; + VERIFY(tar->AddFilter(local_path.folder_name())); + if (tar->Open(local_path.parent_folder(), true)) { + stream = tar; + tar->SignalNextEntry.connect(this, &FileShareSession::OnNextEntry); + mime_type = "application/x-tar"; + } else { + delete tar; + } + } else if ((item->type == FileShareManifest::T_FILE) + || (item->type == FileShareManifest::T_IMAGE)) { + talk_base::Pathname local_path; + local_path.SetFolder(local_folder_); + local_path.SetFilename(item->name); + talk_base::FileStream* file = new talk_base::FileStream; + LOG(LS_INFO) << "opening file " << local_path.pathname(); + if (file->Open(local_path.pathname().c_str(), "rb")) { + LOG(LS_INFO) << "File opened"; + stream = file; + } else { + delete file; + } + } + + if (!stream) { + transaction->response()->set_error(talk_base::HC_NOT_FOUND); + } else if (original) { + // We should never have more than one original request pending at a time + ASSERT(NULL == counter_); + StreamCounter* counter = new StreamCounter(stream); + counter->SignalUpdateByteCount.connect(this, &FileShareSession::OnUpdateBytes); + transaction->response()->set_success(mime_type.c_str(), counter); + transfer_connection_id_ = transaction->connection_id(); + item_transferring_ = item_index; + counter_ = counter; + } else { + // Note: in the preview case, we don't set counter_, so the transferred + // bytes won't be shown as progress, and won't trigger a state change. + transaction->response()->set_success(mime_type.c_str(), stream); + } + + LOG_F(LS_INFO) << "Result: " << transaction->response()->scode; + http_server_->Respond(transaction); +} + +void FileShareSession::OnHttpRequestComplete(talk_base::HttpServer* server, + talk_base::HttpTransaction* transaction, int err) { + LOG_F(LS_INFO) << "(" << transaction->request()->path << ", " << err << ")"; + ASSERT(server == http_server_); + + // We only care about transferred originals + if (transfer_connection_id_ != transaction->connection_id()) + return; + + ASSERT(item_transferring_ < manifest_->size()); + ASSERT(NULL != counter_); + + transfer_connection_id_ = talk_base::HTTP_INVALID_CONNECTION_ID; + transfer_name_.clear(); + counter_ = NULL; + + if (err == 0) { + const FileShareManifest::Item& item = manifest_->item(item_transferring_); + if (item.size == FileShareManifest::SIZE_UNKNOWN) { + bytes_transferred_ += 1; + } else { + bytes_transferred_ += item.size; + } + } +} + +void FileShareSession::OnHttpConnectionClosed(talk_base::HttpServer* server, + int err, talk_base::StreamInterface* stream) { + LOG_F(LS_INFO) << "(" << err << ")"; + talk_base::Thread::Current()->Dispose(stream); +} + +// TarStream Signals + +void FileShareSession::OnNextEntry(const std::string& name, size_t size) { + LOG_F(LS_VERBOSE) << "(" << name << ", " << size << ")"; + transfer_name_ = name; + SignalNextFile(this); +} + +// Socket Signals + +void FileShareSession::OnProxyAccept(talk_base::AsyncSocket* socket) { + bool is_remote; + if (socket == remote_listener_) { + is_remote = true; + ASSERT(NULL != session_); + } else if (socket == local_listener_) { + is_remote = false; + } else { + ASSERT(false); + return; + } + + while (talk_base::AsyncSocket* accepted = + static_cast<talk_base::AsyncSocket*>(socket->Accept(NULL))) { + + // Check if connection is from localhost. + if (accepted->GetRemoteAddress().ip() != 0x7F000001) { + delete accepted; + continue; + } + + LOG_F(LS_VERBOSE) << (is_remote ? "[remote]" : "[local]"); + + if (is_remote) { + char channel_name[64]; + talk_base::sprintfn(channel_name, ARRAY_SIZE(channel_name), + "proxy-%u", next_channel_id_++); + talk_base::StreamInterface* remote = + (NULL != session_) ? CreateChannel(channel_name) : NULL; + if (!remote) { + LOG_F(LS_WARNING) << "CreateChannel(" << channel_name << ") failed"; + delete accepted; + continue; + } + + talk_base::StreamInterface* local = new talk_base::SocketStream(accepted); + StreamRelay* proxy = new StreamRelay(local, remote, 64 * 1024); + proxy->SignalClosed.connect(this, &FileShareSession::OnProxyClosed); + proxies_.push_back(proxy); + proxy->Circulate(); + talk_base::Thread::Current()->Clear(this, MSG_PROXY_WAIT); + } else { + talk_base::StreamInterface* local = new talk_base::SocketStream(accepted); + http_server_->HandleConnection(local); + } + } +} + +void FileShareSession::OnProxyClosed(StreamRelay* proxy, int error) { + ProxyList::iterator it = std::find(proxies_.begin(), proxies_.end(), proxy); + if (it == proxies_.end()) { + ASSERT(false); + return; + } + + LOG_F(LS_VERBOSE) << "(" << error << ")"; + + proxies_.erase(it); + talk_base::Thread::Current()->Dispose(proxy); + + if (proxies_.empty() && IsComplete() && !IsClosed()) { + talk_base::Thread::Current()->PostDelayed(kProxyWait, this, MSG_PROXY_WAIT); + } +} + + +void FileShareSession::OnUpdateBytes(size_t count) { + SignalUpdateProgress(this); +} + +// Internal Helpers + +void FileShareSession::GenerateTemporaryPrefix(std::string* prefix) { + std::string data = cricket::CreateRandomString(32); + ASSERT(NULL != prefix); + prefix->assign("/temporary/"); + prefix->append(talk_base::MD5(data)); + prefix->append("/"); +} + +void FileShareSession::GetItemNetworkPath(size_t index, bool preview, + std::string* path) { + ASSERT(index < manifest_->size()); + ASSERT(NULL != path); + + // preview_path_ and source_path_ are url path segments, which are composed + // with the address of the localhost p2p proxy to provide a url which IE can + // use. + + std::string ue_name; + const std::string& name = manifest_->item(index).name; + talk_base::transform(ue_name, name.length() * 3, name, talk_base::url_encode); + + talk_base::Pathname pathname; + pathname.SetFolder(preview ? preview_path_ : source_path_); + pathname.SetFilename(ue_name); + *path = pathname.pathname(); +} + +bool FileShareSession::GetItemBaseUrl(size_t index, bool preview, + std::string* url) { + // This function composes a URL to the referenced item. It may be a local + // file url (file:///...), or a remote peer url relayed through localhost + // (http://...) + + ASSERT(NULL != url); + if (index >= manifest_->size()) { + ASSERT(false); + return false; + } + + const FileShareManifest::Item& item = manifest_->item(index); + + bool is_remote; + if (is_sender_) { + if (!preview) { + talk_base::Pathname path(local_folder_); + path.SetFilename(item.name); + *url = path.url(); + return true; + } + is_remote = false; + } else { + if ((index < stored_location_.size()) && !stored_location_[index].empty()) { + if (!preview) { + *url = talk_base::Pathname(stored_location_[index]).url(); + return true; + } + // Note: Using the local downloaded files as a source for previews is + // desireable, because it means that previews can be regenerated if IE's + // cached versions get flushed for some reason, and the remote side is + // not available. However, it has the downside that IE _must_ regenerate + // the preview locally, which takes time, memory and CPU. Eventually, + // we will unify the remote and local cached copy through some sort of + // smart http proxying. In the meantime, always use the remote url, to + // eliminate the annoying transition from remote to local caching. + //is_remote = false; + is_remote = true; + } else { + is_remote = true; + } + } + + talk_base::SocketAddress address; + if (!GetProxyAddress(address, is_remote)) + return false; + + std::string path; + GetItemNetworkPath(index, preview, &path); + talk_base::Url<char> make_url(path.c_str(), + address.IPAsString().c_str(), + address.port()); + *url = make_url.url(); + return true; +} + +bool FileShareSession::GetProxyAddress(talk_base::SocketAddress& address, + bool is_remote) { + talk_base::AsyncSocket*& proxy_listener = + is_remote ? remote_listener_ : local_listener_; + + if (!proxy_listener) { + talk_base::AsyncSocket* listener = + talk_base::Thread::Current()->socketserver() + ->CreateAsyncSocket(SOCK_STREAM); + if (!listener) + return false; + + talk_base::SocketAddress bind_address("127.0.0.1", 0); + + if ((listener->Bind(bind_address) != 0) + || (listener->Listen(5) != 0)) { + delete listener; + return false; + } + + LOG(LS_INFO) << "Proxy listener available @ " + << listener->GetLocalAddress().ToString(); + + listener->SignalReadEvent.connect(this, &FileShareSession::OnProxyAccept); + proxy_listener = listener; + } + + if (proxy_listener->GetState() == talk_base::Socket::CS_CLOSED) { + if (is_remote) { + address = remote_listener_address_; + return true; + } + return false; + } + + address = proxy_listener->GetLocalAddress(); + return !address.IsAny(); +} + +talk_base::StreamInterface* FileShareSession::CreateChannel( + const std::string& channel_name) { + ASSERT(NULL != session_); + + // Send a heads-up for our new channel + cricket::Session::XmlElements els; + buzz::XmlElement* xel_channel = new buzz::XmlElement(QN_SHARE_CHANNEL, true); + xel_channel->AddAttr(buzz::QN_NAME, channel_name); + els.push_back(xel_channel); + session_->SendInfoMessage(els); + + cricket::PseudoTcpChannel* channel = + new cricket::PseudoTcpChannel(talk_base::Thread::Current(), session_); + VERIFY(channel->Connect(channel_name)); + return channel->GetStream(); +} + +void FileShareSession::SetState(FileShareState state, bool prevent_close) { + if (state == state_) + return; + + if (IsComplete()) { + // Entering a completion state is permanent. + ASSERT(false); + return; + } + + state_ = state; + if (IsComplete()) { + // All completion states auto-close except for FS_COMPLETE + bool close = (state_ > FS_COMPLETE) || !prevent_close; + if (close) { + DoClose(true); + } + } + + SignalState(state_); +} + +void FileShareSession::OnInitiate() { + // Cache the variables we will need, in case session_ goes away + is_sender_ = session_->initiator(); + jid_ = buzz::Jid(session_->remote_name()); + manifest_ = new FileShareManifest(description()->manifest); + source_path_ = description()->source_path; + preview_path_ = description()->preview_path; + + if (local_folder_.empty()) { + LOG(LS_ERROR) << "FileShareSession - no local folder, using temp"; + talk_base::Pathname temp_folder; + talk_base::Filesystem::GetTemporaryFolder(temp_folder, true, NULL); + local_folder_ = temp_folder.pathname(); + } + LOG(LS_INFO) << session_->state(); + SetState(FS_OFFER, false); +} + +void FileShareSession::NextDownload() { + if (FS_TRANSFER != state_) + return; + + if (item_transferring_ >= manifest_->size()) { + // Notify the other side that transfer has completed + cricket::Session::XmlElements els; + els.push_back(new buzz::XmlElement(QN_SHARE_COMPLETE, true)); + session_->SendInfoMessage(els); + SetState(FS_COMPLETE, !proxies_.empty()); + return; + } + + const FileShareManifest::Item& item = manifest_->item(item_transferring_); + if ((item.type != FileShareManifest::T_FILE) + && (item.type != FileShareManifest::T_IMAGE) + && (item.type != FileShareManifest::T_FOLDER)) { + item_transferring_ += 1; + NextDownload(); + return; + } + + const bool is_folder = (item.type == FileShareManifest::T_FOLDER); + talk_base::Pathname temp_name; + temp_name.SetFilename(item.name); + if (!talk_base::CreateUniqueFile(temp_name, !is_folder)) { + SetState(FS_FAILURE, false); + return; + } + + talk_base::StreamInterface* stream = NULL; + if (is_folder) { + // Convert unique filename into unique foldername + temp_name.AppendFolder(temp_name.filename()); + temp_name.SetFilename(""); + talk_base::TarStream* tar = new talk_base::TarStream; + // Note: the 'target' directory will be a subdirectory of the transfer_path_ + talk_base::Pathname target; + target.SetFolder(item.name); + tar->AddFilter(target.pathname()); + if (!tar->Open(temp_name.pathname(), false)) { + delete tar; + SetState(FS_FAILURE, false); + return; + } + stream = tar; + tar->SignalNextEntry.connect(this, &FileShareSession::OnNextEntry); + } else { + talk_base::FileStream* file = new talk_base::FileStream; + if (!file->Open(temp_name.pathname().c_str(), "wb")) { + delete file; + talk_base::Filesystem::DeleteFile(temp_name); + SetState(FS_FAILURE, false); + return; + } + stream = file; + } + + ASSERT(NULL != stream); + transfer_path_ = temp_name.pathname(); + + std::string remote_path; + GetItemNetworkPath(item_transferring_, false, &remote_path); + + StreamCounter* counter = new StreamCounter(stream); + counter->SignalUpdateByteCount.connect(this, &FileShareSession::OnUpdateBytes); + counter_ = counter; + + http_client_->reset(); + http_client_->set_server(talk_base::SocketAddress(jid_.Str(), 0, false)); + http_client_->request().verb = talk_base::HV_GET; + http_client_->request().path = remote_path; + http_client_->response().document.reset(counter); + http_client_->start(); +} + + +const FileShareSession::FileShareDescription* FileShareSession::description() +const { + ASSERT(NULL != session_); + const cricket::SessionDescription* desc = + session_->initiator() ? session_->description() + : session_->remote_description(); + return static_cast<const FileShareDescription*>(desc); +} + +void FileShareSession::DoClose(bool terminate) { + ASSERT(!is_closed_); + ASSERT(IsComplete()); + ASSERT(NULL != session_); + + is_closed_ = true; + + if (http_client_) { + http_client_->reset(); + } + if (http_server_) { + http_server_->CloseAll(true); + // Currently, CloseAll doesn't result in OnHttpRequestComplete callback. + // If we change that, the following resetting won't be necessary. + transfer_connection_id_ = talk_base::HTTP_INVALID_CONNECTION_ID; + transfer_name_.clear(); + counter_ = NULL; + } + // 'reset' and 'CloseAll' cause counter_ to clear. + ASSERT(NULL == counter_); + + if (remote_listener_) { + // Cache the address for the remote_listener_, so that we can continue to + // present a consistent URL for remote previews, which is necessary for IE + // to continue using its cached copy. + remote_listener_address_ = remote_listener_->GetLocalAddress(); + remote_listener_->Close(); + LOG(LS_INFO) << "Proxy listener closed @ " + << remote_listener_address_.ToString(); + } + + if (terminate) { + session_->Terminate(); + } +} + +////////////////////////////// +/// FileShareSessionClient // +//////////////////////////// + +void FileShareSessionClient::OnSessionCreate(cricket::Session* session, + bool received_initiate) { + VERIFY(sessions_.insert(session).second); + if (received_initiate) { + FileShareSession* share = new FileShareSession(session, user_agent_); + SignalFileShareSessionCreate(share); + UNUSED(share); // FileShareSession registers itself with the UI + } +} + +void FileShareSessionClient::OnSessionDestroy(cricket::Session* session) { + VERIFY(1 == sessions_.erase(session)); +} + +const cricket::SessionDescription* FileShareSessionClient::CreateSessionDescription( + const buzz::XmlElement* element) { + FileShareSession::FileShareDescription* share_desc = + new FileShareSession::FileShareDescription; + + if (element->Name() != QN_SHARE_DESCRIPTION) + return share_desc; + + const buzz::XmlElement* manifest = element->FirstNamed(QN_SHARE_MANIFEST); + const buzz::XmlElement* protocol = element->FirstNamed(QN_SHARE_PROTOCOL); + + if (!manifest || !protocol) + return share_desc; + + for (const buzz::XmlElement* item = manifest->FirstElement(); + item != NULL; item = item->NextElement()) { + bool is_folder; + if (item->Name() == QN_SHARE_FOLDER) { + is_folder = true; + } else if (item->Name() == QN_SHARE_FILE) { + is_folder = false; + } else { + continue; + } + std::string name; + if (const buzz::XmlElement* el_name = item->FirstNamed(QN_SHARE_NAME)) { + name = el_name->BodyText(); + } + if (name.empty()) { + continue; + } + size_t size = FileShareManifest::SIZE_UNKNOWN; + if (item->HasAttr(QN_SIZE)) { + size = strtoul(item->Attr(QN_SIZE).c_str(), NULL, 10); + } + if (is_folder) { + share_desc->manifest.AddFolder(name, size); + } else { + // Check if there is a valid image description for this file. + if (const buzz::XmlElement* image = item->FirstNamed(QN_SHARE_IMAGE)) { + if (image->HasAttr(QN_WIDTH) && image->HasAttr(QN_HEIGHT)) { + size_t width = strtoul(image->Attr(QN_WIDTH).c_str(), NULL, 10); + size_t height = strtoul(image->Attr(QN_HEIGHT).c_str(), NULL, 10); + if (AllowedImageDimensions(width, height)) { + share_desc->manifest.AddImage(name, size, width, height); + continue; + } + } + } + share_desc->manifest.AddFile(name, size); + } + } + + if (const buzz::XmlElement* http = protocol->FirstNamed(QN_SHARE_HTTP)) { + share_desc->supports_http = true; + for (const buzz::XmlElement* url = http->FirstNamed(QN_SHARE_URL); + url != NULL; url = url->NextNamed(QN_SHARE_URL)) { + if (url->Attr(buzz::QN_NAME) == kHttpSourcePath) { + share_desc->source_path = url->BodyText(); + } else if (url->Attr(buzz::QN_NAME) == kHttpPreviewPath) { + share_desc->preview_path = url->BodyText(); + } + } + } + + return share_desc; +} + +buzz::XmlElement* FileShareSessionClient::TranslateSessionDescription( + const cricket::SessionDescription* description) { + + const FileShareSession::FileShareDescription* share_desc = + static_cast<const FileShareSession::FileShareDescription*>(description); + + scoped_ptr<buzz::XmlElement> el(new buzz::XmlElement(QN_SHARE_DESCRIPTION, + true)); + + const FileShareManifest& manifest = share_desc->manifest; + el->AddElement(new buzz::XmlElement(QN_SHARE_MANIFEST)); + for (size_t i=0; i<manifest.size(); ++i) { + const FileShareManifest::Item& item = manifest.item(i); + buzz::QName qname; + if (item.type == FileShareManifest::T_FOLDER) { + qname = QN_SHARE_FOLDER; + } else if ((item.type == FileShareManifest::T_FILE) + || (item.type == FileShareManifest::T_IMAGE)) { + qname = QN_SHARE_FILE; + } else { + ASSERT(false); + continue; + } + el->AddElement(new buzz::XmlElement(qname), 1); + if (item.size != FileShareManifest::SIZE_UNKNOWN) { + char buffer[256]; + talk_base::sprintfn(buffer, sizeof(buffer), "%lu", item.size); + el->AddAttr(QN_SIZE, buffer, 2); + } + buzz::XmlElement* el_name = new buzz::XmlElement(QN_SHARE_NAME); + el_name->SetBodyText(item.name); + el->AddElement(el_name, 2); + if ((item.type == FileShareManifest::T_IMAGE) + && AllowedImageDimensions(item.width, item.height)) { + el->AddElement(new buzz::XmlElement(QN_SHARE_IMAGE), 2); + char buffer[256]; + talk_base::sprintfn(buffer, sizeof(buffer), "%lu", item.width); + el->AddAttr(QN_WIDTH, buffer, 3); + talk_base::sprintfn(buffer, sizeof(buffer), "%lu", item.height); + el->AddAttr(QN_HEIGHT, buffer, 3); + } + } + + el->AddElement(new buzz::XmlElement(QN_SHARE_PROTOCOL)); + if (share_desc->supports_http) { + el->AddElement(new buzz::XmlElement(QN_SHARE_HTTP), 1); + if (!share_desc->source_path.empty()) { + buzz::XmlElement* url = new buzz::XmlElement(QN_SHARE_URL); + url->SetAttr(buzz::QN_NAME, kHttpSourcePath); + url->SetBodyText(share_desc->source_path); + el->AddElement(url, 2); + } + if (!share_desc->preview_path.empty()) { + buzz::XmlElement* url = new buzz::XmlElement(QN_SHARE_URL); + url->SetAttr(buzz::QN_NAME, kHttpPreviewPath); + url->SetBodyText(share_desc->preview_path); + el->AddElement(url, 2); + } + } + + return el.release(); +} + +FileShareSession *FileShareSessionClient::CreateFileShareSession() { + cricket::Session* session = sm_->CreateSession(jid_.Str(), + NS_GOOGLE_SHARE); + FileShareSession* share = new FileShareSession(session, user_agent_); + SignalFileShareSessionCreate(share); + return share; +} + + +} // namespace cricket |