Using Rx to throttle callbacks from non-asynchronous calls
My question is similar to this question, but as always is slightly different.
I currently work on a project that uses a simple pattern to asynchronously receive notifications of underlying data changes.
Class Foo is responsible for subscribing to these data changes and provides a method for classes to register their interest in these changes by registering an instance of a class that implements a given interface:
public Guid SubscribeToChanges(ISomeCallbackInterface callback)
Class Bar implements this callback and registers itself:
public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...
    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}
Ideally we would like to throttle these callbacks: for instance, we can get a callback that changes a particular piece of data from x to y and then back to x within the space of a second. Looking at the Rx documentation, this should be trivial with the DistinctUntilChanged operator.
However, the question is how to create an IObservable that I can then apply the operator to, given my callback implementation above. The docs are very clear about how to create an IObservable from standard .NET events or Begin.../End... method pairs, but not from a custom callback interface like this one.
UPDATE: As Richard Szalay pointed out in his comment, I will need to use DistinctUntilChanged in tandem with Throttle to achieve what I need.
Again, thanks to Richard, I should also have mentioned that I need the ability to unsubscribe from the callbacks. In the current model I simply call Unsubscribe(Guid subscriptionToken) on the instance of Foo.
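To make the Throttle plus DistinctUntilChanged combination concrete, here is a minimal sketch. It assumes the System.Reactive package and uses a plain Subject<int> as a stand-in for the real callback stream. The class name ThrottleSketch, the one-second window and the integer values are all illustrative and not part of the project. A HistoricalScheduler drives virtual time so the example does not have to sleep.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleSketch
{
    // Pushes x, then x -> y -> x in quick succession, then z,
    // and returns the values that make it through the pipeline.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, no real waiting
        var changes = new Subject<int>();          // stand-in for the callback stream
        var seen = new List<int>();

        using (changes
            .Throttle(TimeSpan.FromSeconds(1), scheduler) // wait for one quiet second
            .DistinctUntilChanged()                       // drop repeats of the last emitted value
            .Subscribe(value => seen.Add(value)))
        {
            changes.OnNext(1);                                   // x
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // quiet period passes, 1 is emitted
            changes.OnNext(2);                                   // x -> y
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(200));
            changes.OnNext(1);                                   // y -> x within a second
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // Throttle emits 1, which is a repeat
            changes.OnNext(3);                                   // z
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // 3 is emitted
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Under these assumptions the x to y to x flicker is swallowed: Throttle only lets the final x through once the stream goes quiet, and DistinctUntilChanged then discards it because x was already the last value emitted. The order of the two operators matters for that reason.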
The only way I can think of to do it is with a custom implementation of ISomeCallbackInterface and Observable.Create. Still, it feels shoe-horned into the solution:
public static IObservable<Tuple<string, IEnumerable<Tuple<string, int, object>>>>
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    // Older Rx builds named this overload Observable.CreateWithDisposable.
    return Observable.Create<Tuple<string, IEnumerable<Tuple<string, int, object>>>>(observer =>
    {
        var subject = new Subject<
            Tuple<string, IEnumerable<Tuple<string, int, object>>>>();
        var callback = new ObservableSomeCallback(subject);
        Guid subscription = publisher.SubscribeToChanges(callback);
        return new CompositeDisposable(
            subject.Subscribe(observer),
            // Unregister from the publisher when the subscription is disposed.
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}
private class ObservableSomeCallback : ISomeCallbackInterface
{
    private IObserver<Tuple<string, IEnumerable<Tuple<string, int, object>>>> observer;
    public ObservableSomeCallback(
        IObserver<Tuple<string, IEnumerable<Tuple<string, int, object>>>> observer)
    {
        this.observer = observer;
    }
    // Forwards each callback to the observer as a single (id, data) tuple.
    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string, IEnumerable<Tuple<string, int, object>>>(id, data));
    }
}
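As a rough check that disposing the Rx subscription really unregisters the callback, the sketch below wires the same idea to a fake publisher. Everything here is an assumption made for illustration: the interface shapes are guessed from the question, FakePublisher, CallbackObservable, ObserverCallback and Demo are hypothetical names, and the Change alias exists only to shorten the nested tuple type. This variation also passes the observer straight to the callback instead of going through an intermediate Subject.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
// Alias used only to keep the nested tuple type readable.
using Change = System.Tuple<string, System.Collections.Generic.IEnumerable<System.Tuple<string, int, object>>>;

// Interface shapes assumed from the question; the real ones may differ.
public interface ISomeCallbackInterface
{
    void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data);
}

public interface ISomeCallbackPublisher
{
    Guid SubscribeToChanges(ISomeCallbackInterface callback);
    void UnsubscribeFromChanges(Guid subscriptionToken);
}

// Hypothetical in-memory publisher, used only to exercise the adapter.
public sealed class FakePublisher : ISomeCallbackPublisher
{
    private readonly Dictionary<Guid, ISomeCallbackInterface> _callbacks =
        new Dictionary<Guid, ISomeCallbackInterface>();

    public int SubscriberCount => _callbacks.Count;

    public Guid SubscribeToChanges(ISomeCallbackInterface callback)
    {
        var token = Guid.NewGuid();
        _callbacks[token] = callback;
        return token;
    }

    public void UnsubscribeFromChanges(Guid subscriptionToken) =>
        _callbacks.Remove(subscriptionToken);

    // Simulates an underlying data change reaching every registered callback.
    public void Raise(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        foreach (var callback in new List<ISomeCallbackInterface>(_callbacks.Values))
            callback.CallbackMethod(id, data);
    }
}

public static class CallbackObservable
{
    public static IObservable<Change> FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
    {
        return Observable.Create<Change>(observer =>
        {
            Guid token = publisher.SubscribeToChanges(new ObserverCallback(observer));
            // Disposing the subscription unregisters the callback.
            return Disposable.Create(() => publisher.UnsubscribeFromChanges(token));
        });
    }

    private sealed class ObserverCallback : ISomeCallbackInterface
    {
        private readonly IObserver<Change> _observer;

        public ObserverCallback(IObserver<Change> observer)
        {
            _observer = observer;
        }

        public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
        {
            _observer.OnNext(Tuple.Create(id, data));
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var publisher = new FakePublisher();
        var receivedIds = new List<string>();
        var noData = new Tuple<string, int, object>[0];

        IDisposable subscription = CallbackObservable
            .FromCustomCallbackPattern(publisher)
            .Subscribe(change => receivedIds.Add(change.Item1));

        publisher.Raise("a", noData); // delivered while subscribed
        subscription.Dispose();       // unregisters from the publisher
        publisher.Raise("b", noData); // no longer delivered

        Console.WriteLine(string.Join(", ", receivedIds));
        Console.WriteLine(publisher.SubscriberCount);
    }
}
```

In real use the Throttle and DistinctUntilChanged operators would sit between FromCustomCallbackPattern and Subscribe. DistinctUntilChanged on these tuples would also need a key selector or comparer, since the default comparison will not treat two different IEnumerable instances with the same contents as equal.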