Using Rx to throttle callbacks from non-asynchronous calls

My question is similar to this question but, as always, slightly different.

I currently work on a project that uses a simple pattern to asynchronously receive notifications of underlying data changes.

Class Foo is responsible for subscribing to these data changes and provides a method for classes to register their interest in these changes by registering an instance of a class that implements a given interface:

public Guid SubscribeToChanges(ISomeCallbackInterface callback)

Class Bar implements this callback and registers itself:

public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}

Ideally we would like to throttle these callbacks: for instance, we can get callbacks that change the value of a particular piece of data from x to y and then back to x within the space of a second. Looking at the Rx documentation, this looks trivial with the DistinctUntilChanged operator.

However, the question is how to create an IObservable that I can then apply the operator to, given my callback implementation above. The docs are very clear about how to create an IObservable from standard .NET events or Begin.../End... method pairs, but I can't see how to do it for a callback interface like this one.

UPDATE: As Richard Szalay pointed out in his comment, I will need to use DistinctUntilChanged in tandem with Throttle to achieve what I need.

Again, thanks to Richard, I also should have mentioned that I need the ability to unsubscribe from the callbacks. In the current model I simply call Unsubscribe(Guid subscriptionToken) on the instance of Foo.


The only way I can think of to do it is with a custom implementation of ISomeCallbackInterface and Observable.Create. Still, it feels shoe-horned into the solution:

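// Requires System.Reactive.Linq, System.Reactive.Subjects and System.Reactive.Disposables.
public static IObservable<Tuple<string, IEnumerable<Tuple<string, int, object>>>>
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    // CreateWithDisposable is the name used by early Rx builds;
    // later Rx releases expose this overload as Observable.Create.
    return Observable.CreateWithDisposable<
        Tuple<string, IEnumerable<Tuple<string, int, object>>>>(observer =>
    {
        var subject = new Subject<
            Tuple<string, IEnumerable<Tuple<string, int, object>>>>();

        // Adapter that forwards each callback into the subject.
        var callback = new ObservableSomeCallback(subject);

        Guid subscription = publisher.SubscribeToChanges(callback);

        // Disposing detaches the observer and unsubscribes from the publisher.
        return new CompositeDisposable(
            subject.Subscribe(observer),
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}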

private class ObservableSomeCallback : ISomeCallbackInterface
{
    private readonly IObserver<Tuple<string, IEnumerable<Tuple<string, int, object>>>> observer;

    public ObservableSomeCallback(
        IObserver<Tuple<string, IEnumerable<Tuple<string, int, object>>>> observer)
    {
        this.observer = observer;
    }

    // Packs the callback arguments into a single tuple and pushes it to the observer.
    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string, IEnumerable<Tuple<string, int, object>>>(id, data));
    }
}
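
For what it's worth, here is a rough sketch of how the resulting observable might then be consumed with Throttle and DistinctUntilChanged, as discussed in the question's update. It assumes a current System.Reactive package. Everything not in the question is an assumption made for illustration: the Subject standing in for the observable returned by FromCustomCallbackPattern, the Key and Make helpers, the per-id GroupBy, and the one-second window. The default comparer would compare the IEnumerable payload by reference, so the sketch compares a flattened string key instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

using Update = System.Tuple<string,
    System.Collections.Generic.IEnumerable<System.Tuple<string, int, object>>>;

public static class ThrottleDemo
{
    // Hypothetical helper: flattens the payload so that payloads compare by content.
    public static string Key(IEnumerable<Tuple<string, int, object>> data)
    {
        return string.Join("|", data.Select(t => t.Item1 + ":" + t.Item2 + ":" + t.Item3));
    }

    // Hypothetical helper: builds a one-item update for the demo.
    private static Update Make(string id, object value)
    {
        IEnumerable<Tuple<string, int, object>> data =
            new[] { Tuple.Create("price", 1, value) };
        return Tuple.Create(id, data);
    }

    public static void Main()
    {
        // Stand-in for the observable returned by FromCustomCallbackPattern(publisher).
        var updates = new Subject<Update>();

        IDisposable subscription = updates
            .GroupBy(u => u.Item1)                          // throttle each id separately
            .SelectMany(g => g
                .Throttle(TimeSpan.FromSeconds(1))          // wait for one quiet second
                .DistinctUntilChanged(u => Key(u.Item2)))   // drop results equal to the previous one
            .Subscribe(u => Console.WriteLine("changed: " + u.Item1 + " -> " + Key(u.Item2)));

        updates.OnNext(Make("a", "x"));
        Thread.Sleep(1500);                 // quiet period elapses, so x is forwarded

        // y then back to x inside the quiet period: Throttle forwards only the
        // final x, and DistinctUntilChanged drops it because its key matches
        // the previously forwarded value.
        updates.OnNext(Make("a", "y"));
        updates.OnNext(Make("a", "x"));
        Thread.Sleep(1500);

        // With the real observable, disposing also runs the unsubscribe action.
        subscription.Dispose();
    }
}
```

The order matters here: Throttle runs first so that the x, y, x burst collapses to its final value before DistinctUntilChanged compares it with the last value that got through. Grouping by id is a design choice; without it, a burst for one id would also swallow pending updates for every other id.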