/* * libjingle * Copyright 2004--2006, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "talk/base/common.h" #include "talk/base/streamutils.h" /////////////////////////////////////////////////////////////////////////////// // TODO: Extend so that one side can close, and other side can send // buffered data. StreamRelay::StreamRelay(talk_base::StreamInterface* s1, talk_base::StreamInterface* s2, size_t buffer_size) : buffer_size_(buffer_size) { dir_[0].stream = s1; dir_[1].stream = s2; ASSERT(s1->GetState() != talk_base::SS_CLOSED); ASSERT(s2->GetState() != talk_base::SS_CLOSED); for (size_t i=0; i<2; ++i) { dir_[i].stream->SignalEvent.connect(this, &StreamRelay::OnEvent); dir_[i].buffer = new char[buffer_size_]; dir_[i].data_len = 0; } } StreamRelay::~StreamRelay() { for (size_t i=0; i<2; ++i) { delete dir_[i].stream; delete [] dir_[i].buffer; } } void StreamRelay::Circulate() { int error = 0; if (!Flow(0, &error) || !Flow(1, &error)) { Close(); SignalClosed(this, error); } } void StreamRelay::Close() { for (size_t i=0; i<2; ++i) { dir_[i].stream->SignalEvent.disconnect(this); dir_[i].stream->Close(); } } bool StreamRelay::Flow(int read_index, int* error) { Direction& reader = dir_[read_index]; Direction& writer = dir_[Complement(read_index)]; bool progress; do { progress = false; while (reader.stream->GetState() == talk_base::SS_OPEN) { size_t available = buffer_size_ - reader.data_len; if (available == 0) break; *error = 0; size_t read = 0; talk_base::StreamResult result = reader.stream->Read(reader.buffer + reader.data_len, available, &read, error); if ((result == talk_base::SR_BLOCK) || (result == talk_base::SR_EOS)) break; if (result == talk_base::SR_ERROR) return false; progress = true; ASSERT((read > 0) && (read <= available)); reader.data_len += read; } size_t total_written = 0; while (writer.stream->GetState() == talk_base::SS_OPEN) { size_t available = reader.data_len - total_written; if (available == 0) break; *error = 0; size_t written = 0; talk_base::StreamResult result = writer.stream->Write(reader.buffer + total_written, available, &written, error); if ((result == talk_base::SR_BLOCK) || (result == talk_base::SR_EOS)) break; if (result == talk_base::SR_ERROR) return false; progress = true; ASSERT((written > 0) && (written <= available)); total_written += written; } reader.data_len -= total_written; if (reader.data_len > 0) { memmove(reader.buffer, reader.buffer + total_written, reader.data_len); } } while (progress); return true; } void StreamRelay::OnEvent(talk_base::StreamInterface* stream, int events, int error) { int index = Index(stream); // Note: In the following cases, we are treating the open event as both // readable and writeable, for robustness. It won't hurt if we are wrong. if ((events & talk_base::SE_OPEN | talk_base::SE_READ) && !Flow(index, &error)) { events = talk_base::SE_CLOSE; } if ((events & talk_base::SE_OPEN | talk_base::SE_WRITE) && !Flow(Complement(index), &error)) { events = talk_base::SE_CLOSE; } if (events & talk_base::SE_CLOSE) { Close(); SignalClosed(this, error); } } /////////////////////////////////////////////////////////////////////////////// // StreamCounter - counts the number of bytes which are transferred in either // direction. /////////////////////////////////////////////////////////////////////////////// StreamCounter::StreamCounter(talk_base::StreamInterface* stream) : StreamAdapterInterface(stream), count_(0) { } talk_base::StreamResult StreamCounter::Read(void* buffer, size_t buffer_len, size_t* read, int* error) { size_t tmp; if (!read) read = &tmp; talk_base::StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, error); if (result == talk_base::SR_SUCCESS) count_ += *read; SignalUpdateByteCount(count_); return result; } talk_base::StreamResult StreamCounter::Write( const void* data, size_t data_len, size_t* written, int* error) { size_t tmp; if (!written) written = &tmp; talk_base::StreamResult result = StreamAdapterInterface::Write(data, data_len, written, error); if (result == talk_base::SR_SUCCESS) count_ += *written; SignalUpdateByteCount(count_); return result; }