Does reactive extensions support rolling buffers?

I'm using reactive extensions to collate data into buffers of 100ms:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

This works fine. However, I want slightly different behavior than that provided by the Buffer operation. Essentially, I want to reset the timer if another data item is received. Only when no data has been received for the entire 100ms do I want to handle it. This opens up the possibility of never handling the data, so I should also be able to specify a maximum count. I would imagine something along the lines of:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)

I've had a look around and haven't been able to find anything like this in Rx. Can anyone confirm or deny that it exists?
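
To pin down the behavior being asked for, here is a minimal offline model in plain C# with no Rx involved. It only shows how timestamped items would be grouped (close on a quiet period, or on reaching the maximum count); it does not model when a buffer is actually emitted. The names InactivityBufferModel and Simulate are hypothetical, and the choice that a gap exactly equal to the inactivity period closes the buffer is an assumption.

```csharp
using System;
using System.Collections.Generic;

public static class InactivityBufferModel
{
    // events: (time in ms, value) pairs, assumed to be sorted by time.
    public static List<List<T>> Simulate<T>(
        IEnumerable<(long TimeMs, T Value)> events, long inactivityMs, int maxCount)
    {
        var result = new List<List<T>>();
        var current = new List<T>();
        long lastTime = 0;

        foreach (var (timeMs, value) in events)
        {
            // A quiet period of at least inactivityMs since the previous item
            // closes the buffer that was being filled.
            if (current.Count > 0 && timeMs - lastTime >= inactivityMs)
            {
                result.Add(current);
                current = new List<T>();
            }

            current.Add(value);
            lastTime = timeMs;

            // Reaching the maximum count closes the buffer immediately.
            if (current.Count >= maxCount)
            {
                result.Add(current);
                current = new List<T>();
            }
        }

        // Flush whatever is left when the input ends.
        if (current.Count > 0) result.Add(current);
        return result;
    }
}
```

With inactivityMs = 100 and maxCount = 3, items 1 to 6 arriving at 0, 50, 200, 210, 220 and 230 ms are grouped as [1, 2], [3, 4, 5], [6]: the first buffer closes on the 150 ms gap, the second on reaching three items, and the last when the input ends.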


This is possible by combining the built-in Window and Throttle methods of Observable. First, let's solve the simpler problem where we ignore the maximum count condition:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

The powerful Window method did the heavy lifting. Now it's easy enough to see how to add a maximum count:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
    var closes = stream.Throttle(delay);
    if (max != null)
    {
        var overflows = stream.Where((x,index) => index+1>=max);
        closes = closes.Merge(overflows);
    }
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

I'll write a post explaining this on my blog. https://gist.github.com/2244036

Documentation for the Window method:

  • http://leecampbell.blogspot.co.uk/2011/03/rx-part-9join-window-buffer-and-group.html
  • http://enumeratethis.com/2011/07/26/financial-charts-reactive-extensions/


I wrote an extension to do most of what you're after - BufferWithInactivity.

Here it is:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        var scheduler = Scheduler.ThreadPool;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}


With Rx 2.0, you can meet both requirements with a new Buffer overload that accepts both a timeout and a size:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100), 1)
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

See https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx for the documentation.


As Rohit Sharma mentioned in his comment on Colonel Panic's solution, there is a problem: items can sit in the buffer and not be pushed to the subscriber unless another item is generated.

As described in this comment, the problem is p.Window(() => closes), because it opens up a gap in which events can be missed.

That lambda is going to be invoked after each window is processed. And the Window operator is going to call Subscribe on what the lambda returns each time, because as far as it knows, you might return a completely different IObservable from that lambda every time.
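
A small sketch can make that re-invocation visible. It assumes the System.Reactive package is referenced; WindowSelectorDemo and CountSelectorCalls are hypothetical names used only for illustration. The closing selector increments a counter every time Window asks for a new closing sequence, so, if I read the operator correctly, the count should come out as one call for the initial window plus one per boundary notification.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WindowSelectorDemo
{
    // Returns how many times Window invoked the closing selector
    // after the given number of boundary notifications.
    public static int CountSelectorCalls(int boundaries)
    {
        var calls = 0;
        var source = new Subject<int>();
        var boundary = new Subject<Unit>();

        // Counts each request for a closing sequence; always hands back the same subject.
        Func<IObservable<Unit>> selector = () =>
        {
            calls++;
            return boundary;
        };

        using (source.Window(selector).Subscribe(_ => { }))
        {
            for (var i = 0; i < boundaries; i++)
            {
                boundary.OnNext(Unit.Default); // closes the current window
            }
        }

        return calls;
    }
}
```

Each of those calls is followed by a fresh Subscribe on the returned sequence, which is where the gap described above comes from.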

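Since the same closes sequence is now used for every window (it is subscribed once instead of being handed out again by the lambda), the maxCount handling needs to be adjusted. Without the change the count would never be reset, and after the limit was hit once, every new event would be over maxCount.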

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            Int32 i = 0;

            var overflows = p.Where(x =>
            {
                ++i;

                if (i >= maxCount)
                {
                    i = 0;
                    return true;
                }

                return false;
            });

            closes = closes.Merge(overflows);
        }

        return p.Window(closes).SelectMany(window => window.ToList());
    });

    return publish;
}
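
The difference between the indexed predicate and the resetting counter can be seen without Rx at all, because LINQ's indexed Where on IEnumerable follows the same index rule. This is only a sketch on a plain sequence; OverflowPredicates and its two methods are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OverflowPredicates
{
    // Indexed predicate, as in the original answer: once index + 1 reaches max,
    // it stays true for every later element of the same subscription.
    public static List<int> IndexedOverflows(IEnumerable<int> items, int max) =>
        items.Where((x, index) => index + 1 >= max).ToList();

    // Resetting counter, as in the Publish-based version above:
    // true only on every max-th element.
    public static List<int> ResettingOverflows(IEnumerable<int> items, int max)
    {
        var i = 0;
        return items.Where(x =>
        {
            if (++i >= max)
            {
                i = 0;
                return true;
            }
            return false;
        }).ToList();
    }
}
```

For the items 1 to 7 with max = 3, the indexed version selects 3, 4, 5, 6, 7, while the resetting version selects only 3 and 6.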

Update:
After further tests I found that, in some cases, items are still not pushed to the subscriber correctly.

Here is the workaround, which has worked for us for four months without any problems.

The workaround is to add .Delay(...) with any TimeSpan.

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            var overflows = stream.Where((x, index) => index + 1 >= maxCount);
            closes = closes.Merge(overflows);
        }

        return p.Window(() => closes).SelectMany(window => window.ToList()).Delay(TimeSpan.Zero);
    });

    return publish;
}


I guess this can be implemented on top of the Buffer method, as shown below:

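public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
{
    // CreateWithDisposable comes from early Rx builds; in later releases use Observable.Create.
    return Observable.CreateWithDisposable<IList<T>>(cl =>
    {
        var acc = new List<T>();

        // Emit a copy, so that clearing acc does not empty the list the observer received.
        Action flush = () =>
        {
            if (acc.Count == 0) return; // nothing collected, nothing to emit
            cl.OnNext(new List<T>(acc));
            acc.Clear();
        };

        return obs.Buffer(span)
            .Subscribe(next =>
            {
                if (next.Count == 0) // no activity in time span
                {
                    flush();
                }
                else
                {
                    acc.AddRange(next);
                    if (acc.Count >= max) // max items collected
                    {
                        flush();
                    }
                }
            }, err => cl.OnError(err), () => { flush(); cl.OnCompleted(); });
    });
}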

NOTE: I haven't tested it, but I hope it gives you the idea.


Colonel Panic's solution is almost perfect. The only thing that is missing is a Publish component, in order to make the solution work with cold sequences too.

/// <summary>
/// Projects each element of an observable sequence into a buffer that's sent out
/// when either a given inactivity timespan has elapsed, or it's full,
/// using the specified scheduler to run timers.
/// </summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, int maxCount,
    IScheduler scheduler = default)
{
    if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
    {
        var combinedBoundaries = Observable.Merge
        (
            published.Throttle(dueTime, scheduler),
            published.Skip(maxCount - 1)
        );

        return published
            .Window(() => combinedBoundaries)
            .SelectMany(window => window.ToList());
    });
}

Beyond adding the Publish, I've also replaced the original .Where((_, index) => index + 1 >= maxCount) with the equivalent but shorter .Skip(maxCount - 1). For completeness there is also an IScheduler parameter, which configures the scheduler where the timer is run.
