
.NET Rx: in-order batch processing of messages

I am attempting to implement an asynchronous workflow using Rx and I seem to be doing it completely wrong.

What I would like to do is this:

1. Start from an undefined asynchronous stream of unparsed message strings (i.e. an IObservable<string>).
2. Parse the message strings asynchronously, but preserve their order (IObservable<Message>).
3. Batch up the parsed Messages in groups of 100 or so (IObservable<IEnumerable<Message>>).
4. Send each batch, when complete, to the UI thread to be processed. Batches must arrive in the same order they were started.

I can't seem to get the order preservation right, and Rx also doesn't appear to be doing things asynchronously when I expect it to.

I made an attempt at order preservation by using an IEnumerable instead of an IObservable, and then calling the .AsParallel().AsOrdered() operators on it. Here is the code. See the notes below for the issues I'm having:

    private IObservable<IEnumerable<Message>> messageSource;
    public IObservable<IEnumerable<Message>> MessageSource { get { return messageSource; } }

    /// <summary>
    /// Sub-classes of MessageProviderBase provide this IEnumerable to 
    /// generate unparsed message strings synchronously
    /// </summary>
    protected abstract IEnumerable<string> UnparsedMessages { get; }

    public MessageProviderBase()
    {
        // individual parsed messages as a PLINQ query
        var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered()
                                                 select ParseMessage(unparsedMessage);

        // convert the above PLINQ query to an observable, buffering up to 100 messages at a time
        var batchedMessages
            = parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100);

        // ISSUE #1:
        // batchedMessages seems to call OnNext before all of the messages in its buffer are parsed.
        // If you convert the IObservable<Message> it generates to an enumerable, it blocks
        // when you try to enumerate it. 

        // Convert each batch to an IEnumerable
        // ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment),
        // it could still deliver messages out of order. Only, instead of delivering individual
        // messages out of order, the message batches themselves could arrive out of order.
        messageSource = from messageBatch in batchedMessages
                                        select messageBatch.ToEnumerable().ToList();
    }


My answer below is somewhat based on Enigmativity's code, but fixes a number of race conditions related to completion and also adds support for cancellation and custom schedulers (which would make unit testing it significantly easier).

public static IObservable<U> Fork<T, U>(this IObservable<T> source,
    Func<T, U> selector)
{
    return source.Fork<T, U>(selector, Scheduler.TaskPool);
}

public static IObservable<U> Fork<T, U>(this IObservable<T> source,
    Func<T, U> selector, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<U>(observer =>
    {
        var runningTasks = new CompositeDisposable();

        var lockGate = new object();
        var queue = new Queue<ForkTask<U>>();
        var completing = false;

        Action<Exception> onError = ex =>
        {
            lock(lockGate)
            {
                queue.Clear();
                observer.OnError(ex);
            }
        };

        Action dequeue = () =>
        {
            lock (lockGate)
            {
                while (queue.Count > 0 && queue.Peek().Completed)
                {
                    var task = queue.Dequeue();
                    observer.OnNext(task.Value);
                }
                if (completing && queue.Count == 0)
                {
                    observer.OnCompleted();
                }
            }
        };

        Action onCompleted = () =>
        {
            lock (lockGate)
            {
                completing = true;
                dequeue();
            }
        };

        Action<T> enqueue = t =>
        {
            var cancellation = new MutableDisposable();
            var task = new ForkTask<U>();

            lock(lockGate)
            {
                runningTasks.Add(cancellation);
                queue.Enqueue(task);
            }

            cancellation.Disposable = scheduler.Schedule(() =>
            {
                try
                {
                    task.Value = selector(t);

                    lock(lockGate)
                    {
                        task.Completed = true;
                        runningTasks.Remove(cancellation);
                        dequeue();
                    }
                }
                catch(Exception ex)
                {
                    onError(ex);
                }
            });
        };

        return new CompositeDisposable(runningTasks, 
            source.AsObservable().Subscribe(
                t => { enqueue(t); },
                x => { onError(x); },
                () => { onCompleted(); }
            ));
    });
}

private class ForkTask<T>
{
    public T Value = default(T);
    public bool Completed = false;
}

Here is a sample that randomizes the task execution time to test it:

AutoResetEvent are = new AutoResetEvent(false);

Random rand = new Random();

Observable.Range(0, 5)
    .Fork(i =>
    {
        int delay = rand.Next(50, 500);
        Thread.Sleep(delay);

        return i + 1;
    })
    .Subscribe(
        i => Console.WriteLine(i),
        () => are.Set()
    );

are.WaitOne();

Console.ReadLine();


Given you have:

IObservable<string> UnparsedMessages = ...;
Func<string, Message> ParseMessage = ...;

Then you could use a SelectAsync extension method like so:

IObservable<Message> ParsedMessages = UnparsedMessages.SelectAsync(ParseMessage);

The SelectAsync extension method processes each unparsed message asynchronously and ensures that the results come back in the order they arrived.

Let me know if this does what you need.

Here's the code:

public static IObservable<U> SelectAsync<T, U>(this IObservable<T> source,
    Func<T, U> selector)
{
    var subject = new Subject<U>();
    var queue = new Queue<System.Threading.Tasks.Task<U>>();
    var completing = false;
    // A MutableDisposable (rather than a null IDisposable) keeps Dispose() safe
    // even if the source terminates synchronously inside Subscribe.
    var subscription = new MutableDisposable();

    Action<Exception> onError = ex =>
    {
        queue.Clear();
        subject.OnError(ex);
        subscription.Dispose();
    };

    Action dequeue = () =>
    {
        lock (queue)
        {
            var error = false;
            // Only emit from the head of the queue, so results keep source order.
            while (queue.Count > 0 && queue.Peek().IsCompleted)
            {
                var task = queue.Dequeue();
                if (task.Exception != null)
                {
                    error = true;
                    onError(task.Exception);
                    break;
                }
                else
                {
                    subject.OnNext(task.Result);
                }
            }
            if (!error && completing && queue.Count == 0)
            {
                subject.OnCompleted();
                subscription.Dispose();
            }
        }
    };

    Action<T> enqueue = t =>
    {
        if (!completing)
        {
            var task = new System.Threading.Tasks.Task<U>(() => selector(t));
            queue.Enqueue(task);
            task.ContinueWith(tu => dequeue());
            task.Start();
        }
    };

    subscription.Disposable = source.Subscribe(
        t => { lock (queue) enqueue(t); },
        x => { lock (queue) onError(x); },
        // Flush on completion too: if every task has already finished, no
        // continuation is left to call dequeue and OnCompleted would never fire.
        () => { lock (queue) { completing = true; dequeue(); } });

    return subject.AsObservable();
}

I ended up needing to revisit this for work and wrote a more robust version of this code (also based on Richard's answer).

The key advantage to this code is the absence of any explicit queue. I'm purely using task continuations to put the results back in order. Works like a treat!

public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, U> selector)
{
    return source.ForkSelect<T, U>(t => Task<U>.Factory.StartNew(() => selector(t)));
}

public static IObservable<U> ForkSelect<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
{
    if (source == null) throw new ArgumentNullException("source");
    if (selector == null) throw new ArgumentNullException("selector");
    return Observable.CreateWithDisposable<U>(observer =>
    {
        var gate = new object();
        var onNextTask = Task.Factory.StartNew(() => { });
        var sourceCompleted = false;
        var taskErrored = false;

        Action<Exception> onError = ex =>
        {
            sourceCompleted = true;
            onNextTask = onNextTask.ContinueWith(t => observer.OnError(ex));
        };

        Action onCompleted = () =>
        {
            sourceCompleted = true;
            onNextTask = onNextTask.ContinueWith(t => observer.OnCompleted());
        };

        Action<T> onNext = t =>
        {
            var task = selector(t);
            onNextTask = Task.Factory.ContinueWhenAll(new[] { onNextTask, task }, ts =>
            {
                if (!taskErrored)
                {
                    if (task.IsFaulted)
                    {
                        taskErrored = true;
                        observer.OnError(task.Exception);
                    }
                    else
                    {
                        observer.OnNext(task.Result);
                    }
                }
            });
        };

        var subscription = source
            .AsObservable()
            .Subscribe(
                t => { if (!sourceCompleted) lock (gate) onNext(t); },
                ex => { if (!sourceCompleted) lock (gate) onError(ex); },
                () => { if (!sourceCompleted) lock (gate) onCompleted(); });

        var @return = new CompositeDisposable(subscription);

        return @return;
    });
}
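
To make the continuation-chaining idea easier to see in isolation, here is a minimal sketch that uses only the TPL and no Rx. It is not the ForkSelect code above: it assumes .NET 4.6 or later (for Task.CompletedTask and Task.WhenAll, where the answer uses Task.Factory.ContinueWhenAll), and the names OrderedContinuations and publishChain are made up for illustration. Each work item starts immediately, but its result is only published once the previous publish step has also finished.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class OrderedContinuations
{
    static void Main()
    {
        var results = new List<int>();

        // Tail of the publish chain; starts out as an already-completed task.
        Task publishChain = Task.CompletedTask;

        foreach (var input in Enumerable.Range(0, 8))
        {
            // The work starts right away and runs in parallel.
            // Later inputs sleep less, so they tend to finish first.
            Task<int> work = Task.Run(() =>
            {
                Thread.Sleep((8 - input) * 20);
                return input * 10;
            });

            // Publish only after BOTH the previous publish step and this
            // work item have finished; this is what restores source order.
            publishChain = Task.WhenAll(publishChain, work)
                .ContinueWith(_ => results.Add(work.Result));
        }

        publishChain.Wait();
        Console.WriteLine(string.Join(", ", results)); // 0, 10, 20, 30, 40, 50, 60, 70
    }
}
```

Because every publish step waits on the one before it, the list is never touched by two continuations at once, which is why this sketch needs no lock around results.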

And the SelectMany overloads to allow LINQ to be used are:

public static IObservable<U> SelectMany<T, U>(this IObservable<T> source, Func<T, Task<U>> selector)
{
    return source.ForkSelect<T, U>(selector);
}

public static IObservable<V> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Task<U>> taskSelector, Func<T, U, V> resultSelector)
{
    if (source == null) throw new ArgumentNullException("source");
    if (taskSelector == null) throw new ArgumentNullException("taskSelector");
    if (resultSelector == null) throw new ArgumentNullException("resultSelector");
    return source.Zip(source.ForkSelect<T, U>(taskSelector), (t, u) => resultSelector(t, u));
}

So these methods can now be used like this:

var observableOfU = observableOfT.ForkSelect(funcOfT2U);

Or:

var observableOfU = observableOfT.ForkSelect(funcOfT2TaskOfU);

Or:

var observableOfU =
    from t in observableOfT
    from u in funcOfT2TaskOfU(t)
    select u;

Enjoy!
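
For readers on a current System.Reactive package, the whole pipeline from the question can probably be sketched without a custom operator. This is a swapped-in technique, not any of the answers' code: it assumes the System.Reactive NuGet package, whose Concat overload for IObservable<Task<T>> yields task results in source order, and Parse is a hypothetical stand-in for ParseMessage. The tasks are started as soon as Select sees each item, so parsing still overlaps.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class OrderedBatches
{
    // Hypothetical stand-in for ParseMessage: slow, with uneven latency.
    static int Parse(string s)
    {
        int n = int.Parse(s);
        Thread.Sleep((n % 3) * 20);
        return n;
    }

    static void Main()
    {
        // Stand-in for the unparsed IObservable<string> from the question.
        IObservable<string> unparsed =
            Enumerable.Range(0, 10).Select(i => i.ToString()).ToObservable();

        var batches = unparsed
            .Select(s => Task.Run(() => Parse(s)))        // parsing starts immediately, in parallel
            .Concat()                                     // results are emitted in source order
            .Buffer(TimeSpan.FromMilliseconds(200), 4);   // a batch closes after 200 ms or 4 items

        // Blocks until the stream completes. Batch boundaries depend on timing,
        // but the items should come out in source order.
        foreach (var batch in batches.ToEnumerable())
            Console.WriteLine(string.Join(",", batch));
    }
}
```

In a UI application one would presumably add .ObserveOn(SynchronizationContext.Current), captured on the UI thread, before subscribing, so that each batch is delivered there. A console program has no such context, so the sketch leaves that step out.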
