Implementing an async "read all currently available data from stream" operation

I recently provided an answer to this question: C# - Realtime console output redirection.

As often happens, explaining stuff (here "stuff" was how I tackled a similar problem) leads you to greater understanding and/or, as is the case here, "oops" moments. I realized that my solution, as implemented, has a bug. The bug has little practical importance, but it has an extremely large importance to me as a developer: I can't rest easy knowing that my code has the potential to blow up.

Squashing the bug is the purpose of this question. I apologize for the long intro; let's get our hands dirty.

I wanted to build a class that allows me to receive input from a console's standard output Stream. Console output streams are of type FileStream; the implementation can cast to that, if needed. There is also an associated StreamReader already present to leverage.

There is only one thing I need to implement in this class to achieve my desired functionality: an async "read all the data available this moment" operation. Reading to the end of the stream is not viable because the stream will not end unless the process closes the console output handle, and it will not do that because it is interactive and expecting input before continuing.

I will be using that hypothetical async operation to implement event-based notification, which will be more convenient for my callers.

The public interface of the class is this:

public class ConsoleAutomator {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}

StartSendingEvents and StopSendingEvents do what they advertise; for the purposes of this discussion, we can assume that events are always being sent without loss of generality.

The class uses these two fields internally:

    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

The functionality of the class is implemented in the methods below. To get the ball rolling:

    public void StartSendingEvents()
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

To read data out of the Stream without blocking, and also without requiring a carriage return char, BeginRead is called:

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

The challenging part:

BeginRead requires using a buffer. This means that when reading from the stream, it is possible that the bytes available to read ("incoming chunk") are larger than the buffer. Remember that the goal here is to read all of the chunk and call event subscribers exactly once for each chunk.

To this end, if the buffer is full after EndRead, we don't send its contents to subscribers immediately but instead append them to a StringBuilder. The contents of the StringBuilder are only sent back whenever there is no more to read from the stream.

    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

After any read which is not enough to fill the buffer (in which case we know that there was no more data to be read during the last read operation), all accumulated data is sent to the subscribers:

    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

(I know that as long as there are no subscribers the data gets accumulated forever. This is a deliberate decision).

The good

This scheme works almost perfectly:

  • Async functionality without spawning any threads
  • Very convenient to the calling code (just subscribe to an event)
  • Never more than one event for each time data is available to be read
  • Is almost agnostic to the buffer size

The bad

That last almost is a very big one. Consider what happens when there is an incoming chunk with length exactly equal to the size of the buffer. The chunk will be read and buffered, but the event will not be triggered. This will be followed up by a BeginRead that expects to find more data belonging to the current chunk in order to send it back all in one piece, but... there will be no more data in the stream.

In fact, as long as data is put into the stream in chunks with length exactly equal to the buffer size, the data will be buffered and the event will never be triggered.
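To make the failure mode concrete, here is a minimal sketch that models only the "send when the read came up short" rule from ReadHappened. The names EdgeCaseDemo and CountEvents are made up for illustration, and the stream is replaced by a plain list of read sizes, so this shows the logic hole rather than real console I/O.

```csharp
using System.Collections.Generic;

// Hypothetical model of the decision in ReadHappened; not part of ConsoleAutomator.
public static class EdgeCaseDemo
{
    // Counts how many times subscribers would be notified for a
    // sequence of EndRead results, given the buffer size.
    public static int CountEvents(IEnumerable<int> readSizes, int bufferSize)
    {
        int events = 0;
        foreach (int bytesRead in readSizes)
        {
            // Same test as in ReadHappened: only a short read triggers the event.
            if (bytesRead < bufferSize) {
                events++;
            }
        }
        return events;
    }
}
```

With a buffer size of 256, three reads of exactly 256 bytes raise no event at all, while a read of 256 followed by a read of 10 raises exactly one.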

This scenario may be highly unlikely to occur in practice, especially since we can pick any number for the buffer size, but the problem is there.

Solution?

Unfortunately, after checking the available methods on FileStream and StreamReader, I can't find anything which lets me peek into the stream while also allowing async methods to be used on it.

One "solution" would be to have a thread wait on a ManualResetEvent after the "buffer filled" condition is detected. If the event is not signaled (by the async callback) in a small amount of time, then more data from the stream will not be forthcoming and the data accumulated so far should be sent to subscribers. However, this introduces the need for another thread, requires thread synchronization, and is plain inelegant.

Specifying a timeout for BeginRead would also suffice (call back into my code every now and then so I can check if there's data to be sent back; most of the time there will not be anything to do, so I expect the performance hit to be negligible). But it looks like timeouts are not supported in FileStream.
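For what it's worth, the timeout idea might be approximated without any timeout support in FileStream by racing a pending read against a timer. The following is only a sketch under assumptions: it needs .NET 4.5 or later (Stream.ReadAsync, Task.WhenAny, Task.Delay), the class name AvailableDataReader and its members are hypothetical, and the idle timeout is a value the caller has to pick. It also uses a Decoder instead of Encoding.GetString so that a multi-byte character split across two reads is not corrupted.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper, not part of ConsoleAutomator.
public sealed class AvailableDataReader
{
    private readonly Stream stream;
    private readonly Decoder decoder;   // keeps state for characters split across reads
    private readonly byte[] buffer;
    private readonly char[] chars;
    private readonly TimeSpan idleTimeout;
    private Task<int> pendingRead;      // a read that outlived an earlier timeout

    public AvailableDataReader(Stream stream, Encoding encoding,
                               int bufferSize, TimeSpan idleTimeout)
    {
        this.stream = stream;
        this.decoder = encoding.GetDecoder();
        this.buffer = new byte[bufferSize];
        this.chars = new char[encoding.GetMaxCharCount(bufferSize)];
        this.idleTimeout = idleTimeout;
    }

    // Returns the text that is available right now, or null at end of stream.
    public async Task<string> ReadAvailableAsync()
    {
        var text = new StringBuilder();
        bool bufferWasFilled = false;
        while (true)
        {
            if (this.pendingRead == null) {
                this.pendingRead = this.stream.ReadAsync(this.buffer, 0, this.buffer.Length);
            }

            if (bufferWasFilled) {
                // The previous read filled the buffer exactly: wait for more data,
                // but only for a short while.
                Task winner = await Task.WhenAny(this.pendingRead, Task.Delay(this.idleTimeout));
                if (winner != this.pendingRead) {
                    return text.ToString(); // the read stays pending for the next call
                }
            }

            int bytesRead = await this.pendingRead;
            this.pendingRead = null;
            if (bytesRead == 0) {
                return text.Length > 0 ? text.ToString() : null;
            }

            int charCount = this.decoder.GetChars(this.buffer, 0, bytesRead, this.chars, 0);
            text.Append(this.chars, 0, charCount);

            if (bytesRead < this.buffer.Length) {
                return text.ToString(); // short read: nothing more was available
            }
            bufferWasFilled = true;
        }
    }
}
```

A caller would loop on ReadAvailableAsync and raise StandardOutputRead once per non-null result. The trade-off is that a chunk whose length is an exact multiple of the buffer size is delivered only after the idle timeout has elapsed, and the timer replaces the extra thread rather than removing the waiting altogether.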

Since I imagine that async calls with timeouts are an option in bare Win32, another approach might be to PInvoke the hell out of the problem. But this is also undesirable as it will introduce complexity and simply be a pain to code.

Is there an elegant way to get around the problem?

Thanks for being patient enough to read all of this.

Update:

I definitely did not communicate the scenario well in my initial writeup. I have since revised the writeup quite a bit, but to be extra sure:

The question is about how to implement an async "read all the data available this moment" operation.

My apologies to the people who took the time to read and answer without me making my intent clear enough.


In theory, I agree with Jason; your implementation has bigger problems than a logic hole when the size of a chunk of data is an exact multiple of your buffer size. The biggest problem I see is that your reader must have enough knowledge about the file type to know how it can separate data into "chunks" that your subscribers know how to deal with.

Streams have no inherent knowledge about what they're receiving or sending; only the mechanism by which they are transporting the data. A NetworkStream may be sending HTML or a ZIP file; a FileStream may be reading a text file or an MP3. It's the reader (XmlReader, TextReader, Image.FromStream(), etc) that has this knowledge. Therefore, your async reader has to know at least something about the data, but it would be useful not to have that knowledge hard-coded.

In order to work with "streaming" data, incremental sends must be individually useful; you must know enough about what you're getting to know that what you've gotten is a "chunk" that is individually processable. My suggestion is to provide that information to your async reader in an encapsulated fashion, either by having your subscribers tell you, or by providing some format-specific "chunkifier" separate from the listeners (as this reader is listening to console output, and all listeners should treat it the same way, this second plan may be better).

A logical implementation:

public class MyStreamManager {
    public delegate bool ValidChunkTester(StringBuilder builder);

    private readonly List<ValidChunkTester> validators = new List<ValidChunkTester>();
    public event ValidChunkTester IsValidChunk
    { add{validators.Add(value);} remove {validators.Remove(value);}}

    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;


    public void StartSendingEvents();
    public void StopSendingEvents();
}

...

private void ReadHappened(IAsyncResult asyncResult)
{
    var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
    if (bytesRead == 0) {
        this.OnAutomationStopped();
        return;
    }

    var input = this.StandardOutput.CurrentEncoding.GetString(
        this.buffer, 0, bytesRead);
    this.inputAccumulator.Append(input);

    if (validators.Any() && StandardOutputRead != null
            && validators.All(validator => validator(inputAccumulator))) {
        this.OnInputRead(); // send when all listeners can work with the buffer contents
    }

    this.BeginReadAsync(); // continue "looping" with BeginRead
}

...

This model requires that subscribers not modify the StringBuilder; you can provide something immutable for them to examine if you choose. An example listener might be:

public bool IsACompleteLine(StringBuilder builder)
{
    // StringBuilder has no Contains method, so test the materialized string
    return builder.ToString().Contains(Environment.NewLine);
}

or:

public bool Contains256Bytes(StringBuilder builder)
{
    return builder.Length >= 256;
}

... you get the idea. The event determining the worthiness of the current buffer to be released to listeners is conceptually separate from the listeners themselves, but doesn't have to be concretely so, so it will support either a single output-specific test or multiple listener-based tests.


If you read from a FileStream in the manner you have described, then the entire contents of the underlying file will be read. Thus, you will only have one "chunk" of data, which you will read into a StringBuilder (somewhat inefficiently) in tiny bites. Nothing in your implementation gives any way of breaking the data into smaller "chunks", because the read will continue filling your buffer until the file is exhausted. At this level of abstraction, only the client knows what size these chunks should be, so you will have to hand the data over to them to be decoded into chunks. Which defeats the original purpose of your buffer.

If you have some other form of stream that delivers data in bursts (e.g. console output or comms packets), then you will get bursts of data, but you still can't guarantee that a read ending with less than a buffer-ful of data means that you have reached the end of a packet, simply that there is a pause in the transmission. Usually in these cases you need to buffer the data and process it (i.e. have knowledge of the data format) to determine when a complete chunk/packet has been received. In this scenario you will always have an "unfinished chunk" waiting in your buffer until some more data is received to terminate the chunk or start a new chunk, and "push it out" of your buffer. This can be a problem in comms where the next packet may not arrive for a long time.

So ultimately, you will need to prime your reader with knowledge of how the data should be divided into chunks, which means you need the client to do the decoding, which is why the base stream classes don't already deliver data in the manner you are trying to implement.

So, by adding this intermediate class, what will you gain? At best it will add an extra layer of complexity and overhead to your I/O (let's face it, what you're trying to abstract out of your client code is only a few lines of code). At worst, it will be unable to break the data into chunks as you require, so will be of no use at all.

Beware "This scenario may be highly unlikely to occur in practice": When streaming large amounts of data you can be assured that even "highly unlikely" events will occur with considerable regularity - certainly often enough that you can't assume they will never happen.

[edit - added]

If you are not seeking to generalise your solution, then you can add logic to the class that handles the problem easily.

Two possible solutions are:

  • If you know the maximum length of the console lines that will be output to you, you can simply use a large enough buffer that you can guarantee your edge case will never occur. (e.g. CreateProcess commands are limited to 32k, cmd.exe limits commands to 8k. You may find similar limits apply to the data "chunks" you are receiving.)

  • If your chunks are always lines (newline terminated blocks of text), then simply check if the last character in your buffer looks like a terminator (0x0a or 0x0d). If not, there is more data to read.
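The second option could look roughly like this. It is only a sketch: the names LineChunkCheck and EndsWithLineTerminator are invented here, and it assumes an ASCII-compatible encoding such as UTF-8 or a single-byte code page (with UTF-16 the last byte of a newline is 0x00, so the test would have to look at the last two bytes instead).

```csharp
// Hypothetical helper, not part of ConsoleAutomator.
public static class LineChunkCheck
{
    // True when the bytes just read end with LF (0x0a) or CR (0x0d),
    // i.e. the chunk looks like it stops at a line boundary.
    public static bool EndsWithLineTerminator(byte[] buffer, int bytesRead)
    {
        if (buffer == null || bytesRead <= 0 || bytesRead > buffer.Length) {
            return false;
        }

        byte last = buffer[bytesRead - 1];
        return last == 0x0A || last == 0x0D;
    }
}
```

In ReadHappened the condition for sending would then become "the read was short, or the buffer ends with a terminator". Note that this only helps when output really is line-oriented; a prompt that waits for input without printing a newline would still sit in the accumulator.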


I would be inclined to remove the "double buffering" (the part where you fill the StringBuilder then pass the data when it is full) and return the data received from the Stream's buffer whenever bytes are read. So in ReadHappened, you would have:

if (bytesRead > 0) {
    this.OnInputRead(); // forward every read to subscribers as soon as it arrives
}

As others have stated the subscriber will need to know something about the message/chunk of data and how to combine multiple parts into one whole. Therefore you may as well return each part as you receive it. If the subscriber is a "dumb subscriber" which simply acts as a conduit this would work too.
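As an illustration of what "combining multiple parts into one whole" might mean on the subscriber side, here is a minimal sketch of a subscriber that treats newline-terminated lines as its unit of work. The LineAssembler class and its Feed method are hypothetical names, and the choice of lines as the message format is an assumption; a different subscriber would apply its own rule.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical subscriber-side helper: rebuilds complete lines from
// fragments that arrive in arbitrary pieces.
public sealed class LineAssembler
{
    private readonly StringBuilder pending = new StringBuilder();

    // Adds a fragment and returns every line it completed (without terminators).
    // Text after the last newline is kept until a later fragment completes it.
    public IList<string> Feed(string fragment)
    {
        var lines = new List<string>();
        foreach (char c in fragment)
        {
            if (c == '\n') {
                // Drop the '\r' of a "\r\n" pair, even if it arrived in an earlier fragment.
                if (this.pending.Length > 0 && this.pending[this.pending.Length - 1] == '\r') {
                    this.pending.Length--;
                }
                lines.Add(this.pending.ToString());
                this.pending.Clear();
            } else {
                this.pending.Append(c);
            }
        }
        return lines;
    }
}
```

The event handler would simply pass each fragment it receives to Feed and process whatever lines come back, so the reader itself never has to guess where a chunk ends.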
