diff options
author | pescuma <pescuma@c086bb3d-8645-0410-b8da-73a8550f86e7> | 2011-10-10 01:39:18 +0000 |
---|---|---|
committer | pescuma <pescuma@c086bb3d-8645-0410-b8da-73a8550f86e7> | 2011-10-10 01:39:18 +0000 |
commit | e9bf8a6e2d782dc480fb97cb59928c8cfe1dd777 (patch) | |
tree | 12bb8ebaab13ef1476ce9343482796c901ee0130 /Plugins/jingle/libjingle/talk/base/virtualsocketserver.cc | |
parent | f574681d9b6fee0d05319af1831620e48cc8492f (diff) |
Moved files from BerliOS
git-svn-id: http://pescuma.googlecode.com/svn/trunk/Miranda@229 c086bb3d-8645-0410-b8da-73a8550f86e7
Diffstat (limited to 'Plugins/jingle/libjingle/talk/base/virtualsocketserver.cc')
-rw-r--r-- | Plugins/jingle/libjingle/talk/base/virtualsocketserver.cc | 616 |
1 files changed, 616 insertions, 0 deletions
diff --git a/Plugins/jingle/libjingle/talk/base/virtualsocketserver.cc b/Plugins/jingle/libjingle/talk/base/virtualsocketserver.cc new file mode 100644 index 0000000..884b324 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/base/virtualsocketserver.cc @@ -0,0 +1,616 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include <algorithm> +#include <cassert> +#include <cmath> +#include <cstring> +#include <iostream> +#include <vector> +#include <errno.h> + +#include "talk/base/virtualsocketserver.h" +#include "talk/base/common.h" +#include "talk/base/time.h" + +namespace talk_base { + +const uint32 HEADER_SIZE = 28; // IP + UDP headers + +const uint32 MSG_ID_PACKET = 1; +// TODO: Add a message type for new connections. + +// Packets are passed between sockets as messages. We copy the data just like +// the kernel does. +class Packet : public MessageData { +public: + Packet(const char* data, size_t size, const SocketAddress& from) + : size_(size), from_(from) { + assert(data); + assert(size_ >= 0); + data_ = new char[size_]; + std::memcpy(data_, data, size_); + } + + virtual ~Packet() { + delete data_; + } + + const char* data() const { return data_; } + size_t size() const { return size_; } + const SocketAddress& from() const { return from_; } + + // Remove the first size bytes from the data. + void Consume(size_t size) { + assert(size < size_); + size_ -= size; + char* new_data = new char[size_]; + std::memcpy(new_data, data_, size); + delete[] data_; + data_ = new_data; + } + +private: + char* data_; + size_t size_; + SocketAddress from_; +}; + +// Implements the socket interface using the virtual network. Packets are +// passed as messages using the message queue of the socket server. +class VirtualSocket : public AsyncSocket, public MessageHandler { +public: + VirtualSocket( + VirtualSocketServer* server, int type, bool async, uint32 ip) + : server_(server), type_(type), async_(async), connected_(false), + local_ip_(ip), readable_(true), queue_size_(0) { + assert((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); + packets_ = new std::vector<Packet*>(); + } + + ~VirtualSocket() { + Close(); + + for (unsigned i = 0; i < packets_->size(); i++) + delete (*packets_)[i]; + delete packets_; + } + + SocketAddress GetLocalAddress() const { + return local_addr_; + } + + SocketAddress GetRemoteAddress() const { + return remote_addr_; + } + + int Bind(const SocketAddress& addr) { + assert(addr.port() != 0); + int result = server_->Bind(addr, this); + if (result >= 0) + local_addr_ = addr; + else + error_ = EADDRINUSE; + return result; + } + + int Connect(const SocketAddress& addr) { + assert(!connected_); + connected_ = true; + remote_addr_ = addr; + assert(type_ == SOCK_DGRAM); // stream not yet implemented + return 0; + } + + int Close() { + if (!local_addr_.IsAny()) + server_->Unbind(local_addr_, this); + + connected_ = false; + local_addr_ = SocketAddress(); + remote_addr_ = SocketAddress(); + return 0; + } + + int Send(const void *pv, size_t cb) { + assert(connected_); + return SendInternal(pv, cb, remote_addr_); + } + + int SendTo(const void *pv, size_t cb, const SocketAddress& addr) { + assert(!connected_); + return SendInternal(pv, cb, addr); + } + + int SendInternal(const void *pv, size_t cb, const SocketAddress& addr) { + // If we have not been assigned a local port, then get one. + if (local_addr_.IsAny()) { + local_addr_.SetIP(local_ip_); + int result = server_->Bind(this, &local_addr_); + if (result < 0) { + local_addr_.SetIP(0); + error_ = EADDRINUSE; + return result; + } + } + + // Send the data in a message to the appropriate socket. + return server_->Send(this, pv, cb, local_addr_, addr); + } + + int Recv(void *pv, size_t cb) { + SocketAddress addr; + return RecvFrom(pv, cb, &addr); + } + + int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) { + // If we don't have a packet, then either error or wait for one to arrive. + if (packets_->size() == 0) { + if (async_) { + error_ = EAGAIN; + return -1; + } + while (packets_->size() == 0) { + Message msg; + server_->msg_queue_->Get(&msg); + server_->msg_queue_->Dispatch(&msg); + } + } + + // Return the packet at the front of the queue. + Packet* packet = packets_->front(); + *paddr = packet->from(); + int size = (int)packet->size(); + if (size <= (int)cb) { + std::memcpy(pv, packet->data(), size); + packets_->erase(packets_->begin()); + delete packet; + return size; + } else { + std::memcpy(pv, packet->data(), cb); + packet->Consume(cb); + return (int)cb; + } + } + + int Listen(int backlog) { + assert(false); // not yet implemented + return 0; + } + + Socket* Accept(SocketAddress *paddr) { + assert(false); // not yet implemented + return 0; + } + + bool readable() { return readable_; } + void set_readable(bool value) { readable_ = value; } + + bool writable() { return false; } + void set_writable(bool value) { + // TODO: Send ourselves messages (delayed after the first) to give them a + // chance to write. + assert(false); + } + + int GetError() const { + return error_; + } + + void SetError(int error) { + error_ = error; + } + + ConnState GetState() const { + return connected_ ? CS_CONNECTED : CS_CLOSED; + } + + int SetOption(Option opt, int value) { + return 0; + } + + int EstimateMTU(uint16* mtu) { + if (!connected_) + return ENOTCONN; + else + return 65536; + } + + void OnMessage(Message *pmsg) { + if (pmsg->message_id == MSG_ID_PACKET) { + assert(pmsg->pdata); + Packet* packet = static_cast<Packet*>(pmsg->pdata); + + if (!readable_) + return; + + packets_->push_back(packet); + + if (async_) { + SignalReadEvent(this); + + // TODO: If the listeners don't want to read this packet now, we will + // need to send ourselves delayed messages to try again. + assert(packets_->size() == 0); + } + } else { + assert(false); + } + } + +private: + struct QueueEntry { + uint32 size; + uint32 done_time; + }; + + typedef std::deque<QueueEntry> SendQueue; + + VirtualSocketServer* server_; + int type_; + bool async_; + bool connected_; + uint32 local_ip_; + bool readable_; + SocketAddress local_addr_; + SocketAddress remote_addr_; + std::vector<Packet*>* packets_; + int error_; + SendQueue queue_; + uint32 queue_size_; + CriticalSection queue_crit_; + + friend class VirtualSocketServer; +}; + +VirtualSocketServer::VirtualSocketServer() + : fWait_(false), wait_version_(0), next_ip_(1), next_port_(45000), + bandwidth_(0), queue_capacity_(64 * 1024), delay_mean_(0), + delay_stddev_(0), delay_dist_(0), drop_prob_(0.0) { + msg_queue_ = new MessageQueue(); // uses physical socket server for Wait + bindings_ = new AddressMap(); + + UpdateDelayDistribution(); +} + +VirtualSocketServer::~VirtualSocketServer() { + delete bindings_; + delete msg_queue_; + delete delay_dist_; +} + +uint32 VirtualSocketServer::GetNextIP() { + return next_ip_++; +} + +Socket* VirtualSocketServer::CreateSocket(int type) { + return CreateSocketInternal(type); +} + +AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { + return CreateSocketInternal(type); +} + +VirtualSocket* VirtualSocketServer::CreateSocketInternal(int type) { + uint32 ip = (next_ip_ > 1) ? next_ip_ - 1 : 1; + return new VirtualSocket(this, type, true, ip); +} + +bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { + ASSERT(process_io); // This can't be easily supported. + + uint32 msEnd; + if (cmsWait != kForever) + msEnd = GetMillisecondCount() + cmsWait; + uint32 cmsNext = cmsWait; + + fWait_ = true; + wait_version_ += 1; + + while (fWait_) { + Message msg; + if (!msg_queue_->Get(&msg, cmsNext)) + return true; + msg_queue_->Dispatch(&msg); + + if (cmsWait != kForever) { + uint32 msCur = GetMillisecondCount(); + if (msCur >= msEnd) + return true; + cmsNext = msEnd - msCur; + } + } + return true; +} + +const uint32 MSG_WAKE_UP = 1; + +struct WakeUpMessage : public MessageData { + WakeUpMessage(uint32 ver) : wait_version(ver) {} + virtual ~WakeUpMessage() {} + + uint32 wait_version; +}; + +void VirtualSocketServer::WakeUp() { + msg_queue_->Post(this, MSG_WAKE_UP, new WakeUpMessage(wait_version_)); +} + +void VirtualSocketServer::OnMessage(Message* pmsg) { + assert(pmsg->message_id == MSG_WAKE_UP); + assert(pmsg->pdata); + WakeUpMessage* wmsg = static_cast<WakeUpMessage*>(pmsg->pdata); + if (wmsg->wait_version == wait_version_) + fWait_ = false; + delete pmsg->pdata; +} + +int VirtualSocketServer::Bind( + const SocketAddress& addr, VirtualSocket* socket) { + assert(addr.ip() > 0); // don't support any-address right now + assert(addr.port() > 0); + assert(socket); + + if (bindings_->find(addr) == bindings_->end()) { + (*bindings_)[addr] = socket; + return 0; + } else { + return -1; + } +} + +int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { + assert(addr->ip() > 0); // don't support any-address right now + assert(socket); + + for (int i = 0; i < 65536; i++) { + addr->SetPort(next_port_++); + if (addr->port() > 0) { + AddressMap::iterator iter = bindings_->find(*addr); + if (iter == bindings_->end()) { + (*bindings_)[*addr] = socket; + return 0; + } + } + } + + errno = EADDRINUSE; // TODO: is there a better error number? + return -1; +} + +int VirtualSocketServer::Unbind( + const SocketAddress& addr, VirtualSocket* socket) { + assert((*bindings_)[addr] == socket); + bindings_->erase(bindings_->find(addr)); + return 0; +} + +static double Random() { + return double(rand()) / RAND_MAX; +} + +int VirtualSocketServer::Send( + VirtualSocket* socket, const void *pv, size_t cb, + const SocketAddress& local_addr, const SocketAddress& remote_addr) { + + // See if we want to drop this packet. + if (Random() < drop_prob_) { + std::cerr << "Dropping packet: bad luck" << std::endl; + return 0; + } + + uint32 cur_time = GetMillisecondCount(); + uint32 send_delay; + + // Determine whether we have enough bandwidth to accept this packet. To do + // this, we need to update the send queue. Once we know it's current size, + // we know whether we can fit this packet. + // + // NOTE: There are better algorithms for maintaining such a queue (such as + // "Derivative Random Drop"); however, this algorithm is a more accurate + // simulation of what a normal network would do. + { + CritScope cs(&socket->queue_crit_); + + while ((socket->queue_.size() > 0) && + (socket->queue_.front().done_time <= cur_time)) { + assert(socket->queue_size_ >= socket->queue_.front().size); + socket->queue_size_ -= socket->queue_.front().size; + socket->queue_.pop_front(); + } + + VirtualSocket::QueueEntry entry; + entry.size = uint32(cb) + HEADER_SIZE; + + if (socket->queue_size_ + entry.size > queue_capacity_) { + std::cerr << "Dropping packet: queue capacity exceeded" << std::endl; + return 0; // not an error + } + + socket->queue_size_ += entry.size; + send_delay = SendDelay(socket->queue_size_); + entry.done_time = cur_time + send_delay; + socket->queue_.push_back(entry); + } + + // Find the delay for crossing the many virtual hops of the network. + uint32 transit_delay = GetRandomTransitDelay(); + + // Post the packet as a message to be delivered (on our own thread) + + AddressMap::iterator iter = bindings_->find(remote_addr); + if (iter != bindings_->end()) { + Packet* p = new Packet(static_cast<const char*>(pv), cb, local_addr); + uint32 delay = send_delay + transit_delay; + msg_queue_->PostDelayed(delay, iter->second, MSG_ID_PACKET, p); + } else { + std::cerr << "No one listening at " << remote_addr.ToString() << std::endl; + } + return (int)cb; +} + +uint32 VirtualSocketServer::SendDelay(uint32 size) { + if (bandwidth_ == 0) + return 0; + else + return 1000 * size / bandwidth_; +} + +void PrintFunction(std::vector<std::pair<double,double> >* f) { + for (uint32 i = 0; i < f->size(); i++) + std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl; +} + +void VirtualSocketServer::UpdateDelayDistribution() { + Function* dist = GetDelayDistribution(); + dist = Resample(Invert(Accumulate(dist)), 0, 1); + + // We take a lock just to make sure we don't leak memory. + { + CritScope cs(&delay_crit_); + delete delay_dist_; + delay_dist_ = dist; + } +} + +const int NUM_SAMPLES = 100; // 1000; + +static double PI = 4 * std::atan(1.0); + +static double Normal(double x, double mean, double stddev) { + double a = (x - mean) * (x - mean) / (2 * stddev * stddev); + return std::exp(-a) / (stddev * sqrt(2 * PI)); +} + +#if 0 // static unused gives a warning +static double Pareto(double x, double min, double k) { + if (x < min) + return 0; + else + return k * std::pow(min, k) / std::pow(x, k+1); +} +#endif + +VirtualSocketServer::Function* VirtualSocketServer::GetDelayDistribution() { + Function* f = new Function(); + + if (delay_stddev_ == 0) { + + f->push_back(Point(delay_mean_, 1.0)); + + } else { + + double start = 0; + if (delay_mean_ >= 4 * double(delay_stddev_)) + start = delay_mean_ - 4 * double(delay_stddev_); + double end = delay_mean_ + 4 * double(delay_stddev_); + + double delay_min = 0; + if (delay_mean_ >= 1.0 * delay_stddev_) + delay_min = delay_mean_ - 1.0 * delay_stddev_; + + for (int i = 0; i < NUM_SAMPLES; i++) { + double x = start + (end - start) * i / (NUM_SAMPLES - 1); + double y = Normal(x, delay_mean_, delay_stddev_); + f->push_back(Point(x, y)); + } + + } + + return f; +} + +uint32 VirtualSocketServer::GetRandomTransitDelay() { + double delay = (*delay_dist_)[rand() % delay_dist_->size()].second; + return uint32(delay); +} + +struct FunctionDomainCmp { + bool operator ()(const VirtualSocketServer::Point& p1, const VirtualSocketServer::Point& p2) { + return p1.first < p2.first; + } + bool operator ()(double v1, const VirtualSocketServer::Point& p2) { + return v1 < p2.first; + } + bool operator ()(const VirtualSocketServer::Point& p1, double v2) { + return p1.first < v2; + } +}; + +VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { + assert(f->size() >= 1); + double v = 0; + for (Function::size_type i = 0; i < f->size() - 1; ++i) { + double dx = (*f)[i].second * ((*f)[i+1].first - (*f)[i].first); + v = (*f)[i].second = v + dx; + } + (*f)[f->size()-1].second = v; + return f; +} + +VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) { + for (Function::size_type i = 0; i < f->size(); ++i) + std::swap((*f)[i].first, (*f)[i].second); + + std::sort(f->begin(), f->end(), FunctionDomainCmp()); + return f; +} + +VirtualSocketServer::Function* VirtualSocketServer::Resample( + Function* f, double x1, double x2) { + Function* g = new Function(); + + for (int i = 0; i < NUM_SAMPLES; i++) { + double x = x1 + (x2 - x1) * i / (NUM_SAMPLES - 1); + double y = Evaluate(f, x); + g->push_back(Point(x, y)); + } + + delete f; + return g; +} + +double VirtualSocketServer::Evaluate(Function* f, double x) { + Function::iterator iter = + std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); + if (iter == f->begin()) { + return (*f)[0].second; + } else if (iter == f->end()) { + assert(f->size() >= 1); + return (*f)[f->size() - 1].second; + } else if (iter->first == x) { + return iter->second; + } else { + double x1 = (iter - 1)->first; + double y1 = (iter - 1)->second; + double x2 = iter->first; + double y2 = iter->second; + return y1 + (y2 - y1) * (x - x1) / (x2 - x1); + } +} + +} // namespace talk_base |