Calling an expensive Reactive Extensions `IObservable` function only when values not in local cache

I'm using the Reactive Extensions (Rx) and a repository pattern to facilitate getting data from a relatively slow data source. I have the following (simplified) interface:

public interface IStorage
{
    IObservable<INode> Fetch(IObservable<Guid> ids);
}

Creating an instance of the implementation of IStorage is slow - think creating a web service or db connection. Each Guid in the ids observable maps one-to-one to an INode (or null) in the returned observable, and each result is expensive. Therefore, it makes sense to me to instantiate IStorage only if I have at least one value to fetch, and to fetch each Guid's value only once.
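
Lazy<T> gives exactly this kind of deferral. A minimal illustration (hypothetical factory; not part of the interface above):

var lazyStorage = new Lazy<IStorage>(StorageFactory);

// StorageFactory has not run yet - the expensive connection is only
// created the first time lazyStorage.Value is accessed.
var storage = lazyStorage.Value;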

To limit the calls to IStorage I cache the results in my Repository class that looks like this:

public class Repository
{
    private Dictionary<Guid, INode> NodeCache { get; set; }

    private Func<IStorage> StorageFactory { get; set; }

    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        var lazyStorage = new Lazy<IStorage>(this.StorageFactory);

        // from id in ids
        // if NodeCache contains id select NodeCache[id]
        // else select node from lazyStorage.Value.Fetch(...)
    }
}

In the Repository.Fetch(...) method I've included comments indicating what I'm trying to do.

Essentially though, if the NodeCache contains all of the ids being fetched then IStorage is never instantiated and the results are returned with almost no delay. However, if any one id is not in the cache then IStorage is instantiated and all of the unknown ids are passed through the IStorage.Fetch(...) method.

The one-to-one mapping, including order preservation, needs to be maintained.
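
For illustration (hypothetical Guid values and a repository instance; idA and idC are assumed to be cached already, idB is not):

var ids = new[] { idA, idB, idC }.ToObservable();

repository.Fetch(ids).Subscribe(node => Console.WriteLine(node));

// expected: the cached node for idA, then the storage result for idB,
// then the cached node for idC - in exactly that order - and IStorage
// is instantiated only because idB missed the cache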

Any ideas?


It took a while to work it out, but I finally got my own solution.

I have defined two extension methods called FromCacheOrFetch with these signatures:

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, R> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, Maybe<R>> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)

The first uses standard CLR/Rx types (null indicates a cache miss, hence the class constraint); the second uses a Maybe monad - an option type that can represent "no value" for reference and value types alike.

The first just turns the Func<T, R> into Func<T, Maybe<R>> and calls the second method.
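
The Maybe type itself isn't shown here. For reference, a minimal sketch of a Maybe type with the members the code below relies on (Something, Nothing, HasValue, IsNothing, Value, plus a ToMaybe extension) - the exact implementation can vary:

public struct Maybe<T>
{
    private readonly bool hasValue;
    private readonly T value;

    private Maybe(T value)
    {
        this.hasValue = true;
        this.value = value;
    }

    public static readonly Maybe<T> Nothing = default(Maybe<T>);

    public static Maybe<T> Something(T value)
    {
        return new Maybe<T>(value);
    }

    public bool HasValue { get { return this.hasValue; } }

    public bool IsNothing { get { return !this.hasValue; } }

    public T Value
    {
        get
        {
            if (!this.hasValue)
            {
                throw new InvalidOperationException("Maybe has no value.");
            }
            return this.value;
        }
    }
}

public static class MaybeExtensions
{
    // Treats the supplied sentinel (null in the call below) as "no value".
    public static Maybe<T> ToMaybe<T>(this T value, T nothing) where T : class
    {
        return object.ReferenceEquals(value, nothing)
            ? Maybe<T>.Nothing
            : Maybe<T>.Something(value);
    }
}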

The basic idea is this: as each source value arrives, the cache is consulted, and if a result already exists it is emitted straight away. Only when a result is missing is the fetch function called - and it is called exactly once, with a Subject<T> through which every subsequent cache miss is pushed. The calling code is responsible for adding the fetched results to the cache. The values are processed asynchronously through the fetch function and the results are reassembled, together with the cached results, in the original order.

Works like a treat. :-)

Here's the code:

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class
{
    return source
        .FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
{
    var results = new Subject<R>();

    var disposables = new CompositeDisposable();

    var loop = new EventLoopScheduler();
    disposables.Add(loop);

    var sourceDone = false;
    var pairsDone = true; // stays true until the fetch pipeline is opened in sourceNext
    var exception = (Exception)null;

    var fetchIn = new Subject<T>();
    var fetchOut = (IObservable<R>)null;
    var pairs = (IObservable<KeyValuePair<int, R>>)null;

    var lookup = new Dictionary<T, int>();
    var list = new List<Maybe<R>>();
    var cursor = 0;

    // completes or faults the results stream once both the source and the
    // fetch pipeline have finished, then schedules cleanup
    Action checkCleanup = () =>
    {
        if (sourceDone && pairsDone)
        {
            if (exception == null)
            {
                results.OnCompleted();
            }
            else
            {
                results.OnError(exception);
            }
            loop.Schedule(() => disposables.Dispose());
        }
    };

    // emits buffered results in source order, stopping at the first slot
    // still awaiting its fetched value
    Action dequeue = () =>
    {
        while (cursor != list.Count)
        {
            var mr = list[cursor];
            if (mr.HasValue)
            {
                results.OnNext(mr.Value);
                cursor++;
            }
            else
            {
                break;
            }
        }
    };

    Action<KeyValuePair<int, R>> nextPairs = kvp =>
    {
        list[kvp.Key] = Maybe<R>.Something(kvp.Value);
        dequeue();
    };

    Action<Exception> errorPairs = ex =>
    {
        fetchIn.OnCompleted();
        pairsDone = true;
        exception = ex;
        checkCleanup();
    };

    Action completedPairs = () =>
    {
        pairsDone = true;
        checkCleanup();
    };

    // per source value: consult the cache; on a miss, lazily open the fetch
    // pipeline and forward the id; on a hit, flush any in-order results
    Action<T> sourceNext = t =>
    {
        var mr = cache(t);
        list.Add(mr);
        if (mr.IsNothing)
        {
            lookup[t] = list.Count - 1;
            if (fetchOut == null)
            {
                pairsDone = false;
                fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
                pairs = fetchIn
                    .Select(x => lookup[x])
                    .Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
                disposables.Add(pairs
                    .ObserveOn(loop)
                    .Subscribe(nextPairs, errorPairs, completedPairs));
            }
            fetchIn.OnNext(t);
        }
        else
        {
            dequeue();
        }
    };

    Action<Exception> errorSource = ex =>
    {
        sourceDone = true;
        exception = ex;
        fetchIn.OnCompleted();
        checkCleanup();
    };

    Action completedSource = () =>
    {
        sourceDone = true;
        fetchIn.OnCompleted();
        checkCleanup();
    };

    disposables.Add(source
        .ObserveOn(loop)
        .Subscribe(sourceNext, errorSource, completedSource));

    return results.ObserveOn(scheduler);
}
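
And here is a sketch of how Repository.Fetch might use the extension - the cache dictionary, the node.Id key used to repopulate the cache, and the scheduler choice are assumptions rather than part of the code above:

public class Repository
{
    private readonly Dictionary<Guid, INode> nodeCache = new Dictionary<Guid, INode>();
    private readonly Func<IStorage> storageFactory;

    public Repository(Func<IStorage> storageFactory)
    {
        this.storageFactory = storageFactory;
    }

    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        // Lazy<T> defers the factory, so IStorage is only created if the fetch
        // function is actually invoked, i.e. at least one id misses the cache.
        var lazyStorage = new Lazy<IStorage>(this.storageFactory);

        return ids.FromCacheOrFetch<Guid, INode>(
            id =>
            {
                INode node;
                return this.nodeCache.TryGetValue(id, out node) ? node : null;
            },
            misses => lazyStorage.Value
                .Fetch(misses)
                // the caller adds fetched results to the cache; keying by node.Id
                // assumes INode exposes its Guid, which the question doesn't show
                .Do(node => this.nodeCache[node.Id] = node),
            Scheduler.ThreadPool);
    }
}

Note that the dictionary access may need to be synchronised in practice, since the cache and fetch callbacks can run on different threads.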


Something like this (I assumed you want to instantiate the storage only once for all subscribers):

public class Repository
{
    public Repository()
    {
        _lazyStorage = new Lazy<IStorage>(StorageFactory);
    }
    private readonly Lazy<IStorage> _lazyStorage;
    private Dictionary<Guid, INode> NodeCache { get; set; }
    private Func<IStorage> StorageFactory { get; set; }
    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        return Observable
            .CreateWithDisposable<INode>(observer =>
                ids.Subscribe(x =>
                {
                    INode node;
                    if (NodeCache.TryGetValue(x, out node))
                        observer.OnNext(node);
                    else
                    {
                        // assumes a synchronous Fetch(Guid) overload on IStorage;
                        // the interface in the question only exposes Fetch(IObservable<Guid>)
                        node = _lazyStorage.Value.Fetch(x);
                        NodeCache[x] = node;
                        observer.OnNext(node);
                    }
                }, observer.OnError, observer.OnCompleted));
    }
}

EDIT: Hmm, this order preservation while IStorage.Fetch is asynchronous is interesting - waiting on IStorage.Fetch would have to hold back all later values... Thinking...

I think I got it... Maybe... If you need order preservation, you need a queue. In the Rx world, a queue is .Concat. Would the below work for you?

public class Repository
{
    public Repository()
    {
        _lazyStorage = new Lazy<IStorage>(StorageFactory);
    }
    private readonly Lazy<IStorage> _lazyStorage;
    private Dictionary<Guid, INode> NodeCache { get; set; }
    private Func<IStorage> StorageFactory { get; set; }
    private IObservable<INode> Fetcher(Guid id)
    {
        return Observable.Defer(() =>
        {
            // again assumes a single-id Fetch(Guid) overload returning IObservable<INode>
            INode node;
            return NodeCache.TryGetValue(id, out node)
                ? Observable.Return(node)
                : _lazyStorage.Value.Fetch(id).Do(x => NodeCache[id] = x);
        });
    }
    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        return ids.Select(Fetcher).Concat();
    }
}
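
A note on the design: ids.Select(Fetcher).Concat() subscribes to each deferred inner observable only after the previous one completes, which preserves the order but also serialises the storage calls for cache misses. Hypothetical usage, assuming some Guid values idA, idB and idC:

var repository = new Repository();

repository
    .Fetch(new[] { idA, idB, idC }.ToObservable())
    .Subscribe(
        node => Console.WriteLine(node),
        () => Console.WriteLine("done"));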