diff options
Diffstat (limited to 'Plugins/jingle/libjingle/talk/base/streamutils.cc')
-rw-r--r-- | Plugins/jingle/libjingle/talk/base/streamutils.cc | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/Plugins/jingle/libjingle/talk/base/streamutils.cc b/Plugins/jingle/libjingle/talk/base/streamutils.cc new file mode 100644 index 0000000..52e6da7 --- /dev/null +++ b/Plugins/jingle/libjingle/talk/base/streamutils.cc @@ -0,0 +1,194 @@ +/* + * libjingle + * Copyright 2004--2006, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "talk/base/common.h" +#include "talk/base/streamutils.h" + +/////////////////////////////////////////////////////////////////////////////// +// TODO: Extend so that one side can close, and other side can send +// buffered data. + +StreamRelay::StreamRelay(talk_base::StreamInterface* s1, + talk_base::StreamInterface* s2, + size_t buffer_size) : buffer_size_(buffer_size) { + dir_[0].stream = s1; + dir_[1].stream = s2; + + ASSERT(s1->GetState() != talk_base::SS_CLOSED); + ASSERT(s2->GetState() != talk_base::SS_CLOSED); + + for (size_t i=0; i<2; ++i) { + dir_[i].stream->SignalEvent.connect(this, &StreamRelay::OnEvent); + dir_[i].buffer = new char[buffer_size_]; + dir_[i].data_len = 0; + } +} + +StreamRelay::~StreamRelay() { + for (size_t i=0; i<2; ++i) { + delete dir_[i].stream; + delete [] dir_[i].buffer; + } +} + +void +StreamRelay::Circulate() { + int error = 0; + if (!Flow(0, &error) || !Flow(1, &error)) { + Close(); + SignalClosed(this, error); + } +} + +void +StreamRelay::Close() { + for (size_t i=0; i<2; ++i) { + dir_[i].stream->SignalEvent.disconnect(this); + dir_[i].stream->Close(); + } +} + +bool +StreamRelay::Flow(int read_index, int* error) { + Direction& reader = dir_[read_index]; + Direction& writer = dir_[Complement(read_index)]; + + bool progress; + do { + progress = false; + + while (reader.stream->GetState() == talk_base::SS_OPEN) { + size_t available = buffer_size_ - reader.data_len; + if (available == 0) + break; + + *error = 0; + size_t read = 0; + talk_base::StreamResult result + = reader.stream->Read(reader.buffer + reader.data_len, available, + &read, error); + if ((result == talk_base::SR_BLOCK) || (result == talk_base::SR_EOS)) + break; + + if (result == talk_base::SR_ERROR) + return false; + + progress = true; + ASSERT((read > 0) && (read <= available)); + reader.data_len += read; + } + + size_t total_written = 0; + while (writer.stream->GetState() == talk_base::SS_OPEN) { + size_t available = reader.data_len - total_written; + if (available == 0) + break; + + *error = 0; + size_t written = 0; + talk_base::StreamResult result + = writer.stream->Write(reader.buffer + total_written, + available, &written, error); + if ((result == talk_base::SR_BLOCK) || (result == talk_base::SR_EOS)) + break; + + if (result == talk_base::SR_ERROR) + return false; + + progress = true; + ASSERT((written > 0) && (written <= available)); + total_written += written; + } + + reader.data_len -= total_written; + if (reader.data_len > 0) { + memmove(reader.buffer, reader.buffer + total_written, reader.data_len); + } + } while (progress); + + return true; +} + +void StreamRelay::OnEvent(talk_base::StreamInterface* stream, int events, + int error) { + int index = Index(stream); + + // Note: In the following cases, we are treating the open event as both + // readable and writeable, for robustness. It won't hurt if we are wrong. + + if ((events & talk_base::SE_OPEN | talk_base::SE_READ) + && !Flow(index, &error)) { + events = talk_base::SE_CLOSE; + } + + if ((events & talk_base::SE_OPEN | talk_base::SE_WRITE) + && !Flow(Complement(index), &error)) { + events = talk_base::SE_CLOSE; + } + + if (events & talk_base::SE_CLOSE) { + Close(); + SignalClosed(this, error); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// StreamCounter - counts the number of bytes which are transferred in either +// direction. +/////////////////////////////////////////////////////////////////////////////// + +StreamCounter::StreamCounter(talk_base::StreamInterface* stream) + : StreamAdapterInterface(stream), count_(0) { +} + +talk_base::StreamResult StreamCounter::Read(void* buffer, size_t buffer_len, + size_t* read, int* error) { + size_t tmp; + if (!read) + read = &tmp; + talk_base::StreamResult result + = StreamAdapterInterface::Read(buffer, buffer_len, + read, error); + if (result == talk_base::SR_SUCCESS) + count_ += *read; + SignalUpdateByteCount(count_); + return result; +} + +talk_base::StreamResult StreamCounter::Write( + const void* data, size_t data_len, size_t* written, int* error) { + size_t tmp; + if (!written) + written = &tmp; + talk_base::StreamResult result + = StreamAdapterInterface::Write(data, data_len, written, error); + if (result == talk_base::SR_SUCCESS) + count_ += *written; + SignalUpdateByteCount(count_); + return result; +} |