Reactive Framework: eliminating the while loop in Observable.Create
I am working with an observable byte stream coming off the network, and I would like to take it up one layer of abstraction. In this format, each message is preceded by two bytes that contain the length of the message that follows. I'd like to make this fit into the Reactive Framework cleanly. What I have so far doesn't feel quite right, so I am wondering what tricks I may have missed that would eliminate the while loop here.
Here's the concept I have in mind:
public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) {
    return Observable.Create((IObserver<Stream> o) => {
        bool done = false;
        var byteStream = byteStreamArg.Do(
            b => { }, (ex) => { done = true; }, () => { done = true; });
        while (!done)
        {
            var size = byteStream.Take(2).
                Aggregate(0, (n, b) => (n << 8) + b).Single();
            var buf = byteStream.Skip(2).Take(size);
            var stream = new MemoryStream(buf.ToEnumerable().ToArray());
            if (!done)
            {
                o.OnNext(stream);
            }
        }
        return (() => {});
    });
}
An IObservable<Stream> is a bit weird here: remember that you're returning a "future list of streams". I'd actually just return a Stream, or perhaps an IObservable<byte[]>, where each array represents one message. Better still, return an IObservable<ParsedMessage>.
Also, your while loop blocks inside Observable.Create, which makes this non-asynchronous and causes it to behave strangely. How about something more like this:
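// Requires: using System; using System.IO; using System.Reactive.Linq;
public static IObservable<System.IO.Stream> GetToplevelStreams(IObservable<byte> byteStream)
{
    return Observable.Create((IObserver<System.IO.Stream> o) =>
    {
        int? size1 = null;   // high byte of the length header, once it has arrived
        int? size = null;    // full two-byte message length, once both header bytes are in
        var buf = new MemoryStream();
        var subscription = byteStream.Subscribe(v =>
        {
            if (!size1.HasValue)
            {
                size1 = ((int)v) << 8;
            }
            else if (!size.HasValue)
            {
                size = size1.Value + v;
            }
            else
            {
                buf.WriteByte(v);
            }
            if (size.HasValue && buf.Length == size)
            {
                // Emit a copy: buf is reset below, which would otherwise
                // wipe the message for any observer that kept the stream.
                o.OnNext(new MemoryStream(buf.ToArray()));
                buf.SetLength(0);
                size1 = null;
                size = null;
            }
        }, (ex) => o.OnError(ex), () => o.OnCompleted());
        return () => subscription.Dispose();
    });
}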
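The IObservable<byte[]> variant suggested above could look roughly like the sketch below. It assumes the System.Reactive package and the same two-byte, big-endian length header as the question; the class name Framing and the method name GetMessages are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class Framing
{
    // Hypothetical helper: splits a byte stream into length-prefixed messages.
    // Assumes each message is preceded by a 2-byte big-endian length.
    public static IObservable<byte[]> GetMessages(IObservable<byte> byteStream)
    {
        return Observable.Create<byte[]>(o =>
        {
            int headerBytesSeen = 0;     // how many of the 2 length bytes have arrived
            int size = 0;                // payload length decoded so far
            var buf = new List<byte>();  // payload bytes of the current message

            // Returning the inner subscription lets Rx dispose it on unsubscribe.
            return byteStream.Subscribe(b =>
            {
                if (headerBytesSeen < 2)
                {
                    size = (size << 8) | b;
                    headerBytesSeen++;
                }
                else
                {
                    buf.Add(b);
                }

                if (headerBytesSeen == 2 && buf.Count == size)
                {
                    o.OnNext(buf.ToArray());  // a fresh array for every message
                    buf.Clear();
                    headerBytesSeen = 0;
                    size = 0;
                }
            }, o.OnError, o.OnCompleted);
        });
    }
}
```

For example, feeding it new byte[] { 0, 2, 0x41, 0x42, 0, 1, 0x43 }.ToObservable() should yield two arrays, one holding 0x41 0x42 and one holding 0x43. Because every message is a new array, observers can keep a reference to it without worrying about the parser's buffer being reset.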