Using Rx Framework for async calls using the void AsyncMethod(Action<T> callback) pattern

I've seen tons of examples on how to use the Observable.FromAsyncPattern() in the Rx Framework to simplify Async calls, but I'm using an interface that doesn't use the standard Async pattern of IAsyncResult BeginXXX/EndXXX(IAsyncResult), so this doesn't work for me.

The library I'm working with exposes async functions with a callback pattern:

void GetAllObjects(Action<List<Object>> callback)

In an ideal world I'd like to turn this:

var isLoadingUsers = true;
var isLoadingSystems = true;
var isLoadingCustomers = true;
var isLoadingRules = true;

mClient.GetAllUsers(UsersCallback);
mClient.GetAllCustomers(CustomersCallback);
mClient.GetAllRules(RulesCallback);

// set the IsLoadingXXX variables to false in callbacks
// once all are false

mClient.GetAllSystems(SystemsCallback);

into something like this:

var o = Observable.ForkJoin(
                     Observable.Start(GetAllUsers()),
                     Observable.Start(GetAllCustomers()),
                     Observable.Start(GetAllRules())
                    ).Finally(() => GetAllSystems);

How would one go about turning that pattern into something that returns an IObservable?


Func<IObservable<TRet>> FromListCallbackPattern<TRet>(Action<Action<List<TRet>>> function)
{
    return () => {
        // We use a ReplaySubject so that if people subscribe *after* the
        // real method finishes, they'll still get all the items
        var ret = new ReplaySubject<TRet>();

        function((list) => {
            // We're going to "rebroadcast" the list onto the Subject
            // This isn't the most Rx'iest way to do this, but it is the most
            // comprehensible :)
            foreach(var v in list) {
                ret.OnNext(v);
            }
            ret.OnCompleted();
        });

        return ret;
    };
}

Now, you can do something like:

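// The type argument is spelled out because C# cannot infer it from a method group.
// User is assumed here to be the element type that GetAllUsers hands to its callback.
var getAllUsers = FromListCallbackPattern<User>(mClient.GetAllUsers);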
getAllUsers().Subscribe(x => /* ... */);


Try Observable.Create(), perhaps something like this:

public IObservable<Object> ObserveAllObjects()
{
    return Observable.Create<Object>(
        observer =>
            () => GetAllObjects(objects => objects.ForEach(o => observer.OnNext(o))));
}


I like Observable.Create for this, but @dahlbyk's answer is incorrect (it misses completion and performs the action in the unsubscribe handler). It should be something like this:

    IObservable<List<T>> FromListCallbackPattern<T>(
        Action<Action<List<T>>> listGetter)
    {
        return Observable
            .Create<List<T>>(observer =>
            {
                var subscribed = true;
                listGetter(list =>
                {
                    if (!subscribed) return;
                    observer.OnNext(list);
                    observer.OnCompleted();
                });
                return () =>
                {
                    subscribed = false;
                };
            });
    }

Also, since the originating API returns the entire list at once, I don't see a reason to split it into individual items too early. Let the resulting observable emit the list as a single value; a caller who needs the items flattened can apply .SelectMany to it.
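
To tie this back to the question, here is a rough sketch of how the three independent loads could be combined before GetAllSystems is called. It assumes the System.Reactive package. FakeClient is a hypothetical stand-in for mClient that invokes its callbacks synchronously and uses string as the element type, and Zip is swapped in for the ForkJoin from the question because it is available in the main Rx library. Treat it as one possible arrangement, not the definitive one.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CallbackObservable
{
    // Same helper as in the answer above: the wrapped method is invoked
    // once per subscription and its list is emitted as a single value.
    public static IObservable<List<T>> FromListCallbackPattern<T>(
        Action<Action<List<T>>> listGetter)
    {
        return Observable.Create<List<T>>(observer =>
        {
            var subscribed = true;
            listGetter(list =>
            {
                if (!subscribed) return;
                observer.OnNext(list);
                observer.OnCompleted();
            });
            return () => { subscribed = false; };
        });
    }
}

// Hypothetical stand-in for mClient; the real client's types are not shown
// in the question. Calls records the order in which methods were invoked.
public class FakeClient
{
    public readonly List<string> Calls = new List<string>();

    public void GetAllUsers(Action<List<string>> callback)
    { Calls.Add("GetAllUsers"); callback(new List<string> { "alice", "bob" }); }

    public void GetAllCustomers(Action<List<string>> callback)
    { Calls.Add("GetAllCustomers"); callback(new List<string> { "acme" }); }

    public void GetAllRules(Action<List<string>> callback)
    { Calls.Add("GetAllRules"); callback(new List<string> { "rule-1" }); }

    public void GetAllSystems(Action<List<string>> callback)
    { Calls.Add("GetAllSystems"); callback(new List<string> { "sys-a", "sys-b" }); }
}

public static class Demo
{
    // Emits the systems list only after users, customers and rules have
    // each produced their list.
    public static IObservable<List<string>> LoadSystemsAfterTheRest(FakeClient client)
    {
        var users = CallbackObservable.FromListCallbackPattern<string>(client.GetAllUsers);
        var customers = CallbackObservable.FromListCallbackPattern<string>(client.GetAllCustomers);
        var rules = CallbackObservable.FromListCallbackPattern<string>(client.GetAllRules);

        return users
            // Zip fires once every source has delivered its single list.
            .Zip(customers, rules, (u, c, r) => new { u, c, r })
            // Only then is GetAllSystems subscribed to, and therefore called.
            .SelectMany(_ =>
                CallbackObservable.FromListCallbackPattern<string>(client.GetAllSystems));
    }

    // Flattening is left to the caller, as suggested above.
    public static IObservable<string> SystemNames(FakeClient client)
    {
        return LoadSystemsAfterTheRest(client).SelectMany(list => list);
    }
}
```

Because each wrapped observable emits exactly one list and then completes, Zip produces exactly one combined value, so GetAllSystems runs once per subscription. Nothing is called until someone subscribes to the result.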
