diff options
Diffstat (limited to 'Plugins/jingle/libjingle/talk/p2p/client')
6 files changed, 1371 insertions, 0 deletions
diff --git a/Plugins/jingle/libjingle/talk/p2p/client/basicportallocator.cc b/Plugins/jingle/libjingle/talk/p2p/client/basicportallocator.cc new file mode 100644 index 0000000..b7960f4 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/p2p/client/basicportallocator.cc @@ -0,0 +1,690 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif + +#include "talk/base/common.h" +#include "talk/base/helpers.h" +#include "talk/base/host.h" +#include "talk/base/logging.h" +#include "talk/p2p/client/basicportallocator.h" +#include "talk/p2p/base/common.h" +#include "talk/p2p/base/port.h" +#include "talk/p2p/base/relayport.h" +#include "talk/p2p/base/stunport.h" +#include "talk/p2p/base/tcpport.h" +#include "talk/p2p/base/udpport.h" + +namespace { + +const uint32 MSG_CONFIG_START = 1; +const uint32 MSG_CONFIG_READY = 2; +const uint32 MSG_ALLOCATE = 3; +const uint32 MSG_ALLOCATION_PHASE = 4; +const uint32 MSG_SHAKE = 5; + +const uint32 ALLOCATE_DELAY = 250; +const uint32 ALLOCATION_STEP_DELAY = 1 * 1000; + +const int PHASE_UDP = 0; +const int PHASE_RELAY = 1; +const int PHASE_TCP = 2; +const int PHASE_SSLTCP = 3; +const int kNumPhases = 4; + +const float PREF_LOCAL_UDP = 1.0f; +const float PREF_LOCAL_STUN = 0.9f; +const float PREF_LOCAL_TCP = 0.8f; +const float PREF_RELAY = 0.5f; + +const float RELAY_PRIMARY_PREF_MODIFIER = 0.0f; // modifiers of the above constants +const float RELAY_BACKUP_PREF_MODIFIER = -0.2f; + + +// Returns the phase in which a given local candidate (or rather, the port that +// gave rise to that local candidate) would have been created. +int LocalCandidateToPhase(const cricket::Candidate& candidate) { + cricket::ProtocolType proto; + bool result = cricket::StringToProto(candidate.protocol().c_str(), proto); + if (result) { + if (candidate.type() == cricket::LOCAL_PORT_TYPE) { + switch (proto) { + case cricket::PROTO_UDP: return PHASE_UDP; + case cricket::PROTO_TCP: return PHASE_TCP; + default: assert(false); + } + } else if (candidate.type() == cricket::STUN_PORT_TYPE) { + return PHASE_UDP; + } else if (candidate.type() == cricket::RELAY_PORT_TYPE) { + switch (proto) { + case cricket::PROTO_UDP: return PHASE_RELAY; + case cricket::PROTO_TCP: return PHASE_TCP; + case cricket::PROTO_SSLTCP: return PHASE_SSLTCP; + default: assert(false); + } + } else { + assert(false); + } + } else { + assert(false); + } + return PHASE_UDP; // reached only with assert failure +} + +const int SHAKE_MIN_DELAY = 45 * 1000; // 45 seconds +const int SHAKE_MAX_DELAY = 90 * 1000; // 90 seconds + +int ShakeDelay() { + int range = SHAKE_MAX_DELAY - SHAKE_MIN_DELAY + 1; + return SHAKE_MIN_DELAY + cricket::CreateRandomId() % range; +} + +} + +namespace cricket { + +// Performs the allocation of ports, in a sequenced (timed) manner, for a given +// network and IP address. +class AllocationSequence : public talk_base::MessageHandler { +public: + AllocationSequence(BasicPortAllocatorSession* session, + talk_base::Network* network, + PortConfiguration* config); + ~AllocationSequence(); + + // Determines whether this sequence is operating on an equivalent network + // setup to the one given. + bool IsEquivalent(talk_base::Network* network); + + // Starts and stops the sequence. When started, it will continue allocating + // new ports on its own timed schedule. + void Start(); + void Stop(); + + // MessageHandler: + void OnMessage(talk_base::Message* msg); + + void EnableProtocol(ProtocolType proto); + bool ProtocolEnabled(ProtocolType proto) const; + +private: + BasicPortAllocatorSession* session_; + talk_base::Network* network_; + uint32 ip_; + PortConfiguration* config_; + bool running_; + int step_; + int step_of_phase_[kNumPhases]; + + typedef std::vector<ProtocolType> ProtocolList; + ProtocolList protocols_; + + void CreateUDPPorts(); + void CreateTCPPorts(); + void CreateStunPorts(); + void CreateRelayPorts(); +}; + + +// BasicPortAllocator + +BasicPortAllocator::BasicPortAllocator( + talk_base::NetworkManager* network_manager) + : network_manager_(network_manager), best_writable_phase_(-1), + stun_address_(NULL), relay_address_(NULL) { +} + +BasicPortAllocator::BasicPortAllocator( + talk_base::NetworkManager* network_manager, + talk_base::SocketAddress* stun_address, + talk_base::SocketAddress *relay_address) + : network_manager_(network_manager), best_writable_phase_(-1), + stun_address_(stun_address), relay_address_(relay_address) { +} + +BasicPortAllocator::~BasicPortAllocator() { +} + +int BasicPortAllocator::best_writable_phase() const { + // If we are configured with an HTTP proxy, the best bet is to use the relay + if ((best_writable_phase_ == -1) + && ((proxy().type == talk_base::PROXY_HTTPS) + || (proxy().type == talk_base::PROXY_UNKNOWN))) { + return PHASE_RELAY; + } + return best_writable_phase_; +} + +PortAllocatorSession *BasicPortAllocator::CreateSession( + const std::string &name, const std::string &session_type) { + return new BasicPortAllocatorSession(this, name, session_type, stun_address_, + relay_address_); +} + +void BasicPortAllocator::AddWritablePhase(int phase) { + if ((best_writable_phase_ == -1) || (phase < best_writable_phase_)) + best_writable_phase_ = phase; +} + +// BasicPortAllocatorSession + +BasicPortAllocatorSession::BasicPortAllocatorSession( + BasicPortAllocator *allocator, + const std::string &name, + const std::string &session_type) + : PortAllocatorSession(allocator->flags()), allocator_(allocator), + name_(name), network_thread_(NULL), session_type_(session_type), + allocation_started_(false), running_(false), stun_address_(NULL), + relay_address_(NULL) { +} + +BasicPortAllocatorSession::BasicPortAllocatorSession( + BasicPortAllocator *allocator, + const std::string &name, + const std::string &session_type, + talk_base::SocketAddress *stun_address, + talk_base::SocketAddress *relay_address) + : PortAllocatorSession(allocator->flags()), allocator_(allocator), + name_(name), session_type_(session_type), network_thread_(NULL), + allocation_started_(false), running_(false), stun_address_(stun_address), + relay_address_(relay_address) { +} + +BasicPortAllocatorSession::~BasicPortAllocatorSession() { + if (network_thread_ != NULL) + network_thread_->Clear(this); + + std::vector<PortData>::iterator it; + for (it = ports_.begin(); it != ports_.end(); it++) + delete it->port; + + for (uint32 i = 0; i < configs_.size(); ++i) + delete configs_[i]; + + for (uint32 i = 0; i < sequences_.size(); ++i) + delete sequences_[i]; +} + +void BasicPortAllocatorSession::GetInitialPorts() { + network_thread_ = talk_base::Thread::Current(); + + network_thread_->Post(this, MSG_CONFIG_START); + + if (flags() & PORTALLOCATOR_ENABLE_SHAKER) + network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE); +} + +void BasicPortAllocatorSession::StartGetAllPorts() { + assert(talk_base::Thread::Current() == network_thread_); + running_ = true; + if (allocation_started_) + network_thread_->PostDelayed(ALLOCATE_DELAY, this, MSG_ALLOCATE); + for (uint32 i = 0; i < sequences_.size(); ++i) + sequences_[i]->Start(); + for (size_t i = 0; i < ports_.size(); ++i) + ports_[i].port->Start(); +} + +void BasicPortAllocatorSession::StopGetAllPorts() { + assert(talk_base::Thread::Current() == network_thread_); + running_ = false; + network_thread_->Clear(this, MSG_ALLOCATE); + for (uint32 i = 0; i < sequences_.size(); ++i) + sequences_[i]->Stop(); +} + +void BasicPortAllocatorSession::OnMessage(talk_base::Message *message) { + switch (message->message_id) { + case MSG_CONFIG_START: + assert(talk_base::Thread::Current() == network_thread_); + GetPortConfigurations(); + break; + + case MSG_CONFIG_READY: + assert(talk_base::Thread::Current() == network_thread_); + OnConfigReady(static_cast<PortConfiguration*>(message->pdata)); + break; + + case MSG_ALLOCATE: + assert(talk_base::Thread::Current() == network_thread_); + OnAllocate(); + break; + + case MSG_SHAKE: + assert(talk_base::Thread::Current() == network_thread_); + OnShake(); + break; + + default: + assert(false); + } +} + +void BasicPortAllocatorSession::GetPortConfigurations() { + PortConfiguration* config = NULL; + if (stun_address_ != NULL) + config = new PortConfiguration(*stun_address_, + CreateRandomString(16), + CreateRandomString(16), + ""); + PortConfiguration::PortList ports; + if (relay_address_ != NULL) { + ports.push_back(ProtocolAddress(*relay_address_, PROTO_UDP)); + config->AddRelay(ports, RELAY_PRIMARY_PREF_MODIFIER); + } + + ConfigReady(config); +} + +void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { + network_thread_->Post(this, MSG_CONFIG_READY, config); +} + +// Adds a configuration to the list. +void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) { + if (config) + configs_.push_back(config); + + AllocatePorts(); +} + +void BasicPortAllocatorSession::AllocatePorts() { + assert(talk_base::Thread::Current() == network_thread_); + + if (allocator_->proxy().type != talk_base::PROXY_NONE) + Port::set_proxy(allocator_->user_agent(), allocator_->proxy()); + + network_thread_->Post(this, MSG_ALLOCATE); +} + +// For each network, see if we have a sequence that covers it already. If not, +// create a new sequence to create the appropriate ports. +void BasicPortAllocatorSession::OnAllocate() { + std::vector<talk_base::Network*> networks; + allocator_->network_manager()->GetNetworks(networks); + + for (uint32 i = 0; i < networks.size(); ++i) { + if (HasEquivalentSequence(networks[i])) + continue; + + PortConfiguration* config = NULL; + if (configs_.size() > 0) + config = configs_.back(); + + AllocationSequence* sequence = + new AllocationSequence(this, networks[i], config); + if (running_) + sequence->Start(); + + sequences_.push_back(sequence); + } + + allocation_started_ = true; + if (running_) + network_thread_->PostDelayed(ALLOCATE_DELAY, this, MSG_ALLOCATE); +} + +bool BasicPortAllocatorSession::HasEquivalentSequence( + talk_base::Network* network) { + for (uint32 i = 0; i < sequences_.size(); ++i) + if (sequences_[i]->IsEquivalent(network)) + return true; + return false; +} + +void BasicPortAllocatorSession::AddAllocatedPort(Port* port, + AllocationSequence * seq, + float pref, + bool prepare_address) { + if (!port) + return; + + port->set_name(name_); + port->set_preference(pref); + port->set_generation(generation()); + PortData data; + data.port = port; + data.sequence = seq; + data.ready = false; + ports_.push_back(data); + port->SignalAddressReady.connect(this, &BasicPortAllocatorSession::OnAddressReady); + port->SignalConnectionCreated.connect(this, &BasicPortAllocatorSession::OnConnectionCreated); + port->SignalDestroyed.connect(this, &BasicPortAllocatorSession::OnPortDestroyed); + LOG_J(LS_INFO, port) << "Added port to allocator"; + if (prepare_address) + port->PrepareAddress(); + if (running_) + port->Start(); +} + +void BasicPortAllocatorSession::OnAddressReady(Port *port) { + assert(talk_base::Thread::Current() == network_thread_); + std::vector<PortData>::iterator it + = std::find(ports_.begin(), ports_.end(), port); + assert(it != ports_.end()); + if (it->ready) + return; + it->ready = true; + SignalPortReady(this, port); + + // Only accumulate the candidates whose protocol has been enabled + std::vector<Candidate> candidates; + const std::vector<Candidate>& potentials = port->candidates(); + for (size_t i=0; i<potentials.size(); ++i) { + ProtocolType pvalue; + if (!StringToProto(potentials[i].protocol().c_str(), pvalue)) + continue; + if (it->sequence->ProtocolEnabled(pvalue)) { + candidates.push_back(potentials[i]); + } + } + if (!candidates.empty()) { + SignalCandidatesReady(this, candidates); + } +} + +void BasicPortAllocatorSession::OnProtocolEnabled(AllocationSequence * seq, + ProtocolType proto) { + std::vector<Candidate> candidates; + for (std::vector<PortData>::iterator it = ports_.begin(); it != ports_.end(); ++it) { + if (!it->ready || (it->sequence != seq)) + continue; + + const std::vector<Candidate>& potentials = it->port->candidates(); + for (size_t i=0; i<potentials.size(); ++i) { + ProtocolType pvalue; + if (!StringToProto(potentials[i].protocol().c_str(), pvalue)) + continue; + if (pvalue == proto) { + candidates.push_back(potentials[i]); + } + } + } + if (!candidates.empty()) { + SignalCandidatesReady(this, candidates); + } +} + +void BasicPortAllocatorSession::OnPortDestroyed(Port* port) { + assert(talk_base::Thread::Current() == network_thread_); + std::vector<PortData>::iterator iter = + find(ports_.begin(), ports_.end(), port); + assert(iter != ports_.end()); + ports_.erase(iter); + + LOG_J(LS_INFO, port) << "Removed port from allocator (" + << static_cast<int>(ports_.size()) << " remaining)"; +} + +void BasicPortAllocatorSession::OnConnectionCreated(Port* port, + Connection* conn) { + conn->SignalStateChange.connect(this, + &BasicPortAllocatorSession::OnConnectionStateChange); +} + +void BasicPortAllocatorSession::OnConnectionStateChange(Connection* conn) { + if (conn->write_state() == Connection::STATE_WRITABLE) + allocator_->AddWritablePhase( + LocalCandidateToPhase(conn->local_candidate())); +} + +void BasicPortAllocatorSession::OnShake() { + LOG(INFO) << ">>>>> SHAKE <<<<< >>>>> SHAKE <<<<< >>>>> SHAKE <<<<<"; + + std::vector<Port*> ports; + std::vector<Connection*> connections; + + for (size_t i = 0; i < ports_.size(); ++i) { + if (ports_[i].ready) + ports.push_back(ports_[i].port); + } + + for (size_t i = 0; i < ports.size(); ++i) { + Port::AddressMap::const_iterator iter; + for (iter = ports[i]->connections().begin(); + iter != ports[i]->connections().end(); + ++iter) { + connections.push_back(iter->second); + } + } + + LOG(INFO) << ">>>>> Destroying " << (int)ports.size() << " ports and " + << (int)connections.size() << " connections"; + + for (size_t i = 0; i < connections.size(); ++i) + connections[i]->Destroy(); + + if (running_ || (ports.size() > 0) || (connections.size() > 0)) + network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE); +} + +// AllocationSequence + +AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session, + talk_base::Network* network, + PortConfiguration* config) + : session_(session), network_(network), ip_(network->ip()), config_(config), + running_(false), step_(0) { + + // All of the phases up until the best-writable phase so far run in step 0. + // The other phases follow sequentially in the steps after that. If there is + // no best-writable so far, then only phase 0 occurs in step 0. + int last_phase_in_step_zero = + talk_base::_max(0, session->allocator()->best_writable_phase()); + for (int phase = 0; phase < kNumPhases; ++phase) + step_of_phase_[phase] = talk_base::_max(0, phase - last_phase_in_step_zero); + + // Immediately perform phase 0. + OnMessage(NULL); +} + +AllocationSequence::~AllocationSequence() { + session_->network_thread()->Clear(this); +} + +bool AllocationSequence::IsEquivalent(talk_base::Network* network) { + return (network == network_) && (ip_ == network->ip()); +} + +void AllocationSequence::Start() { + running_ = true; + session_->network_thread()->PostDelayed(ALLOCATION_STEP_DELAY, + this, + MSG_ALLOCATION_PHASE); +} + +void AllocationSequence::Stop() { + running_ = false; + session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); +} + +void AllocationSequence::OnMessage(talk_base::Message* msg) { + assert(talk_base::Thread::Current() == session_->network_thread()); + if (msg) + assert(msg->message_id == MSG_ALLOCATION_PHASE); + + const char* const PHASE_NAMES[kNumPhases] = { + "Udp", "Relay", "Tcp", "SslTcp" + }; + + // Perform all of the phases in the current step. + for (int phase = 0; phase < kNumPhases; phase++) { + if (step_of_phase_[phase] != step_) + continue; + + LOG_J(LS_INFO, network_) << "Allocation Phase=" << PHASE_NAMES[phase] + << " (Step=" << step_ << ")"; + + switch (phase) { + case PHASE_UDP: + CreateUDPPorts(); + CreateStunPorts(); + EnableProtocol(PROTO_UDP); + break; + + case PHASE_RELAY: + CreateRelayPorts(); + break; + + case PHASE_TCP: + CreateTCPPorts(); + EnableProtocol(PROTO_TCP); + break; + + case PHASE_SSLTCP: + EnableProtocol(PROTO_SSLTCP); + break; + + default: + ASSERT(false); + } + } + + // TODO: use different delays for each stage + step_ += 1; + if (running_) { + session_->network_thread()->PostDelayed(ALLOCATION_STEP_DELAY, + this, + MSG_ALLOCATION_PHASE); + } +} + +void AllocationSequence::EnableProtocol(ProtocolType proto) { + if (!ProtocolEnabled(proto)) { + protocols_.push_back(proto); + session_->OnProtocolEnabled(this, proto); + } +} + +bool AllocationSequence::ProtocolEnabled(ProtocolType proto) const { + for (ProtocolList::const_iterator it = protocols_.begin(); it != protocols_.end(); ++it) { + if (*it == proto) + return true; + } + return false; +} + +void AllocationSequence::CreateUDPPorts() { + if (session_->flags() & PORTALLOCATOR_DISABLE_UDP) + return; + + Port* port = new UDPPort(session_->network_thread(), NULL, network_, + talk_base::SocketAddress(ip_, 0)); + session_->AddAllocatedPort(port, this, PREF_LOCAL_UDP); +} + +void AllocationSequence::CreateTCPPorts() { + if (session_->flags() & PORTALLOCATOR_DISABLE_TCP) + return; + + Port* port = new TCPPort(session_->network_thread(), NULL, network_, + talk_base::SocketAddress(ip_, 0)); + session_->AddAllocatedPort(port, this, PREF_LOCAL_TCP); +} + +void AllocationSequence::CreateStunPorts() { + if (session_->flags() & PORTALLOCATOR_DISABLE_STUN) + return; + + if (!config_ || config_->stun_address.IsAny()) + return; + + Port* port = new StunPort(session_->network_thread(), NULL, network_, + talk_base::SocketAddress(ip_, 0), + config_->stun_address); + session_->AddAllocatedPort(port, this, PREF_LOCAL_STUN); +} + +void AllocationSequence::CreateRelayPorts() { + if (session_->flags() & PORTALLOCATOR_DISABLE_RELAY) + return; + + if (!config_) + return; + + PortConfiguration::RelayList::const_iterator relay; + for (relay = config_->relays.begin(); + relay != config_->relays.end(); + ++relay) { + + RelayPort *port = new RelayPort(session_->network_thread(), NULL, network_, + talk_base::SocketAddress(ip_, 0), + config_->username, config_->password, + config_->magic_cookie); + // Note: We must add the allocated port before we add addresses because + // the latter will create candidates that need name and preference + // settings. However, we also can't prepare the address (normally + // done by AddAllocatedPort) until we have these addresses. So we + // wait to do that until below. + session_->AddAllocatedPort(port, this, PREF_RELAY + relay->pref_modifier, + false); + + // Add the addresses of this protocol. + PortConfiguration::PortList::const_iterator relay_port; + for (relay_port = relay->ports.begin(); + relay_port != relay->ports.end(); + ++relay_port) { + port->AddServerAddress(*relay_port); + port->AddExternalAddress(*relay_port); + } + + // Start fetching an address for this port. + port->PrepareAddress(); + } +} + +// PortConfiguration + +PortConfiguration::PortConfiguration(const talk_base::SocketAddress& sa, + const std::string& un, + const std::string& pw, + const std::string& mc) + : stun_address(sa), username(un), password(pw), magic_cookie(mc) { +} + +void PortConfiguration::AddRelay(const PortList& ports, float pref_modifier) { + RelayServer relay; + relay.ports = ports; + relay.pref_modifier = pref_modifier; + relays.push_back(relay); +} + +bool PortConfiguration::SupportsProtocol( + const PortConfiguration::RelayServer& relay, ProtocolType type) { + PortConfiguration::PortList::const_iterator relay_port; + for (relay_port = relay.ports.begin(); + relay_port != relay.ports.end(); + ++relay_port) { + if (relay_port->proto == type) + return true; + } + return false; +} + +} // namespace cricket diff --git a/Plugins/jingle/libjingle/talk/p2p/client/basicportallocator.h b/Plugins/jingle/libjingle/talk/p2p/client/basicportallocator.h new file mode 100644 index 0000000..0e3d313 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/p2p/client/basicportallocator.h @@ -0,0 +1,175 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _BASICPORTALLOCATOR_H_ +#define _BASICPORTALLOCATOR_H_ + +#include "talk/base/thread.h" +#include "talk/base/messagequeue.h" +#include "talk/base/network.h" +#include "talk/p2p/base/portallocator.h" +#include <string> +#include <vector> + +namespace cricket { + +class BasicPortAllocator : public PortAllocator { +public: + BasicPortAllocator(talk_base::NetworkManager* network_manager); + BasicPortAllocator(talk_base::NetworkManager* network_manager, + talk_base::SocketAddress *stun_server, talk_base::SocketAddress *relay_server); + virtual ~BasicPortAllocator(); + + talk_base::NetworkManager* network_manager() { return network_manager_; } + + // Returns the best (highest preference) phase that has produced a port that + // produced a writable connection. If no writable connections have been + // produced, this returns -1. + int best_writable_phase() const; + + virtual PortAllocatorSession *CreateSession(const std::string &name, const std::string &session_type); + + // Called whenever a connection becomes writable with the argument being the + // phase that the corresponding port was created in. + void AddWritablePhase(int phase); + +private: + talk_base::NetworkManager* network_manager_; + talk_base::SocketAddress* stun_address_; + talk_base::SocketAddress* relay_address_; + int best_writable_phase_; +}; + +struct PortConfiguration; +class AllocationSequence; + +class BasicPortAllocatorSession: public PortAllocatorSession, + public talk_base::MessageHandler { +public: + BasicPortAllocatorSession(BasicPortAllocator *allocator, + const std::string &name, + const std::string &session_type); + BasicPortAllocatorSession(BasicPortAllocator *allocator, + const std::string &name, + const std::string &session_type, + talk_base::SocketAddress *stun_address, + talk_base::SocketAddress *relay_address); + ~BasicPortAllocatorSession(); + + BasicPortAllocator* allocator() { return allocator_; } + const std::string& name() const { return name_; } + const std::string& session_type() const { return session_type_; } + talk_base::Thread* network_thread() { return network_thread_; } + + virtual void GetInitialPorts(); + virtual void StartGetAllPorts(); + virtual void StopGetAllPorts(); + virtual bool IsGettingAllPorts() { return running_; } + +protected: + // Starts the process of getting the port configurations. + virtual void GetPortConfigurations(); + + // Adds a port configuration that is now ready. Once we have one for each + // network (or a timeout occurs), we will start allocating ports. + void ConfigReady(PortConfiguration* config); + + // MessageHandler. Can be overriden if message IDs do not conflict. + virtual void OnMessage(talk_base::Message *message); + +private: + void OnConfigReady(PortConfiguration* config); + void OnConfigTimeout(); + void AllocatePorts(); + void OnAllocate(); + bool HasEquivalentSequence(talk_base::Network* network); + void AddAllocatedPort(Port* port, AllocationSequence * seq, float pref, + bool prepare_address = true); + void OnAddressReady(Port *port); + void OnProtocolEnabled(AllocationSequence * seq, ProtocolType proto); + void OnPortDestroyed(Port* port); + void OnConnectionCreated(Port* port, Connection* conn); + void OnConnectionStateChange(Connection* conn); + void OnShake(); + + BasicPortAllocator *allocator_; + std::string name_; + std::string session_type_; + talk_base::Thread* network_thread_; + bool configuration_done_; + bool allocation_started_; + bool running_; // set when StartGetAllPorts is called + std::vector<PortConfiguration*> configs_; + std::vector<AllocationSequence*> sequences_; + talk_base::SocketAddress *stun_address_; + talk_base::SocketAddress *relay_address_; + + struct PortData { + Port * port; + AllocationSequence * sequence; + bool ready; + + bool operator==(Port * rhs) const { return (port == rhs); } + }; + std::vector<PortData> ports_; + + friend class AllocationSequence; +}; + +// Records configuration information useful in creating ports. +struct PortConfiguration : public talk_base::MessageData { + talk_base::SocketAddress stun_address; + std::string username; + std::string password; + std::string magic_cookie; + + typedef std::vector<ProtocolAddress> PortList; + struct RelayServer { + PortList ports; + float pref_modifier; // added to the protocol modifier to get the + // preference for this particular server + }; + + typedef std::vector<RelayServer> RelayList; + RelayList relays; + + PortConfiguration(const talk_base::SocketAddress& stun_address, + const std::string& username, + const std::string& password, + const std::string& magic_cookie); + + // Adds another relay server, with the given ports and modifier, to the list. + void AddRelay(const PortList& ports, float pref_modifier); + + // Determines whether the given relay server supports the given protocol. + static bool SupportsProtocol(const PortConfiguration::RelayServer& relay, + ProtocolType type); +}; + +} // namespace cricket + +#endif // _BASICPORTALLOCATOR_H_ diff --git a/Plugins/jingle/libjingle/talk/p2p/client/httpportallocator.cc b/Plugins/jingle/libjingle/talk/p2p/client/httpportallocator.cc new file mode 100644 index 0000000..9837478 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/p2p/client/httpportallocator.cc @@ -0,0 +1,190 @@ +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/base/asynchttprequest.h" +#include "talk/base/basicdefs.h" +#include "talk/base/common.h" +#include "talk/base/helpers.h" +#include "talk/base/logging.h" +#include "talk/base/signalthread.h" +#include "talk/p2p/client/httpportallocator.h" +#include <cassert> +#include <ctime> + +namespace { + +// Records the port on the hosts that will receive HTTP requests. +const uint16 kHostPort = 80; + +// Records the URL that we will GET in order to create a session. +const std::string kCreateSessionURL = "/create_session"; + +// The number of HTTP requests we should attempt before giving up. +const size_t kNumRetries = 5; + +// The delay before we give up on an HTTP request; +const int TIMEOUT = 5 * 1000; // 5 seconds + +const uint32 MSG_TIMEOUT = 100; // must not conflict with BasicPortAllocator.cpp + +// Helper routine to remove whitespace from the ends of a string. +void Trim(std::string& str) { + size_t first = str.find_first_not_of(" \t\r\n"); + if (first == std::string::npos) { + str.clear(); + return; + } + + size_t last = str.find_last_not_of(" \t\r\n"); + ASSERT(last != std::string::npos); +} + +// Parses the lines in the result of the HTTP request that are of the form +// 'a=b' and returns them in a map. +typedef std::map<std::string,std::string> StringMap; +void ParseMap(const std::string& string, StringMap& map) { + size_t start_of_line = 0; + size_t end_of_line = 0; + + for (;;) { // for each line + start_of_line = string.find_first_not_of("\r\n", end_of_line); + if (start_of_line == std::string::npos) + break; + + end_of_line = string.find_first_of("\r\n", start_of_line); + if (end_of_line == std::string::npos) { + end_of_line = string.length(); + } + + size_t equals = string.find('=', start_of_line); + if ((equals >= end_of_line) || (equals == std::string::npos)) + continue; + + std::string key(string, start_of_line, equals - start_of_line); + std::string value(string, equals + 1, end_of_line - equals - 1); + + Trim(key); + Trim(value); + + if ((key.size() > 0) && (value.size() > 0)) + map[key] = value; + } +} + +} + +namespace cricket { + +// HttpPortAllocator + +HttpPortAllocator::HttpPortAllocator(talk_base::NetworkManager* network_manager, const std::string &user_agent) + : BasicPortAllocator(network_manager), agent_(user_agent) { + relay_hosts_.push_back("relay.l.google.com"); + stun_hosts_.push_back(talk_base::SocketAddress("stun.l.google.com",19302)); +} + +HttpPortAllocator::~HttpPortAllocator() { +} + +PortAllocatorSession *HttpPortAllocator::CreateSession(const std::string &name, const std::string &session_type) { + return new HttpPortAllocatorSession(this, name, session_type, stun_hosts_, relay_hosts_, relay_token_, agent_); +} + +// HttpPortAllocatorSession + +HttpPortAllocatorSession::HttpPortAllocatorSession(HttpPortAllocator* allocator, const std::string &name, + const std::string &session_type, + const std::vector<talk_base::SocketAddress> &stun_hosts, + const std::vector<std::string> &relay_hosts, + const std::string &relay_token, + const std::string &user_agent) + : BasicPortAllocatorSession(allocator, name, session_type), + attempts_(0), relay_hosts_(relay_hosts), stun_hosts_(stun_hosts), relay_token_(relay_token), agent_(user_agent) { +} + +void HttpPortAllocatorSession::GetPortConfigurations() { + + if (attempts_ == kNumRetries) { + LOG(WARNING) << "HttpPortAllocator: maximum number of requests reached"; + return; + } + + if (relay_hosts_.size() <= 0) { + LOG(WARNING) << "HttpPortAllocator: no relay hosts found"; + return; + } + + // Choose the next host to try. + std::string host = relay_hosts_[attempts_ % relay_hosts_.size()]; + attempts_++; + LOG(INFO) << "HTTPPortAllocator: sending to host " << host; + + // Initiate an HTTP request to create a session through the chosen host. + + talk_base::AsyncHttpRequest* request = new talk_base::AsyncHttpRequest(agent_); + request->SignalWorkDone.connect(this, &HttpPortAllocatorSession::OnRequestDone); + + request->set_proxy(allocator()->proxy()); + request->response().document.reset(new talk_base::MemoryStream); + request->request().verb = talk_base::HV_GET; + request->request().path = kCreateSessionURL; + request->request().addHeader("X-Talk-Google-Relay-Auth", relay_token_, true); + request->request().addHeader("X-Google-Relay-Auth", relay_token_, true); + request->request().addHeader("X-Session-Type", session_type(), true); + request->set_host(host); + request->set_port(kHostPort); + request->Start(); + request->Release(); +} + +void HttpPortAllocatorSession::OnRequestDone(talk_base::SignalThread* data) { + talk_base::AsyncHttpRequest *request = + static_cast<talk_base::AsyncHttpRequest*> (data); + if (request->response().scode != 200) { + LOG(WARNING) << "HTTPPortAllocator: request " + << " received error " << request->response().scode; + GetPortConfigurations(); + return; + } + LOG(INFO) << "HTTPPortAllocator: request succeeded"; + + StringMap map; + talk_base::MemoryStream *stream = static_cast<talk_base::MemoryStream*>(request->response().document.get()); + stream->Rewind(); + size_t length; + stream->GetSize(&length); + std::string resp = std::string(stream->GetBuffer(), length); + ParseMap(resp, map); + + std::string username = map["username"]; + std::string password = map["password"]; + std::string magic_cookie = map["magic_cookie"]; + + std::string relay_ip = map["relay.ip"]; + std::string relay_udp_port = map["relay.udp_port"]; + std::string relay_tcp_port = map["relay.tcp_port"]; + std::string relay_ssltcp_port = map["relay.ssltcp_port"]; + + PortConfiguration* config = new PortConfiguration(stun_hosts_[0], + username, + password, + magic_cookie); + + PortConfiguration::PortList ports; + if (!relay_udp_port.empty()) { + talk_base::SocketAddress address(relay_ip, atoi(relay_udp_port.c_str())); + ports.push_back(ProtocolAddress(address, PROTO_UDP)); + } + if (!relay_tcp_port.empty()) { + talk_base::SocketAddress address(relay_ip, atoi(relay_tcp_port.c_str())); + ports.push_back(ProtocolAddress(address, PROTO_TCP)); + } + if (!relay_ssltcp_port.empty()) { + talk_base::SocketAddress address(relay_ip, atoi(relay_ssltcp_port.c_str())); + ports.push_back(ProtocolAddress(address, PROTO_SSLTCP)); + } + config->AddRelay(ports, 0.0f); + ConfigReady(config); +} + +} // namespace cricket diff --git a/Plugins/jingle/libjingle/talk/p2p/client/httpportallocator.h b/Plugins/jingle/libjingle/talk/p2p/client/httpportallocator.h new file mode 100644 index 0000000..96f3e72 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/p2p/client/httpportallocator.h @@ -0,0 +1,61 @@ +#ifndef _HTTPPORTALLOCATOR_H_ +#define _HTTPPORTALLOCATOR_H_ + +#include "talk/p2p/client/basicportallocator.h" + +namespace talk_base { + class SignalThread; +} + +namespace cricket { + +class HttpPortAllocator : public BasicPortAllocator { +public: + HttpPortAllocator(talk_base::NetworkManager* network_manager, const std::string &user_agent); + virtual ~HttpPortAllocator(); + + virtual PortAllocatorSession *CreateSession(const std::string &name, + const std::string &session_type); + void SetStunHosts(const std::vector<talk_base::SocketAddress> &hosts) {stun_hosts_ = hosts;} + void SetRelayHosts(const std::vector<std::string> &hosts) {relay_hosts_ = hosts;} + void SetRelayToken(const std::string &relay) {relay_token_ = relay;} + std::string relay_token() const { return relay_token_; } +private: + std::vector<talk_base::SocketAddress> stun_hosts_; + std::vector<std::string> relay_hosts_; + std::string relay_token_; + std::string agent_; +}; + +class RequestData; + +class HttpPortAllocatorSession : public BasicPortAllocatorSession { + public: + HttpPortAllocatorSession(HttpPortAllocator *allocator, + const std::string &name, + const std::string &session_type, + const std::vector<talk_base::SocketAddress> &stun_hosts, + const std::vector<std::string> &relay_hosts, + const std::string &relay, + const std::string &agent); + ~HttpPortAllocatorSession() {}; + +protected: + virtual void GetPortConfigurations(); + +private: + std::vector<std::string> relay_hosts_; + std::vector<talk_base::SocketAddress> stun_hosts_; + std::string relay_token_; + std::string agent_; + + void OnRequestDone(talk_base::SignalThread* request); + HttpPortAllocator* http_allocator() { + return static_cast<HttpPortAllocator*>(allocator()); + } + int attempts_; +}; + +} // namespace cricket + +#endif // _XMPPPORTALLOCATOR_H_ diff --git a/Plugins/jingle/libjingle/talk/p2p/client/socketmonitor.cc b/Plugins/jingle/libjingle/talk/p2p/client/socketmonitor.cc new file mode 100644 index 0000000..88d37ce --- /dev/null +++ b/Plugins/jingle/libjingle/talk/p2p/client/socketmonitor.cc @@ -0,0 +1,164 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "talk/p2p/client/socketmonitor.h" +#include "talk/base/common.h" + +namespace cricket { + +const uint32 MSG_MONITOR_POLL = 1; +const uint32 MSG_MONITOR_START = 2; +const uint32 MSG_MONITOR_STOP = 3; +const uint32 MSG_MONITOR_SIGNAL = 4; + +SocketMonitor::SocketMonitor(Session* session, + TransportChannel* channel, + talk_base::Thread *monitor_thread) { + session_ = session; + channel_ = channel; + channel_thread_ = session->session_manager()->worker_thread(); + monitoring_thread_ = monitor_thread; + monitoring_ = false; +} + +SocketMonitor::~SocketMonitor() { + channel_thread_->Clear(this); + monitoring_thread_->Clear(this); +} + +void SocketMonitor::Start(int milliseconds) { + rate_ = milliseconds; + if (rate_ < 250) + rate_ = 250; + channel_thread_->Post(this, MSG_MONITOR_START); +} + +void SocketMonitor::Stop() { + channel_thread_->Post(this, MSG_MONITOR_STOP); +} + +void SocketMonitor::OnMessage(talk_base::Message *message) { + talk_base::CritScope cs(&crit_); + + switch (message->message_id) { + case MSG_MONITOR_START: + ASSERT(talk_base::Thread::Current() == channel_thread_); + if (!monitoring_) { + monitoring_ = true; + if (GetP2PChannel() != NULL) { + GetP2PChannel()->SignalConnectionMonitor.connect( + this, &SocketMonitor::OnConnectionMonitor); + } + PollSocket(true); + } + break; + + case MSG_MONITOR_STOP: + ASSERT(talk_base::Thread::Current() == channel_thread_); + if (monitoring_) { + monitoring_ = false; + if (GetP2PChannel() != NULL) + GetP2PChannel()->SignalConnectionMonitor.disconnect(this); + channel_thread_->Clear(this); + } + break; + + case MSG_MONITOR_POLL: + ASSERT(talk_base::Thread::Current() == channel_thread_); + PollSocket(true); + break; + + case MSG_MONITOR_SIGNAL: + { + ASSERT(talk_base::Thread::Current() == monitoring_thread_); + std::vector<ConnectionInfo> infos = connection_infos_; + crit_.Leave(); + SignalUpdate(this, infos); + crit_.Enter(); + } + break; + } +} + +void SocketMonitor::OnConnectionMonitor(P2PTransportChannel* channel) { + talk_base::CritScope cs(&crit_); + if (monitoring_) + PollSocket(false); +} + +void SocketMonitor::PollSocket(bool poll) { + ASSERT(talk_base::Thread::Current() == channel_thread_); + talk_base::CritScope cs(&crit_); + + // Gather connection infos + + P2PTransportChannel* p2p_channel = GetP2PChannel(); + if (p2p_channel != NULL) { + connection_infos_.clear(); + const std::vector<Connection *> &connections = p2p_channel->connections(); + std::vector<Connection *>::const_iterator it; + for (it = connections.begin(); it != connections.end(); it++) { + Connection *connection = *it; + ConnectionInfo info; + info.best_connection = p2p_channel->best_connection() == connection; + info.readable = connection->read_state() == Connection::STATE_READABLE; + info.writable = connection->write_state() == Connection::STATE_WRITABLE; + info.timeout = connection->write_state() == Connection::STATE_WRITE_TIMEOUT; + info.new_connection = !connection->reported(); + connection->set_reported(true); + info.rtt = connection->rtt(); + info.sent_total_bytes = connection->sent_total_bytes(); + info.sent_bytes_second = connection->sent_bytes_second(); + info.recv_total_bytes = connection->recv_total_bytes(); + info.recv_bytes_second = connection->recv_bytes_second(); + info.local_candidate = connection->local_candidate(); + info.remote_candidate = connection->remote_candidate(); + info.est_quality = connection->port()->network()->quality(); + info.key = reinterpret_cast<void *>(connection); + connection_infos_.push_back(info); + } + } + + // Signal the monitoring thread, start another poll timer + + monitoring_thread_->Post(this, MSG_MONITOR_SIGNAL); + if (poll) + channel_thread_->PostDelayed(rate_, this, MSG_MONITOR_POLL); +} + +P2PTransportChannel* SocketMonitor::GetP2PChannel() { + if (session_->transport() == NULL) + return NULL; + if (session_->transport()->name() != kNsP2pTransport) + return NULL; + TransportChannelImpl* impl = session_->GetImplementation(channel_); + if (impl == NULL) + return NULL; + return static_cast<P2PTransportChannel*>(impl); +} + +} diff --git a/Plugins/jingle/libjingle/talk/p2p/client/socketmonitor.h b/Plugins/jingle/libjingle/talk/p2p/client/socketmonitor.h new file mode 100644 index 0000000..ced86b8 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/p2p/client/socketmonitor.h @@ -0,0 +1,91 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _SOCKETMONITOR_H_ +#define _SOCKETMONITOR_H_ + +#include "talk/base/thread.h" +#include "talk/base/sigslot.h" +#include "talk/base/criticalsection.h" +#include "talk/p2p/base/session.h" +#include "talk/p2p/base/p2ptransportchannel.h" +#include "talk/p2p/base/port.h" +#include <vector> + +namespace cricket { + +struct ConnectionInfo { + bool best_connection; + bool writable; + bool readable; + bool timeout; + bool new_connection; + size_t rtt; + size_t sent_total_bytes; + size_t sent_bytes_second; + size_t recv_total_bytes; + size_t recv_bytes_second; + Candidate local_candidate; + Candidate remote_candidate; + double est_quality; + void *key; +}; + +class SocketMonitor : public talk_base::MessageHandler, + public sigslot::has_slots<> { +public: + SocketMonitor(Session* session, TransportChannel* channel, + talk_base::Thread *monitor_thread); + ~SocketMonitor(); + + void Start(int cms); + void Stop(); + + talk_base::Thread *monitor_thread() { return monitoring_thread_; } + + sigslot::signal2<SocketMonitor *, + const std::vector<ConnectionInfo> &> SignalUpdate; + +protected: + void OnMessage(talk_base::Message *message); + void OnConnectionMonitor(P2PTransportChannel* channel); + void PollSocket(bool poll); + P2PTransportChannel* GetP2PChannel(); + + std::vector<ConnectionInfo> connection_infos_; + Session* session_; + TransportChannel* channel_; + talk_base::Thread* channel_thread_; + talk_base::Thread* monitoring_thread_; + talk_base::CriticalSection crit_; + uint32 rate_; + bool monitoring_; +}; + +} + +#endif // _SOCKETMONITOR_H_ |