
Using Rx to queue operations I don't want executed until a particular time?

Summary: I have a web app that executes workflows on business objects and sometimes needs to deliberately wait several seconds or minutes between steps. I'm looking to improve the execution of these workflows (perhaps via Rx.NET) so that I do not exhaust the ThreadPool and make the website unresponsive when the system is under heavy load.

A very simplified version of the workflow is:

  1. Create an object
  2. Load data into it from System A
  3. POST this data to System B

If System A is down, my app waits and retries later. The wait time is modeled after Gmail's escalating retry delays: wait 1 second, then double the delay on each subsequent retry, up to a maximum of 1 hour. The app saves state to the database obsessively, so if the whole app blows up, it resumes all workflows where they left off when it restarts.
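As a rough sketch of that delay schedule, assuming the retry count is stored with the workflow state (RetryDelay, ForAttempt and NextRunUtc are hypothetical names), the delay for a given retry and the resulting next run time could be computed like this:

```csharp
using System;

// Hypothetical helper: escalating retry delay of 1s, 2s, 4s, ... capped at 1 hour.
public static class RetryDelay
{
    private static readonly TimeSpan Initial = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan Max = TimeSpan.FromHours(1);

    // Delay before retry number 'attempt' (0 = first retry).
    public static TimeSpan ForAttempt(int attempt)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException(nameof(attempt));
        // 2^12 seconds already exceeds an hour, so clamp the exponent to avoid overflow.
        double seconds = Initial.TotalSeconds * Math.Pow(2, Math.Min(attempt, 12));
        return seconds >= Max.TotalSeconds ? Max : TimeSpan.FromSeconds(seconds);
    }

    // Earliest time the workflow may run again, given when it last ran.
    public static DateTime NextRunUtc(DateTime lastRunUtc, int attempt) =>
        lastRunUtc + ForAttempt(attempt);
}
```

Persisting NextRunUtc alongside the workflow would also give a restarted app everything it needs to know when each workflow is due.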

Currently (please be gentle), each step in the workflow is executed by calling ThreadPool.QueueUserWorkItem to queue up a method that calls Thread.Sleep, if necessary, for the retry delay described above and then actually executes the step.
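For contrast, the current approach looks roughly like the following sketch (step is a hypothetical stand-in for one workflow step that returns true on success). The key point is that every pending retry holds a pool thread for the entire Thread.Sleep:

```csharp
using System;
using System.Threading;

public static class CurrentApproach
{
    // Runs 'step' on a pool thread after sleeping for 'delay'; on failure it
    // re-queues itself with a doubled delay (capped at one hour).
    public static void QueueStep(Func<bool> step, TimeSpan delay, Action onSuccess)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            // This pool thread does nothing but wait for the whole delay.
            Thread.Sleep(delay);
            if (step())
            {
                onSuccess();
                return;
            }
            var next = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, TimeSpan.FromHours(1).Ticks));
            QueueStep(step, next, onSuccess);
        });
    }
}
```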

If the system is performing well (no errors), it can easily handle all the traffic we throw at it, and the ThreadPool nicely manages parallel execution of all these workflow instances. But if System B is down for a while, the retry count, and thus the delay, grows, and pretty soon the ThreadPool is filled with sleeping threads, which makes the website unresponsive to new requests.

Essentially, I want to throw all these pending workflows into a queue ordered by (last execution time + desired retry delay). Despite reading a lot about Rx and being excited by it, I've never had an opportunity to use it, but it seems like it might be a helpful way to handle this. If Rx can magically manage spitting out these objects when they're ready to fire, it seems like it would:

  1. Greatly simplify and clarify this logic, and
  2. Prevent the wasteful use of lots of threads that are just sleeping 99% of the time
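Independent of Rx, the queue described above can be sketched with .NET's PriorityQueue<TElement, TPriority>, keyed on the next due time. This assumes .NET 6 or later, and PendingWorkflow and DueQueue are hypothetical names:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical holder for a pending workflow and its next due time.
public sealed record PendingWorkflow(string Id, DateTime DueUtc);

public sealed class DueQueue
{
    // Min-heap ordered by due time: the earliest-due workflow is always at the front.
    private readonly PriorityQueue<PendingWorkflow, DateTime> _queue = new();
    private readonly object _gate = new();

    public void Enqueue(PendingWorkflow item)
    {
        lock (_gate) _queue.Enqueue(item, item.DueUtc);
    }

    // Removes and returns every workflow whose due time is at or before 'nowUtc'.
    public List<PendingWorkflow> TakeDue(DateTime nowUtc)
    {
        var ready = new List<PendingWorkflow>();
        lock (_gate)
        {
            while (_queue.TryPeek(out var item, out var due) && due <= nowUtc)
            {
                _queue.Dequeue();
                ready.Add(item);
            }
        }
        return ready;
    }

    // Time until the next workflow becomes due, or null if the queue is empty.
    public TimeSpan? TimeUntilNext(DateTime nowUtc)
    {
        lock (_gate)
        {
            if (!_queue.TryPeek(out _, out var due)) return null;
            return due <= nowUtc ? TimeSpan.Zero : due - nowUtc;
        }
    }
}
```

A single timer could then wait for TimeUntilNext, hand everything returned by TakeDue to the thread pool, and re-arm itself, so only work that is actually due ever occupies a thread.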

Any guidance to an Rx newbie would be greatly appreciated, even if it's just to explain why this is in fact not a good use case for Rx.


In this case, I might stick with your current solution, because of this bit:

The app saves state to the database obsessively so if the whole app blows up, when it restarts it will resume all workflows where it left off.

"Resuming" a pipeline (e.g. x.Where().Select().Timeout().Bla()) via deserialization on startup is...tricky.

It's hard to give you a more detailed solution without more info. It might actually work pretty well with Rx if you don't try to model the entire flow, just the transactional bit (i.e. load from A, send to B).

Anyway, the way to solve your thread pool exhaustion is the System.Threading.Timer class: instead of a pool thread sleeping through the delay, the timer waits without occupying a thread and queues the callback to the thread pool only once the delay has elapsed.
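A minimal sketch of that idea, assuming each step can be wrapped as a Func<bool> that returns true on success (TimerRetry is a hypothetical name, and persisting state to the database is left out): a one-shot Timer is re-armed with a doubled delay after each failure, so nothing occupies a pool thread between attempts.

```csharp
using System;
using System.Threading;

public sealed class TimerRetry : IDisposable
{
    private readonly Func<bool> _step;   // hypothetical step: returns true on success
    private readonly Action _onSuccess;
    private readonly Timer _timer;
    private TimeSpan _delay;

    public TimerRetry(Func<bool> step, TimeSpan initialDelay, Action onSuccess)
    {
        _step = step;
        _onSuccess = onSuccess;
        _delay = initialDelay;
        // Created disarmed; a pending timer does not hold any thread.
        _timer = new Timer(_ => Attempt(), null, Timeout.Infinite, Timeout.Infinite);
    }

    // Arms the one-shot timer for the first attempt.
    public void Start() => _timer.Change(_delay, Timeout.InfiniteTimeSpan);

    private void Attempt()
    {
        // Runs briefly on a pool thread, then returns the thread to the pool.
        if (_step())
        {
            _onSuccess();
            return;
        }
        // Double the delay (capped at one hour) and re-arm the one-shot timer.
        _delay = TimeSpan.FromTicks(Math.Min(_delay.Ticks * 2, TimeSpan.FromHours(1).Ticks));
        _timer.Change(_delay, Timeout.InfiniteTimeSpan);
    }

    public void Dispose() => _timer.Dispose();
}
```

The TimerRetry instance must stay referenced until it finishes; otherwise the timer can be garbage-collected before it fires.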


You will definitely have to adapt this:

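using System;
using System.Reactive;
using System.Reactive.Linq;

public class WorkflowRunner
{
    // Processes fresh T instances forever: load, then post. After a success,
    // it starts over with a new T.
    public IDisposable StartProcess<T>(Action<T> load, Action<T> post) where T : new()
    {
        return Observable
            .Defer(() => StartProcess(TimeSpan.FromSeconds(1), new T(), load, post))
            .Repeat()
            .Subscribe();
    }

    // Waits 'span' on a timer (no thread sleeps), then runs load and post on obj.
    // If either throws, retries the same obj after a longer delay.
    private IObservable<Unit> StartProcess<T>(TimeSpan span, T obj, Action<T> load, Action<T> post)
    {
        return Observable
            .Timer(span)
            .Select(_ => { load(obj); post(obj); return Unit.Default; })
            .Catch<Unit, Exception>(_ => StartProcess(IncreaseSpan(span), obj, load, post));
    }

    // Doubles the delay, capping it at one hour.
    private TimeSpan IncreaseSpan(TimeSpan span)
    {
        return TimeSpan.FromSeconds(span.TotalSeconds < 1800 ? span.TotalSeconds * 2 : 3600);
    }
}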

That said, I'd much rather have load instantiate and fill the object itself instead of creating it explicitly with new T(), since functional programming discourages mutability, and you may want load to go to the database and restore the saved state, as you mentioned.

I wasn't sure whether you wanted to preserve the state object when the call to load or post crashes, so you will need to adapt this: as written, it keeps the same state object whether load or post crashes, and if post crashes it calls load again on that object instead of starting from fresh state, which may well not be what you want.

I didn't test the code, but Rx is suitable for what you want to do.


Check out this post on the Rx forums. Pretty handy operator for the kind of problem you want to solve: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/af43b14e-fb00-42d4-8fb1-5c45862f7796/

Rx is a great way to deal with problems like this because you can write your async functions as observables and apply generic operators, such as the Retry operator described there, to them.
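For comparison, a generic retry-with-backoff wrapper can also be sketched without Rx, using async/await. Retry.WithBackoffAsync is a hypothetical helper, not the operator from the linked post; it relies on Task.Delay, which waits on a timer rather than on a blocked thread:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Retry
{
    // Hypothetical helper: runs 'action' until it succeeds, awaiting an
    // escalating delay (doubling, capped at maxDelay) between attempts.
    public static async Task<T> WithBackoffAsync<T>(
        Func<Task<T>> action, TimeSpan initialDelay, TimeSpan maxDelay,
        CancellationToken cancellationToken = default)
    {
        var delay = initialDelay;
        while (true)
        {
            try
            {
                return await action().ConfigureAwait(false);
            }
            catch (Exception) when (!cancellationToken.IsCancellationRequested)
            {
                // Swallow the failure and retry after the delay below.
            }
            // No thread is blocked while this delay is pending.
            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
        }
    }
}
```

Wrapping only the transactional part (load from A, post to B) in such a helper keeps the persisted workflow state outside the retry logic.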
