C#: Implementing NetworkStream.Peek?

Currently, there isn't a NetworkStream.Peek method in C#. What is the best way to implement such a method, one that behaves just like NetworkStream.ReadByte except that the returned byte is not actually removed from the Stream?


I ran into the same requirement: peek at the magic number, then decide which stream processor to hand the stream to. Unfortunately, I can't weasel my way out of the problem, as suggested in the comments on Aaronaught's answer, by passing the already consumed bytes to the stream processing methods as separate parameters. Those methods are a given, and they expect a System.IO.Stream and nothing else.

I solved this by creating a more or less universal PeekableStream class that wraps a Stream. It works for NetworkStreams, but also for any other Stream, provided its CanRead property is true.


Edit

Alternatively, you could use the brand new ReadSeekableStream and do

var readSeekableStream = new ReadSeekableStream(networkStream, /* >= */ count);
...
readSeekableStream.Read(..., count);
readSeekableStream.Seek(-count, SeekOrigin.Current);

In any event, here comes PeekableStream:

/// <summary>
/// PeekableStream wraps a Stream and can be used to peek ahead in the underlying stream,
/// without consuming the bytes. In other words, doing Peek() will allow you to look ahead in the stream,
/// but it won't affect the result of subsequent Read() calls.
/// 
/// This is sometimes necessary, e.g. for peeking at the magic number of a stream of bytes and deciding which
/// stream processor to hand the stream to.
/// </summary>
public class PeekableStream : Stream
{
    private readonly Stream underlyingStream;
    private readonly byte[] lookAheadBuffer;

    private int lookAheadIndex;

    public PeekableStream(Stream underlyingStream, int maxPeekBytes)
    {
        this.underlyingStream = underlyingStream;
        lookAheadBuffer = new byte[maxPeekBytes];
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
            underlyingStream.Dispose();

        base.Dispose(disposing);
    }

    /// <summary>
    /// Peeks at a maximum of count bytes, or less if the stream ends before that number of bytes can be read.
    /// 
    /// Calls to this method do not influence subsequent calls to Read() and Peek().
    /// 
    /// Please note that this method will always peek count bytes unless the end of the stream is reached before that - in contrast to the Read()
    /// method, which might read less than count bytes, even though the end of the stream has not been reached.
    /// </summary>
    /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and
    /// (offset + number-of-peeked-bytes - 1) replaced by the bytes peeked from the current source.</param>
    /// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data peeked from the current stream.</param>
    /// <param name="count">The maximum number of bytes to be peeked from the current stream.</param>
    /// <returns>The total number of bytes peeked into the buffer. If it is less than the number of bytes requested then the end of the stream has been reached.</returns>
    public virtual int Peek(byte[] buffer, int offset, int count)
    {
        if (count > lookAheadBuffer.Length)
            throw new ArgumentOutOfRangeException("count", "must not be larger than the peekable size, which is " + lookAheadBuffer.Length);

        while (lookAheadIndex < count)
        {
            int bytesRead = underlyingStream.Read(lookAheadBuffer, lookAheadIndex, count - lookAheadIndex);

            if (bytesRead == 0) // end of stream reached
                break;

            lookAheadIndex += bytesRead;
        }

        int peeked = Math.Min(count, lookAheadIndex);
        Array.Copy(lookAheadBuffer, 0, buffer, offset, peeked);
        return peeked;
    }

    public override bool CanRead { get { return true; } }

    public override long Position
    {
        get
        {
            return underlyingStream.Position - lookAheadIndex;
        }
        set
        {
            underlyingStream.Position = value;
            lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Position, as that might throw NotSupportedException, 
                                // in which case we don't want to change the lookAhead status
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesTakenFromLookAheadBuffer = 0;
        if (count > 0 && lookAheadIndex > 0)
        {
            bytesTakenFromLookAheadBuffer = Math.Min(count, lookAheadIndex);
            Array.Copy(lookAheadBuffer, 0, buffer, offset, bytesTakenFromLookAheadBuffer);
            count -= bytesTakenFromLookAheadBuffer;
            offset += bytesTakenFromLookAheadBuffer;
            lookAheadIndex -= bytesTakenFromLookAheadBuffer;
            if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
                // copying into same array should be fine, according to http://msdn.microsoft.com/en-us/library/z50k9bft(v=VS.90).aspx :
                // "If sourceArray and destinationArray overlap, this method behaves as if the original values of sourceArray were preserved
                // in a temporary location before destinationArray is overwritten."
                // The remaining bytes start right after the ones just handed out, i.e. at index bytesTakenFromLookAheadBuffer.
                Array.Copy(lookAheadBuffer, bytesTakenFromLookAheadBuffer, lookAheadBuffer, 0, lookAheadIndex);
        }

        return count > 0
            ? bytesTakenFromLookAheadBuffer + underlyingStream.Read(buffer, offset, count)
            : bytesTakenFromLookAheadBuffer;
    }

    public override int ReadByte()
    {
        if (lookAheadIndex > 0)
        {
            lookAheadIndex--;
            byte firstByte = lookAheadBuffer[0];
            if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
                Array.Copy(lookAheadBuffer, 1, lookAheadBuffer, 0, lookAheadIndex);
            return firstByte;
        }
        else
        {
            return underlyingStream.ReadByte();
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long ret = underlyingStream.Seek(offset, origin);
        lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Seek(), as that might throw NotSupportedException,
                            // in which case we don't want to change the lookAhead status
        return ret;
    }

    // from here on, only simple delegations to underlyingStream

    public override bool CanSeek { get { return underlyingStream.CanSeek; } }
    public override bool CanWrite { get { return underlyingStream.CanWrite; } }
    public override bool CanTimeout { get { return underlyingStream.CanTimeout; } }
    public override int ReadTimeout { get { return underlyingStream.ReadTimeout; } set { underlyingStream.ReadTimeout = value; } }
    public override int WriteTimeout { get { return underlyingStream.WriteTimeout; } set { underlyingStream.WriteTimeout = value; } }
    public override void Flush() { underlyingStream.Flush(); }
    public override long Length { get { return underlyingStream.Length; } }
    public override void SetLength(long value) { underlyingStream.SetLength(value); }
    public override void Write(byte[] buffer, int offset, int count) { underlyingStream.Write(buffer, offset, count); }
    public override void WriteByte(byte value) { underlyingStream.WriteByte(value); }
}


If you don't need to actually retrieve the byte, you can refer to the DataAvailable property.

Otherwise, you can wrap it with a StreamReader and invoke its Peek method.

Note that neither of these is particularly reliable for reading from a network stream, due to latency issues. The data might become available (present in the read buffer) the very instant after you peek.

I'm not sure what it is that you intend to do with this, but the Read method on NetworkStream is a blocking call, so you don't really need to check for status, even if you are receiving in chunks. If you are trying to keep the application responsive while reading from the stream, you should use a thread or asynchronous call to receive the data instead.

Edit: According to this post, StreamReader.Peek is buggy on a NetworkStream, or at least has undocumented behaviour, so be careful if you choose to go that route.
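
For reference, here is a minimal sketch of the StreamReader.Peek route on an in-memory stream. The class and method names (StreamReaderPeekDemo, PeekThenRead) are made up for illustration, and the behaviour on a real NetworkStream may differ, as noted above.

```csharp
using System;
using System.IO;
using System.Text;

public static class StreamReaderPeekDemo
{
    // Peeks at the next character, then reads it for real.
    // Both values are the same because Peek does not advance the reader.
    public static (int peeked, int read) PeekThenRead(Stream stream)
    {
        // leaveOpen: true, so the caller keeps ownership of the stream.
        using (var reader = new StreamReader(stream, Encoding.ASCII, false, 1024, true))
        {
            int peeked = reader.Peek(); // -1 if no character is available
            int read = reader.Read();   // returns the character Peek just showed
            return (peeked, read);
        }
    }
}
```

Keep in mind that the StreamReader fills its own internal buffer from the stream, so the peek is only "non-consuming" for code that keeps reading through the same reader. Anything that later reads the underlying Stream directly will miss the bytes the reader has already buffered.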


Updated - response to comments

The notion of a "peek" on the actual stream itself is actually impossible; it's just a stream, and once the byte is received then it is no longer on the stream. Some streams support seeking so you could technically re-read that byte, but NetworkStream isn't one of them.

Peeking only applies when you are reading the stream into a buffer; once the data is in a buffer then peeking is easy because you just check whatever's at the current position in the buffer. That's why a StreamReader is able to do this; no Stream class will generally have its own Peek method.

Now, for this problem specifically, I question whether or not this is really the right answer. I understand the idea of dynamically selecting a method for processing the stream, but do you really need to do this on the raw stream? Can you not read the stream into a byte array first, or even copy it into a MemoryStream, and process it from that point on?

The main issue I see is that if something bad happens when you're reading from a network stream, the data is gone. But if you read it into a temporary location first, you can debug this. You can find out what the data was and why the object that was trying to process the data failed halfway through.

In general, the very first thing you want to do with a NetworkStream is read it into a local buffer. The only reason I can think of not to do this is if you're reading an enormous amount of data - and even then, I might consider using the file system as an intermediate buffer if it won't fit in memory.

I don't know your exact requirements, but from what I've learned so far, my advice would be: Don't try to process your data directly from the NetworkStream unless there is a compelling reason to do so. Consider reading the data into memory or onto disk first, then processing the copy.
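
A rough sketch of that advice, assuming the whole message fits in memory and the sender signals the end of the data by closing its side of the connection. BufferedDispatch, BufferAll and StartsWith are hypothetical names, not part of any library.

```csharp
using System;
using System.IO;

public static class BufferedDispatch
{
    // Copies the whole source into memory so it can be inspected and re-read.
    public static MemoryStream BufferAll(Stream source)
    {
        var copy = new MemoryStream();
        source.CopyTo(copy);   // returns only once the source reports end of stream
        copy.Position = 0;     // rewind so readers start at the first byte
        return copy;
    }

    // Checks whether the buffered data begins with the given magic bytes,
    // leaving the stream position where it was.
    public static bool StartsWith(MemoryStream buffered, byte[] magic)
    {
        long saved = buffered.Position;
        try
        {
            var head = new byte[magic.Length];
            int n = buffered.Read(head, 0, head.Length);
            if (n < magic.Length)
                return false;
            for (int i = 0; i < magic.Length; i++)
            {
                if (head[i] != magic[i])
                    return false;
            }
            return true;
        }
        finally
        {
            buffered.Position = saved; // the chosen processor still sees every byte
        }
    }
}
```

The MemoryStream is seekable, so it can be handed to any method that expects a System.IO.Stream after the check, and the raw bytes are still around for debugging if the processor fails halfway through.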


If you have access to the Socket object, you could try the Receive method, passing SocketFlags.Peek. This is analogous to the MSG_PEEK flag that can be passed to the recv call in BSD Sockets or Winsock.
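
Below is a small loopback sketch of that idea. SocketPeekDemo and PeekFirstByte are illustrative names only; the parts that matter are the two Socket.Receive calls, the first with SocketFlags.Peek and the second with SocketFlags.None.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

public static class SocketPeekDemo
{
    // Sends payload over a loopback TCP connection, peeks at the first byte on the
    // receiving socket, then receives it for real. Both bytes should be equal.
    public static (byte peeked, byte received) PeekFirstByte(byte[] payload)
    {
        if (payload == null || payload.Length == 0)
            throw new ArgumentException("payload must contain at least one byte", nameof(payload));

        var listener = new TcpListener(IPAddress.Loopback, 0); // port 0: the OS picks a free port
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;
            using (var client = new TcpClient())
            {
                client.Connect(IPAddress.Loopback, port);
                using (Socket server = listener.AcceptSocket())
                {
                    client.GetStream().Write(payload, 0, payload.Length);

                    var one = new byte[1];
                    // Blocks until at least one byte has arrived, but leaves it queued.
                    server.Receive(one, 0, 1, SocketFlags.Peek);
                    byte peeked = one[0];

                    // A normal receive returns the same byte and removes it from the queue.
                    server.Receive(one, 0, 1, SocketFlags.None);
                    return (peeked, one[0]);
                }
            }
        }
        finally
        {
            listener.Stop();
        }
    }
}
```

When you start from a TcpClient rather than a raw Socket, the underlying socket is available through its Client property. A peek for more than one byte can return fewer bytes than requested if the rest has not arrived yet, so check the return value of Receive.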


Here is a very simple PeekStream implementation that allows you to peek a certain number of bytes at the start of the stream only (as opposed to being able to peek at any time). The peeked bytes are returned as a Stream themselves, to minimize changes to existing code.

Here's how you use it:

Stream nonSeekableStream = ...;
PeekStream peekStream = new PeekStream(nonSeekableStream, 30); // Peek max 30 bytes
Stream initialBytesStream = peekStream.GetInitialBytesStream();
ParseHeaders(initialBytesStream);  // Work on initial bytes of nonSeekableStream
peekStream.Read(...); // Read normally; the read starts from the beginning of the stream

GetInitialBytesStream() returns a seekable stream that contains up to peekSize initial bytes of the underlying stream (less if the stream is shorter than peekSize).

Because of its simplicity, reading from PeekStream should be only marginally slower (if at all) than reading the underlying stream directly.

public class PeekStream : Stream
{
    private Stream m_stream;
    private byte[] m_buffer;
    private int m_start;
    private int m_end;

    public PeekStream(Stream stream, int peekSize)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }
        if (!stream.CanRead)
        {
            throw new ArgumentException("Stream is not readable.");
        }
        if (peekSize < 0)
        {
            throw new ArgumentOutOfRangeException("peekSize");
        }
        m_stream = stream;
        m_buffer = new byte[peekSize];
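        // A single Read may return fewer bytes than requested (typical for network
        // streams), so keep reading until the buffer is full or the stream ends.
        int read;
        while (m_end < peekSize && (read = stream.Read(m_buffer, m_end, peekSize - m_end)) > 0)
        {
            m_end += read;
        }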
    }

    public override bool CanRead
    {
        get
        {
            return true;
        }
    }

    public override bool CanWrite
    {
        get
        {
            return false;
        }
    }

    public override bool CanSeek
    {
        get
        {
            return false;
        }
    }

    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }

    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    public MemoryStream GetInitialBytesStream()
    {
        return new MemoryStream(m_buffer, 0, m_end, false);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate arguments
        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }
        if (count < 0 || count > buffer.Length - offset)
        {
            throw new ArgumentOutOfRangeException("count");
        }

        int totalRead = 0;

        // Read from buffer
        if (m_start < m_end)
        {
            int toRead = Math.Min(m_end - m_start, count);
            Array.Copy(m_buffer, m_start, buffer, offset, toRead);
            m_start += toRead;
            offset += toRead;
            count -= toRead;
            totalRead += toRead;
        }

        // Read from stream
        if (count > 0)
        {
            totalRead += m_stream.Read(buffer, offset, count);
        }

        // Return total bytes read
        return totalRead;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override int ReadByte()
    {
        if (m_start < m_end)
        {
            return m_buffer[m_start++];
        }
        else
        {
            return m_stream.ReadByte();
        }
    }

    public override void Flush()
    {
        m_stream.Flush();
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            m_stream.Dispose();
        }
        base.Dispose(disposing);
    }
}

Disclaimer: PeekStream above is taken from a working program, but it's not comprehensively tested, so it may contain bugs. It works for me, but you might uncover some corner cases where it fails.


FWIW, here is a peekable stream over a non-seekable one, optimized for just one byte ahead:

public class OneBytePeekableStream : Stream
{
    private readonly bool _disposeStreamOnDispose;
    private readonly Stream _stream;
    private int _buffer; // byte or -1
    private int _bufferLength; // 0 or 1

    public OneBytePeekableStream(Stream stream, bool disposeStreamOnDispose)
    {
        if (stream == null)
            throw new ArgumentNullException(nameof(stream));
            
        _stream = stream;
        _disposeStreamOnDispose = disposeStreamOnDispose;
    }

    public override long Length => _stream.Length;
    public override bool CanRead => _stream.CanRead;
    public override bool CanSeek => _stream.CanSeek;
    public override bool CanWrite => _stream.CanWrite;
    public override bool CanTimeout => _stream.CanTimeout;
    public override int ReadTimeout { get => _stream.ReadTimeout; set => _stream.ReadTimeout = value; }
    public override int WriteTimeout { get => _stream.WriteTimeout; set => _stream.WriteTimeout = value; }
    public override long Position { get => _stream.Position - _bufferLength; set { _stream.Position = value; _bufferLength = 0; } }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));

        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset));

        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count));

        if (buffer.Length - offset < count)
            throw new ArgumentOutOfRangeException(nameof(count));

        if (count == 0)
            return 0;

        if (_bufferLength == 0)
            return _stream.Read(buffer, offset, count);

        if (_buffer < 0)
            return 0;

        _bufferLength = 0;
        buffer[offset] = (byte)_buffer;
        if (count == 1)
            return count;

        var read = _stream.Read(buffer, offset + 1, count - 1);
        return read + 1;
    }

    // this is the sole reason of this class
    // returns -1 if the stream is at EOF
    public virtual int PeekByte()
    {
        if (_bufferLength > 0)
            return _buffer;

        _buffer = _stream.ReadByte();
        _bufferLength = 1;
        return _buffer;
    }

    public override int ReadByte()
    {
        if (_bufferLength == 0)
            return _stream.ReadByte();

        if (_buffer < 0)
            return -1;

        _bufferLength = 0;
        return _buffer;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        var ret = _stream.Seek(offset, origin);
        _bufferLength = 0;
        return ret;
    }

    public override void Flush() => _stream.Flush();
    public override void SetLength(long value) => _stream.SetLength(value);
    public override void WriteByte(byte value) => _stream.WriteByte(value);
    public override void Write(byte[] buffer, int offset, int count) => _stream.Write(buffer, offset, count);

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_disposeStreamOnDispose)
            {
                _stream.Dispose();
            }
        }

        base.Dispose(disposing);
    }
}