
Using Rx to block (and possibly timeout) on an asynchronous operation

I'm trying to rewrite some code using Reactive Extensions for .NET but I need some guidance on how to achieve my goal.

I have a class that encapsulates some asynchronous behavior in a low-level library. Think of something that either reads from or writes to the network. When the class is started, it tries to connect to the environment and, when successful, signals this back by invoking a callback on a worker thread.

I want to turn this asynchronous behavior into a synchronous call. Below is a greatly simplified example of how that can be achieved:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

Running AsyncStart on a worker thread is just a way to simulate the asynchronous behavior of the library and is not part of my real code, where the low-level library supplies the thread and calls my code through a callback.

Notice that the Start method will throw a TimeoutException if start hasn't completed within the timeout interval.

I want to rewrite this code to use Rx. Here is my first attempt:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

This is a decent attempt, but unfortunately it contains a race condition. If the startup completes quickly (e.g. if delay is 0) and there is an additional delay at point A, then OnNext will be called on readySubject before First has executed. In essence, the IObservable to which I'm applying Timeout and First never sees that startup has completed, and a TimeoutException is thrown instead.
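A minimal, deterministic way to see this race is to force the worst case on purpose: signal the Subject before anything subscribes. This sketch assumes the System.Reactive NuGet package and uses `FirstAsync().Wait()` as the blocking equivalent of `First()` (newer Rx releases mark the blocking `First()` as obsolete in favor of `Wait()`); the class and method names are illustrative only.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SubjectRaceDemo
{
    // Returns true if the "ready" signal was observed, false on timeout.
    public static bool SignalBeforeSubscribe()
    {
        var readySubject = new Subject<Unit>();

        // Worst case of the race: the worker signals before
        // Timeout/First have subscribed (the delay at point A).
        readySubject.OnNext(Unit.Default);

        try
        {
            // A plain Subject does not remember past notifications,
            // so this subscriber never sees the signal and times out.
            readySubject.Timeout(TimeSpan.FromMilliseconds(200)).FirstAsync().Wait();
            return true;
        }
        catch (TimeoutException)
        {
            return false;
        }
    }

    static void Main()
    {
        Console.WriteLine(SignalBeforeSubscribe()); // False
    }
}
```

A plain `Subject<T>` only forwards notifications to observers that are already subscribed, so the early `OnNext` is simply dropped.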

It seems that Observable.Defer has been created to handle problems like this. Here is a slightly more complex attempt to use Rx:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

Now the asynchronous operation is not started immediately but only when the IObservable is subscribed to. Unfortunately there is still a race condition, this time at point B: if the asynchronous operation calls OnNext before the Defer lambda returns, the notification is still lost and Timeout will throw a TimeoutException.

I know I can use operators like Replay to buffer events, but my initial example without Rx doesn't use any kind of buffering. Is there a way to use Rx to solve my problem without race conditions? In essence, can I start the asynchronous operation only after the IObservable has been subscribed to (in this case by Timeout and First)?


Based on Ana Betts's answer, here is a working solution:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

The interesting case is a delay at point C that is longer than the time it takes AsyncStart to complete. AsyncSubject retains the last notification sent (and the completion) and replays them to late subscribers, so Timeout and First still perform as expected.
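The same experiment with an `AsyncSubject<T>` shows the difference. This is a sketch under the same assumptions (System.Reactive package, illustrative names). Note that an `AsyncSubject<T>` only emits its last value once `OnCompleted` is called, which is why the working solution calls `OnCompleted` right after `OnNext`.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class AsyncSubjectReplayDemo
{
    // Returns true if the "ready" signal was observed, false on timeout.
    public static bool SignalBeforeSubscribe()
    {
        var readySubject = new AsyncSubject<Unit>();

        // The worker finishes before anyone subscribes (a long delay at point C).
        readySubject.OnNext(Unit.Default);
        readySubject.OnCompleted(); // AsyncSubject emits nothing until this call

        try
        {
            // The late subscriber still receives the cached value and completion.
            readySubject.Timeout(TimeSpan.FromMilliseconds(200)).FirstAsync().Wait();
            return true;
        }
        catch (TimeoutException)
        {
            return false;
        }
    }

    static void Main()
    {
        Console.WriteLine(SignalBeforeSubscribe()); // True
    }
}
```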


So, here is one thing about Rx that I think a lot of people get wrong at first (myself included!): if you're using any traditional threading function like ResetEvents, Thread.Sleeps, or whatever, you're Doing It Wrong (tm). It's like casting things to Arrays in LINQ because you know that the underlying type happens to be an array.

The key thing to know is that an async func is represented by a function that returns IObservable<TResult> - that's the magic sauce that lets you signal when something has completed. So here's how you'd "Rx-ify" a more traditional async func, like you'd see in a Silverlight web service:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject<byte[]>();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}


This is where AsyncSubject comes in - this makes sure that even if asyncReaderFunc beats the Subscribe to the punch, AsyncSubject will still "replay" what happened.

So, now that we've got our function, we can do lots of interesting things to it:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out.
// Defer makes Repeat call readFromNetwork() again for every block;
// repeating the same AsyncSubject would just replay the first block.
Observable.Defer(() => readFromNetwork())
    .Repeat().TakeWhile(x => x != null && x.Length > 0)
    .Subscribe(bytes => {
        Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
    });

// Read the entire stream and get notified when the whole deal is finished
Observable.Defer(() => readFromNetwork())
    .Repeat().TakeWhile(x => x != null && x.Length > 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => { ms.Write(bytes, 0, bytes.Length); return ms; })
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = Observable.Defer(() => readFromNetwork())
    .Repeat().TakeWhile(x => x != null && x.Length > 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => { ms.Write(bytes, 0, bytes.Length); return ms; })
    .First();
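To try the read loop end to end, the sketch below swaps the network for a hypothetical `FakeChunkReader` whose `ReadChunk` method plays the role of `asyncReaderFunc`: it serves fixed-size chunks of an in-memory array on a thread-pool thread, then an empty chunk once the data runs out. It assumes the System.Reactive package. The read is wrapped in `Observable.Defer` because `Repeat` resubscribes to the same `IObservable`; without `Defer`, every repetition would see the same completed `AsyncSubject` and replay its first chunk instead of issuing a new read. `TakeWhile` stops at the empty chunk, and the `Aggregate` accumulator returns the `MemoryStream` because `Stream.Write` returns `void`.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical stand-in for a callback-based network reader.
class FakeChunkReader
{
    private readonly byte[] _data;
    private readonly int _chunkSize;
    private int _position;

    public FakeChunkReader(byte[] data, int chunkSize)
    {
        _data = data;
        _chunkSize = chunkSize;
    }

    // Traditional async API: the callback runs later on a thread-pool thread.
    // An empty chunk signals that the data is exhausted.
    public void ReadChunk(Action<byte[]> callback)
    {
        int count = Math.Min(_chunkSize, _data.Length - _position);
        var chunk = new byte[count];
        Array.Copy(_data, _position, chunk, 0, count);
        _position += count;
        ThreadPool.QueueUserWorkItem(_ => callback(chunk));
    }

    // Same shape as readFromNetwork(): one AsyncSubject per read, so a
    // callback that fires before Subscribe is not lost.
    public IObservable<byte[]> ReadFromNetwork()
    {
        var ret = new AsyncSubject<byte[]>();
        ReadChunk(buffer =>
        {
            ret.OnNext(buffer);
            ret.OnCompleted();
        });
        return ret;
    }
}

static class ChunkedReadDemo
{
    public static byte[] ReadAll(FakeChunkReader reader)
    {
        // Defer issues a fresh read on every resubscription made by Repeat.
        MemoryStream result = Observable.Defer(() => reader.ReadFromNetwork())
            .Repeat()
            .TakeWhile(chunk => chunk != null && chunk.Length > 0)
            .Aggregate(new MemoryStream(), (ms, chunk) =>
            {
                ms.Write(chunk, 0, chunk.Length);
                return ms;
            })
            .Wait(); // block until the whole stream has been read
        return result.ToArray();
    }

    static void Main()
    {
        var data = new byte[10];
        for (int i = 0; i < data.Length; i++) data[i] = (byte)i;

        byte[] all = ReadAll(new FakeChunkReader(data, chunkSize: 4));
        Console.WriteLine(all.Length); // 10
    }
}
```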


I would add to Paul's point that using WaitHandles means you are doing it wrong: using Subjects directly usually means you are doing it wrong too. ;-)

Try to think of your Rx code as working with sequences or pipelines. Subjects offer both read and write capabilities, which means you are no longer working with a pipeline or a sequence (unless you have pipelines that go both ways, or sequences that can reverse?!?)

Paul's code is pretty cool, but let's "Rx the hell out of it".

1st. Change the AsyncStart method to this:

IObservable<Unit> AsyncStart(TimeSpan delay) 
{
  return Observable.Timer(delay).Select(_ => Unit.Default);
}

So easy! Look, no subjects, and data only flows one way. The important thing here is the signature change: the method now pushes values to us, and that is very explicit. Passing in a Subject is, to me, very ambiguous.

2nd. We now don't need the Subject defined in the Start method. We can also leverage the Scheduler features instead of the old-skool ThreadPool.QueueUserWorkItem.

void Start(TimeSpan timeout) 
{
    var isReady = AsyncStart(TimeSpan.FromSeconds(1))
                    .SubscribeOn(Scheduler.ThreadPool)
                    .PublishLast();
    isReady.Connect();
    isReady.Timeout(timeout).First();
}
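A runnable version of this pipeline, under the same assumptions (System.Reactive package, illustrative names), can check that nothing is lost even when startup finishes before `Timeout`/`First` subscribe. `PublishLast` backs the connectable observable with an `AsyncSubject`, so the signal produced after `Connect` is replayed to the late subscriber. The sketch leaves out `SubscribeOn`, since `Observable.Timer` already fires on Rx's default scheduler rather than on the calling thread; the `Thread.Sleep` only simulates a delay like the one at point C.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

static class PublishLastDemo
{
    // Same shape as the AsyncStart above: a single Unit after the delay.
    static IObservable<Unit> AsyncStart(TimeSpan delay) =>
        Observable.Timer(delay).Select(_ => Unit.Default);

    // Returns true if startup was observed within the timeout.
    public static bool Start(TimeSpan startupDelay, TimeSpan pauseBeforeWait, TimeSpan timeout)
    {
        var isReady = AsyncStart(startupDelay).PublishLast();
        isReady.Connect(); // startup begins here, before anyone waits

        Thread.Sleep(pauseBeforeWait); // simulate a delay before waiting

        try
        {
            isReady.Timeout(timeout).FirstAsync().Wait();
            return true;
        }
        catch (TimeoutException)
        {
            return false;
        }
    }

    static void Main()
    {
        // Startup (10 ms) finishes long before we start waiting (200 ms).
        Console.WriteLine(Start(TimeSpan.FromMilliseconds(10),
                                TimeSpan.FromMilliseconds(200),
                                TimeSpan.FromSeconds(1)));

        // Startup (2 s) takes longer than the timeout (100 ms).
        Console.WriteLine(Start(TimeSpan.FromSeconds(2),
                                TimeSpan.Zero,
                                TimeSpan.FromMilliseconds(100)));
    }
}
```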

Now we have a clear pipeline or sequence of events:

AsyncStart --> isReady --> Start

instead of Start --> AsyncStart --> Start.

If I knew more about your problem space, I am sure we could come up with an even better approach that does not require the Start method to block. The more you use Rx, the more you will find that your old assumptions about when you need to block, use WaitHandles, etc. can be thrown out the window.
