开发者

Reactive framework eliminate while loop in Observable.Create

I am working with an observable byte stream, coming off of the network, and I would like to take that up one layer of abstraction. The format has two bytes that contain the length of the next message. I'd like开发者_JAVA百科 to make this fit into the reactive framework pretty well. What I have so far feels not quite right, so I am wondering what tricks I may have missed to eliminate the while loop here.

Here's the concept I have in mind:

public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) {
    return Observable.Create((IObserver<Stream>o)=>{
        bool done = false;
        var byteStream = byteStreamArg.Do(
            b => { }, (ex) => { done = true; }, () => { done = true; });
        while (!done)
        {
            var size = byteStream.Take(2).
                           Aggregate(0, (n, b) => (n << 8) + b).Single();
            var buf = byteStream.Skip(2).Take(size);
            var stream = new MemoryStream(buf.ToEnumerable().ToArray());
            if (!done)
            {
                o.OnNext(stream);
            }
        }
        return (() => {});
    });
}


An IObservable is a bit weird here - remember that you're returning a "Future List of Streams" - I'd actually just return a Stream, or perhaps an IObservable<byte[]>, where each array represents a message. Or do even better, and return an IObservable<ParsedMessage>

Also, your While loop makes this non-async and act strangely. How about something more like this:

public static IObservable<System.IO.Stream> GetToplevelStreams(IObservable<byte> byteStream)
{
    return Observable.Create((IObserver<System.IO.Stream> o) =>
    {
        int? size1=null;
        int? size=null;
        var buf = new MemoryStream();
        var subscription = byteStream.Subscribe(v =>
        {
            if (!size1.HasValue)
            {
                size1 = ((int)v) << 8;
            }
            else if (!size.HasValue)
            {
                size = size1.Value + v;
            }
            else
            {
                buf.WriteByte(v);
            }
            if (size.HasValue && buf.Length == size)
            {
                buf.Position = 0;
                o.OnNext(buf);
                buf.SetLength(0);
                size1 = null;
                size = null;
            }

        }, (ex)=>o.OnError(ex), ()=>o.OnCompleted());
        return () => subscription.Dispose();
    });
}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜