/* * libjingle * Copyright 2004--2005, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include #include #include #include #include #include #include #include "talk/base/virtualsocketserver.h" #include "talk/base/common.h" #include "talk/base/time.h" namespace talk_base { const uint32 HEADER_SIZE = 28; // IP + UDP headers const uint32 MSG_ID_PACKET = 1; // TODO: Add a message type for new connections. // Packets are passed between sockets as messages. We copy the data just like // the kernel does. class Packet : public MessageData { public: Packet(const char* data, size_t size, const SocketAddress& from) : size_(size), from_(from) { assert(data); assert(size_ >= 0); data_ = new char[size_]; std::memcpy(data_, data, size_); } virtual ~Packet() { delete data_; } const char* data() const { return data_; } size_t size() const { return size_; } const SocketAddress& from() const { return from_; } // Remove the first size bytes from the data. void Consume(size_t size) { assert(size < size_); size_ -= size; char* new_data = new char[size_]; std::memcpy(new_data, data_, size); delete[] data_; data_ = new_data; } private: char* data_; size_t size_; SocketAddress from_; }; // Implements the socket interface using the virtual network. Packets are // passed as messages using the message queue of the socket server. class VirtualSocket : public AsyncSocket, public MessageHandler { public: VirtualSocket( VirtualSocketServer* server, int type, bool async, uint32 ip) : server_(server), type_(type), async_(async), connected_(false), local_ip_(ip), readable_(true), queue_size_(0) { assert((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); packets_ = new std::vector(); } ~VirtualSocket() { Close(); for (unsigned i = 0; i < packets_->size(); i++) delete (*packets_)[i]; delete packets_; } SocketAddress GetLocalAddress() const { return local_addr_; } SocketAddress GetRemoteAddress() const { return remote_addr_; } int Bind(const SocketAddress& addr) { assert(addr.port() != 0); int result = server_->Bind(addr, this); if (result >= 0) local_addr_ = addr; else error_ = EADDRINUSE; return result; } int Connect(const SocketAddress& addr) { assert(!connected_); connected_ = true; remote_addr_ = addr; assert(type_ == SOCK_DGRAM); // stream not yet implemented return 0; } int Close() { if (!local_addr_.IsAny()) server_->Unbind(local_addr_, this); connected_ = false; local_addr_ = SocketAddress(); remote_addr_ = SocketAddress(); return 0; } int Send(const void *pv, size_t cb) { assert(connected_); return SendInternal(pv, cb, remote_addr_); } int SendTo(const void *pv, size_t cb, const SocketAddress& addr) { assert(!connected_); return SendInternal(pv, cb, addr); } int SendInternal(const void *pv, size_t cb, const SocketAddress& addr) { // If we have not been assigned a local port, then get one. if (local_addr_.IsAny()) { local_addr_.SetIP(local_ip_); int result = server_->Bind(this, &local_addr_); if (result < 0) { local_addr_.SetIP(0); error_ = EADDRINUSE; return result; } } // Send the data in a message to the appropriate socket. return server_->Send(this, pv, cb, local_addr_, addr); } int Recv(void *pv, size_t cb) { SocketAddress addr; return RecvFrom(pv, cb, &addr); } int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) { // If we don't have a packet, then either error or wait for one to arrive. if (packets_->size() == 0) { if (async_) { error_ = EAGAIN; return -1; } while (packets_->size() == 0) { Message msg; server_->msg_queue_->Get(&msg); server_->msg_queue_->Dispatch(&msg); } } // Return the packet at the front of the queue. Packet* packet = packets_->front(); *paddr = packet->from(); int size = (int)packet->size(); if (size <= (int)cb) { std::memcpy(pv, packet->data(), size); packets_->erase(packets_->begin()); delete packet; return size; } else { std::memcpy(pv, packet->data(), cb); packet->Consume(cb); return (int)cb; } } int Listen(int backlog) { assert(false); // not yet implemented return 0; } Socket* Accept(SocketAddress *paddr) { assert(false); // not yet implemented return 0; } bool readable() { return readable_; } void set_readable(bool value) { readable_ = value; } bool writable() { return false; } void set_writable(bool value) { // TODO: Send ourselves messages (delayed after the first) to give them a // chance to write. assert(false); } int GetError() const { return error_; } void SetError(int error) { error_ = error; } ConnState GetState() const { return connected_ ? CS_CONNECTED : CS_CLOSED; } int SetOption(Option opt, int value) { return 0; } int EstimateMTU(uint16* mtu) { if (!connected_) return ENOTCONN; else return 65536; } void OnMessage(Message *pmsg) { if (pmsg->message_id == MSG_ID_PACKET) { assert(pmsg->pdata); Packet* packet = static_cast(pmsg->pdata); if (!readable_) return; packets_->push_back(packet); if (async_) { SignalReadEvent(this); // TODO: If the listeners don't want to read this packet now, we will // need to send ourselves delayed messages to try again. assert(packets_->size() == 0); } } else { assert(false); } } private: struct QueueEntry { uint32 size; uint32 done_time; }; typedef std::deque SendQueue; VirtualSocketServer* server_; int type_; bool async_; bool connected_; uint32 local_ip_; bool readable_; SocketAddress local_addr_; SocketAddress remote_addr_; std::vector* packets_; int error_; SendQueue queue_; uint32 queue_size_; CriticalSection queue_crit_; friend class VirtualSocketServer; }; VirtualSocketServer::VirtualSocketServer() : fWait_(false), wait_version_(0), next_ip_(1), next_port_(45000), bandwidth_(0), queue_capacity_(64 * 1024), delay_mean_(0), delay_stddev_(0), delay_dist_(0), drop_prob_(0.0) { msg_queue_ = new MessageQueue(); // uses physical socket server for Wait bindings_ = new AddressMap(); UpdateDelayDistribution(); } VirtualSocketServer::~VirtualSocketServer() { delete bindings_; delete msg_queue_; delete delay_dist_; } uint32 VirtualSocketServer::GetNextIP() { return next_ip_++; } Socket* VirtualSocketServer::CreateSocket(int type) { return CreateSocketInternal(type); } AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { return CreateSocketInternal(type); } VirtualSocket* VirtualSocketServer::CreateSocketInternal(int type) { uint32 ip = (next_ip_ > 1) ? next_ip_ - 1 : 1; return new VirtualSocket(this, type, true, ip); } bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { ASSERT(process_io); // This can't be easily supported. uint32 msEnd; if (cmsWait != kForever) msEnd = GetMillisecondCount() + cmsWait; uint32 cmsNext = cmsWait; fWait_ = true; wait_version_ += 1; while (fWait_) { Message msg; if (!msg_queue_->Get(&msg, cmsNext)) return true; msg_queue_->Dispatch(&msg); if (cmsWait != kForever) { uint32 msCur = GetMillisecondCount(); if (msCur >= msEnd) return true; cmsNext = msEnd - msCur; } } return true; } const uint32 MSG_WAKE_UP = 1; struct WakeUpMessage : public MessageData { WakeUpMessage(uint32 ver) : wait_version(ver) {} virtual ~WakeUpMessage() {} uint32 wait_version; }; void VirtualSocketServer::WakeUp() { msg_queue_->Post(this, MSG_WAKE_UP, new WakeUpMessage(wait_version_)); } void VirtualSocketServer::OnMessage(Message* pmsg) { assert(pmsg->message_id == MSG_WAKE_UP); assert(pmsg->pdata); WakeUpMessage* wmsg = static_cast(pmsg->pdata); if (wmsg->wait_version == wait_version_) fWait_ = false; delete pmsg->pdata; } int VirtualSocketServer::Bind( const SocketAddress& addr, VirtualSocket* socket) { assert(addr.ip() > 0); // don't support any-address right now assert(addr.port() > 0); assert(socket); if (bindings_->find(addr) == bindings_->end()) { (*bindings_)[addr] = socket; return 0; } else { return -1; } } int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { assert(addr->ip() > 0); // don't support any-address right now assert(socket); for (int i = 0; i < 65536; i++) { addr->SetPort(next_port_++); if (addr->port() > 0) { AddressMap::iterator iter = bindings_->find(*addr); if (iter == bindings_->end()) { (*bindings_)[*addr] = socket; return 0; } } } errno = EADDRINUSE; // TODO: is there a better error number? return -1; } int VirtualSocketServer::Unbind( const SocketAddress& addr, VirtualSocket* socket) { assert((*bindings_)[addr] == socket); bindings_->erase(bindings_->find(addr)); return 0; } static double Random() { return double(rand()) / RAND_MAX; } int VirtualSocketServer::Send( VirtualSocket* socket, const void *pv, size_t cb, const SocketAddress& local_addr, const SocketAddress& remote_addr) { // See if we want to drop this packet. if (Random() < drop_prob_) { std::cerr << "Dropping packet: bad luck" << std::endl; return 0; } uint32 cur_time = GetMillisecondCount(); uint32 send_delay; // Determine whether we have enough bandwidth to accept this packet. To do // this, we need to update the send queue. Once we know it's current size, // we know whether we can fit this packet. // // NOTE: There are better algorithms for maintaining such a queue (such as // "Derivative Random Drop"); however, this algorithm is a more accurate // simulation of what a normal network would do. { CritScope cs(&socket->queue_crit_); while ((socket->queue_.size() > 0) && (socket->queue_.front().done_time <= cur_time)) { assert(socket->queue_size_ >= socket->queue_.front().size); socket->queue_size_ -= socket->queue_.front().size; socket->queue_.pop_front(); } VirtualSocket::QueueEntry entry; entry.size = uint32(cb) + HEADER_SIZE; if (socket->queue_size_ + entry.size > queue_capacity_) { std::cerr << "Dropping packet: queue capacity exceeded" << std::endl; return 0; // not an error } socket->queue_size_ += entry.size; send_delay = SendDelay(socket->queue_size_); entry.done_time = cur_time + send_delay; socket->queue_.push_back(entry); } // Find the delay for crossing the many virtual hops of the network. uint32 transit_delay = GetRandomTransitDelay(); // Post the packet as a message to be delivered (on our own thread) AddressMap::iterator iter = bindings_->find(remote_addr); if (iter != bindings_->end()) { Packet* p = new Packet(static_cast(pv), cb, local_addr); uint32 delay = send_delay + transit_delay; msg_queue_->PostDelayed(delay, iter->second, MSG_ID_PACKET, p); } else { std::cerr << "No one listening at " << remote_addr.ToString() << std::endl; } return (int)cb; } uint32 VirtualSocketServer::SendDelay(uint32 size) { if (bandwidth_ == 0) return 0; else return 1000 * size / bandwidth_; } void PrintFunction(std::vector >* f) { for (uint32 i = 0; i < f->size(); i++) std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl; } void VirtualSocketServer::UpdateDelayDistribution() { Function* dist = GetDelayDistribution(); dist = Resample(Invert(Accumulate(dist)), 0, 1); // We take a lock just to make sure we don't leak memory. { CritScope cs(&delay_crit_); delete delay_dist_; delay_dist_ = dist; } } const int NUM_SAMPLES = 100; // 1000; static double PI = 4 * std::atan(1.0); static double Normal(double x, double mean, double stddev) { double a = (x - mean) * (x - mean) / (2 * stddev * stddev); return std::exp(-a) / (stddev * sqrt(2 * PI)); } #if 0 // static unused gives a warning static double Pareto(double x, double min, double k) { if (x < min) return 0; else return k * std::pow(min, k) / std::pow(x, k+1); } #endif VirtualSocketServer::Function* VirtualSocketServer::GetDelayDistribution() { Function* f = new Function(); if (delay_stddev_ == 0) { f->push_back(Point(delay_mean_, 1.0)); } else { double start = 0; if (delay_mean_ >= 4 * double(delay_stddev_)) start = delay_mean_ - 4 * double(delay_stddev_); double end = delay_mean_ + 4 * double(delay_stddev_); double delay_min = 0; if (delay_mean_ >= 1.0 * delay_stddev_) delay_min = delay_mean_ - 1.0 * delay_stddev_; for (int i = 0; i < NUM_SAMPLES; i++) { double x = start + (end - start) * i / (NUM_SAMPLES - 1); double y = Normal(x, delay_mean_, delay_stddev_); f->push_back(Point(x, y)); } } return f; } uint32 VirtualSocketServer::GetRandomTransitDelay() { double delay = (*delay_dist_)[rand() % delay_dist_->size()].second; return uint32(delay); } struct FunctionDomainCmp { bool operator ()(const VirtualSocketServer::Point& p1, const VirtualSocketServer::Point& p2) { return p1.first < p2.first; } bool operator ()(double v1, const VirtualSocketServer::Point& p2) { return v1 < p2.first; } bool operator ()(const VirtualSocketServer::Point& p1, double v2) { return p1.first < v2; } }; VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { assert(f->size() >= 1); double v = 0; for (Function::size_type i = 0; i < f->size() - 1; ++i) { double dx = (*f)[i].second * ((*f)[i+1].first - (*f)[i].first); v = (*f)[i].second = v + dx; } (*f)[f->size()-1].second = v; return f; } VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) { for (Function::size_type i = 0; i < f->size(); ++i) std::swap((*f)[i].first, (*f)[i].second); std::sort(f->begin(), f->end(), FunctionDomainCmp()); return f; } VirtualSocketServer::Function* VirtualSocketServer::Resample( Function* f, double x1, double x2) { Function* g = new Function(); for (int i = 0; i < NUM_SAMPLES; i++) { double x = x1 + (x2 - x1) * i / (NUM_SAMPLES - 1); double y = Evaluate(f, x); g->push_back(Point(x, y)); } delete f; return g; } double VirtualSocketServer::Evaluate(Function* f, double x) { Function::iterator iter = std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); if (iter == f->begin()) { return (*f)[0].second; } else if (iter == f->end()) { assert(f->size() >= 1); return (*f)[f->size() - 1].second; } else if (iter->first == x) { return iter->second; } else { double x1 = (iter - 1)->first; double y1 = (iter - 1)->second; double x2 = iter->first; double y2 = iter->second; return y1 + (y2 - y1) * (x - x1) / (x2 - x1); } } } // namespace talk_base