diff options
Diffstat (limited to 'Plugins/jingle/libjingle/talk/base/socketpool.cc')
-rw-r--r-- | Plugins/jingle/libjingle/talk/base/socketpool.cc | 263 |
1 files changed, 263 insertions, 0 deletions
diff --git a/Plugins/jingle/libjingle/talk/base/socketpool.cc b/Plugins/jingle/libjingle/talk/base/socketpool.cc new file mode 100644 index 0000000..614ce89 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/base/socketpool.cc @@ -0,0 +1,263 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include <errno.h> + +#include "talk/base/asyncsocket.h" +#include "talk/base/logging.h" +#include "talk/base/socketfactory.h" +#include "talk/base/socketpool.h" +#include "talk/base/socketstream.h" + +namespace talk_base { + +/////////////////////////////////////////////////////////////////////////////// +// StreamCache - Caches a set of open streams, defers creation to a separate +// StreamPool. +/////////////////////////////////////////////////////////////////////////////// + +StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { +} + +StreamCache::~StreamCache() { + for (ConnectedList::iterator it = active_.begin(); it != active_.end(); + ++it) { + delete it->second; + } + for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); + ++it) { + delete it->second; + } +} + +StreamInterface* StreamCache::RequestConnectedStream( + const SocketAddress& remote, int* err) { + LOG_F(LS_VERBOSE) << "(" << remote.ToString() << ")"; + for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); + ++it) { + if (remote == it->first) { + it->second->SignalEvent.disconnect(this); + // Move from cached_ to active_ + active_.push_front(*it); + cached_.erase(it); + if (err) + *err = 0; + LOG_F(LS_VERBOSE) << "Providing cached stream"; + return active_.front().second; + } + } + if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { + // We track active streams so that we can remember their address + active_.push_front(ConnectedStream(remote, stream)); + LOG_F(LS_VERBOSE) << "Providing new stream"; + return active_.front().second; + } + return NULL; +} + +void StreamCache::ReturnConnectedStream(StreamInterface* stream) { + for (ConnectedList::iterator it = active_.begin(); it != active_.end(); + ++it) { + if (stream == it->second) { + LOG_F(LS_VERBOSE) << "(" << it->first.ToString() << ")"; + if (stream->GetState() == SS_CLOSED) { + // Return closed streams + LOG_F(LS_VERBOSE) << "Returning closed stream"; + pool_->ReturnConnectedStream(it->second); + } else { + // Monitor open streams + stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); + LOG_F(LS_VERBOSE) << "Caching stream"; + cached_.push_front(*it); + } + active_.erase(it); + return; + } + } + ASSERT(false); +} + +void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { + if ((events & SE_CLOSE) == 0) { + LOG_F(LS_WARNING) << "(" << events << ", " << err + << ") received non-close event"; + return; + } + for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); + ++it) { + if (stream == it->second) { + LOG_F(LS_VERBOSE) << "(" << it->first.ToString() << ")"; + // We don't cache closed streams, so return it. + it->second->SignalEvent.disconnect(this); + LOG_F(LS_VERBOSE) << "Returning closed stream"; + pool_->ReturnConnectedStream(it->second); + cached_.erase(it); + return; + } + } + ASSERT(false); +} + +////////////////////////////////////////////////////////////////////// +// NewSocketPool +////////////////////////////////////////////////////////////////////// + +NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { +} + +NewSocketPool::~NewSocketPool() { + for (size_t i = 0; i < used_.size(); ++i) { + delete used_[i]; + } +} + +StreamInterface* +NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { + AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM); + if (!socket) { + ASSERT(false); + if (err) + *err = -1; + return NULL; + } + if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { + if (err) + *err = socket->GetError(); + delete socket; + return NULL; + } + if (err) + *err = 0; + return new SocketStream(socket); +} + +void +NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { + used_.push_back(stream); +} + +////////////////////////////////////////////////////////////////////// +// ReuseSocketPool +////////////////////////////////////////////////////////////////////// + +ReuseSocketPool::ReuseSocketPool(SocketFactory* factory, AsyncSocket* socket) + : factory_(factory), stream_(NULL) { + stream_ = socket ? new SocketStream(socket) : NULL; +} + +ReuseSocketPool::~ReuseSocketPool() { + delete stream_; +} + +void +ReuseSocketPool::setSocket(AsyncSocket* socket) { + ASSERT(false); // TODO: need ref-counting to make this work + delete stream_; + stream_ = socket ? new SocketStream(socket) : NULL; +} + +StreamInterface* +ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { + if (!stream_) { + LOG(LS_INFO) << "ReuseSocketPool - Creating new socket"; + AsyncSocket* socket = factory_->CreateAsyncSocket(SOCK_STREAM); + if (!socket) { + ASSERT(false); + if (err) + *err = -1; + return NULL; + } + stream_ = new SocketStream(socket); + } + if ((stream_->GetState() == SS_OPEN) && + (stream_->GetSocket()->GetRemoteAddress() == remote)) { + LOG(LS_INFO) << "ReuseSocketPool - Reusing connection to: " + << remote.ToString(); + } else { + stream_->Close(); + if ((stream_->GetSocket()->Connect(remote) != 0) + && !stream_->GetSocket()->IsBlocking()) { + if (err) + *err = stream_->GetSocket()->GetError(); + return NULL; + } else { + LOG(LS_INFO) << "ReuseSocketPool - Opening connection to: " + << remote.ToString(); + } + } + if (err) + *err = 0; + return stream_; +} + +void +ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { + // Note: this might not be true with the advent of setSocket + ASSERT(stream == stream_); +} + +/////////////////////////////////////////////////////////////////////////////// +// LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached +// LoggingAdapters. +/////////////////////////////////////////////////////////////////////////////// + +LoggingPoolAdapter::LoggingPoolAdapter( + StreamPool* pool, LoggingSeverity level, const std::string& label, + bool binary_mode) + : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { +} + +LoggingPoolAdapter::~LoggingPoolAdapter() { + for (StreamList::iterator it = recycle_bin_.begin(); + it != recycle_bin_.end(); ++it) { + delete *it; + } +} + +StreamInterface* LoggingPoolAdapter::RequestConnectedStream( + const SocketAddress& remote, int* err) { + if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { + if (recycle_bin_.empty()) { + return new LoggingAdapter(stream, level_, label_, binary_mode_); + } + LoggingAdapter* logging = recycle_bin_.front(); + recycle_bin_.pop_front(); + logging->Attach(stream); + return logging; + } + return NULL; +} + +void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { + LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); + pool_->ReturnConnectedStream(logging->Detach()); + recycle_bin_.push_back(logging); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace talk_base |