开发者

How to implement buffering with timeout in RX

I need to implement an event processing, that is done delayed when there are no new events arriving for a certain period. (I have to queue up a parsing task when the text buffer changed, but I don't want to start the parsing when the user is still typing.)

I'm new in RX, but as far as I see, I would need a combination of BufferWithTime and the Timeout methods. I imagine this to be working like this: it buffers the events until they are received regularly within a specified time period between the subsequent events. If there is a gap in the event flow (longer than the timespan) it should return propagate the events buffered so far.

Having a look at how Buffer and Timeout is imp开发者_JS百科lemented, I could probably implement my BufferWithTimeout method (if everyone have one, please share with me), but I wonder if this can be achieved just by combining the existing methods. Any ideas?


This is quite an old question, but I do believe the following answer is worth mentioning, since all other solutions have forced the user to subscribe manually, track changes, etc.

I offer the following as an "Rx-y" solution.

var buffers = source
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb<int>(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(9)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());

Basically, the source is windowed until some observerable defined in terms of the newest window. A new window (grouped observable) is created, and we use that window to determine when the window should close. In this case, I'm closing the window after 5 seconds of inactivity or a maximum length of 10 (9+1).


I think BufferWithTime is what you are after.

There is nothing built in, but something like this should work:

Note: If an error occurs from the source, the buffer is not flushed. This matches the current (or current last time I checked) functionality of BufferWith*

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout)
{
    return source.BufferWithTimeout(timeout, Scheduler.TaskPool);
}

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<TSource[]>(observer =>
    {
        object lockObject = new object();
        List<TSource> buffer = new List<TSource>();

        MutableDisposable timeoutDisposable = new MutableDisposable();

        Action flushBuffer = () =>
        {
            TSource[] values;

            lock(lockObject)
            {
                values = buffer.ToArray();
                buffer.Clear();
            }

            observer.OnNext(values);
        };

        var sourceSubscription = source.Subscribe(
            value =>
            {
                lock(lockObject)
                {
                    buffer.Add(value);
                }

                timeoutDisposable.Disposable = 
                    scheduler.Schedule(flushBuffer, timeout);
            },
            observer.OnError,
            () =>
            {
                flushBuffer();
                observer.OnCompleted();
            });

        return new CompositeDisposable(sourceSubscription, timeoutDisposable);
    });
}


In addition to Richard Szalay's answer I've just been looking into the new Window operator from the latest rx release. It 'kind of' solves you problem in that you can 'buffer with a time out', i.e. get the output within a window of time that lasts until the timeout is reached, but instead of receiving the results as an IEnumerable you actually get them as an IObservable.

Here's a quick example of what I mean:

private void SetupStream()
{
    var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
        h => new MouseButtonEventHandler(h), 
        h => MouseDown += h,
        h => MouseDown -= h);

    var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher))
        .Switch();

    inputStream.Window(() => timeout)
        .Subscribe(OnWindowOpen);
}


private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window)
{
    Trace.WriteLine(string.Format("Window open"));

    var buffer = new List<IEvent<MouseButtonEventArgs>>();

    window.Subscribe(click =>
    {

        Trace.WriteLine(string.Format("Click"));

        buffer.Add(click);

    }, () => ProcessEvents(buffer));
}

private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks)
{
    Trace.WriteLine(string.Format("Window closed"));

    //...
}

Every time the window opens, you receive all the events as and when they come in, store them in a buffer and process when the window completes (which actually happens when the next window opens).

Not sure if Richard would change his example to use Window now it's available but thought it might be worth raising as an alternative.


If you just need to run an operation when the user stops typing for a certain amount of time, and don't necessarily need the intermediate events, then Throttle is the operation you're after. Check here for an example of its usage in that scenario.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜