Problem with Reactive Extension Observers

I was working on an app using Reactive Extensions and ran into the following problem:

Say I have two observables, P and Q. I want to build a third observable, R, such that if two values of P arrive without a Q in between, R outputs 0, and if a P is followed by a Q, R outputs the result of a method applied to those two values. Something like:

P0    Q0    ->    R0 = f(P0,Q0)    
P1          ->    R1 = 0    
P2    Q1    ->    R2 = f(P2,Q1)    
P3          ->    R3 = 0    
P4          ->    R4 = 0
P5    Q2    ->    R5 = f(P5,Q2)
(...)

and the values arrive from the observables in the following order:

P0 Q0 P1 P2 Q1 P3 P4 P5 Q2

thanks for your help.


I think I have a solution for you.

If I assume that you have the following defined:

IObservable<int> ps = ...;
IObservable<int> qs = ...;

Func<int, int, int> f = ...;

First up I create a dictionary of functions to compute the final values:

var fs = new Dictionary<string, Func<int, int, int?>>()
{
    { "pp", (x, y) => 0 },
    { "pq", (x, y) => f(x, y) },
    { "qp", (x, y) => null },
    { "qq", (x, y) => null },
};

Every combination of "p" & "q" is there.

Then you can create a merged observable like this:

var pqs =
    (from p in ps select new { k = "p", v = p })
        .Merge(from q in qs select new { k = "q", v = q });

I now know which sequence produced which value.

Next, I publish the merged sequence. I don't know whether the source observables are hot or cold, and publishing shares a single subscription to them. I then zip the published observable with itself, skipping one element on one side and none on the other, so each element is paired with its successor. That gives me every consecutive pair of values along with the source each value came from. It's then easy to apply the dictionary functions and filter out any null results.

Here it is:

var rs =
    from kvv in pqs.Publish(_pqs =>
        _pqs.Skip(1).Zip(_pqs, (pq1, pq0) => new
        {
            k = pq0.k + pq1.k,
            v1 = pq1.v,
            v0 = pq0.v
        }))
    let r = fs[kvv.k](kvv.v0, kvv.v1)
    where r.HasValue
    select r.Value;

Does that work for you?
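
To check the pairing rule against the order given in the question, here is a minimal sketch that replays the same logic over a plain list with LINQ instead of Rx. The function F and the class name are hypothetical stand-ins (the question does not say what f does), and it assumes C# 7 or later for the tuple syntax:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class PairwiseSketch
{
    // Hypothetical stand-in for the question's f; any Func<int, int, int> would do.
    static int F(int p, int q) => 100 + 10 * p + q;

    static void Main()
    {
        // Arrival order from the question: P0 Q0 P1 P2 Q1 P3 P4 P5 Q2.
        var merged = new List<(char k, int v)>
        {
            ('p', 0), ('q', 0), ('p', 1), ('p', 2), ('q', 1),
            ('p', 3), ('p', 4), ('p', 5), ('q', 2),
        };

        // Pair each element with its successor, like Skip(1).Zip above.
        var results = merged
            .Zip(merged.Skip(1), (a, b) => new { a, b })
            .Where(pair => pair.a.k == 'p')       // "qp" and "qq" produce nothing
            .Select(pair => pair.b.k == 'q'
                ? F(pair.a.v, pair.b.v)           // "pq": apply f
                : 0);                             // "pp": emit 0

        Console.WriteLine(string.Join(", ", results));
        // prints: 100, 0, 121, 0, 0, 152
    }
}
```

The six outputs line up with R0 through R5 in the question: f, 0, f, 0, 0, f.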


The general idea is simple: you merge P and Q, use BufferWithCount(2) to get pairs of values and then process pairs according to your logic:


P.Merge(Q).BufferWithCount(2).Select(values =>
{
    var first = values[0];
    var second = values[1];
    if (first is P && second is P ||
        first is Q && second is Q)
    {
        return 0;
    }

    if (first is P)
    {
        return selector(first, second);
    }
    else // suppose Q, P is a valid sequence as well.
    {
        return selector(second, first);
    }
});
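
Before relying on this, it may help to see how buffers of two split the arrival order from the question. BufferWithCount(2) yields consecutive, non-overlapping buffers, so the sketch below mimics that chunking with plain LINQ (no Rx; the string labels and the class name are just illustrative stand-ins):

```csharp
using System;
using System.Linq;

static class BufferSketch
{
    static void Main()
    {
        // Arrival order from the question.
        var merged = new[] { "P0", "Q0", "P1", "P2", "Q1", "P3", "P4", "P5", "Q2" };

        // Group by index / 2 to get consecutive, non-overlapping chunks of two.
        var buffers = merged
            .Select((item, index) => new { item, index })
            .GroupBy(x => x.index / 2, x => x.item)
            .Select(g => "[" + string.Join(" ", g) + "]");

        Console.WriteLine(string.Join(" ", buffers));
        // prints: [P0 Q0] [P1 P2] [Q1 P3] [P4 P5] [Q2]
    }
}
```

With this order, P2 and Q1 (and likewise P5 and Q2) land in different buffers, and the last buffer holds a single element. If you need every consecutive pair, an overlapping window such as the Skip(1).Zip pairing from the previous answer is probably the closer fit.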

The hard part is merging P and Q when they are of different types and then telling them apart in Select. If they are of the same type, you could use something simple, like the approach proposed by Enigmativity:


var pqs =
    (from p in ps select new { k = "p", v = p })
        .Merge(from q in qs select new { k = "q", v = q });

If they are of different types, merging them requires a common wrapper type, something like Data.Either from Haskell:


public abstract class Either<TLeft, TRight>
{
    private Either()
    {
    }

    public static Either<TLeft, TRight> Create(TLeft value)
    {
        return new Left(value);
    }

    public static Either<TLeft, TRight> Create(TRight value)
    {
        return new Right(value);
    }

    public abstract TResult Match<TResult>(
        Func<TLeft, TResult> onLeft,
        Func<TRight, TResult> onRight);

    public sealed class Left : Either<TLeft, TRight>
    {
        public Left(TLeft value)
        {
            this.Value = value;
        }

        public TLeft Value
        {
            get;
            private set;
        }

        public override TResult Match<TResult>(
            Func<TLeft, TResult> onLeft,
            Func<TRight, TResult> onRight)
        {
            return onLeft(this.Value);
        }
    }

    public sealed class Right : Either<TLeft, TRight>
    {
        public Right(TRight value)
        {
            this.Value = value;
        }

        public TRight Value
        {
            get;
            private set;
        }

        public override TResult Match<TResult>(
            Func<TLeft, TResult> onLeft,
            Func<TRight, TResult> onRight)
        {
            return onRight(this.Value);
        }
    }
}

Funnily enough, System.Reactive.dll already contains a similar Either class, but unfortunately it is internal, so we need our own implementation. Now we can put both P and Q into Either and proceed with our solution (I've generalized it a bit, so you can return any result type instead of only int):


public static IObservable<TResult> SmartZip<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector)
{
    return Observable
        .Merge(
            leftSource.Select(Either<TLeft, TRight>.Create),
            rightSource.Select(Either<TLeft, TRight>.Create))
        .BufferWithCount(2)
        .Select(values =>
            {
                // this case was not covered in your question,
                // but I've added it for the sake of completeness.
                if (values.Count < 2)
                {
                    return default(TResult);
                }

                var first = values[0];
                var second = values[1];

                // pattern-matching in C# is really ugly.
                return first.Match(
                    left => second.Match(
                        _ => default(TResult),
                        right => selector(left, right)),
                    right => second.Match(
                        left => selector(left, right),
                        _ => default(TResult)));
            });
}

And here is a small demo for all this scary ugly stuff.


private static void Main(string[] args)
{
    var psource = Observable
        .Generate(1, i => i < 100, i => i, i => i + 1)
        .Zip(Observable.Interval(TimeSpan.FromMilliseconds(10.0)), (i, _) => i);
    var qsource = Observable
        .Generate(1, i => i < 100, i => (double)i * i, i => i + 1)
        .Zip(Observable.Interval(TimeSpan.FromMilliseconds(30.0)), (i, _) => i);

    var result = SmartZip(
        psource,
        qsource,
        (p, q) => q / p).ToEnumerable();
    foreach (var item in result)
    {
        Console.WriteLine(item);
    }
}


If I have understood your question properly then below is a generic function which can handle such cases:

public static IObservable<T> MyCombiner<T>(IObservable<T> P, IObservable<T> Q, T defaultValue, Func<T, T, T> fun)
{
    var c = P.Select(p => new { Type = 'P', Value = p })
        .Merge(Q.Select(p => new { Type = 'Q', Value = p }));
    return c.Zip(c.Skip(1), (a, b) =>
    {
        if (a.Type == 'P' && b.Type == 'P')
            return new { Ok = true, Value = defaultValue };
        if (a.Type == 'P' && b.Type == 'Q')
            return new { Ok = true, Value = fun(a.Value, b.Value) };
        else
            return new { Ok = false, Value = default(T) };
    }).Where(b => b.Ok).Select(b => b.Value);
}


Suppose we have two methods

  1. Before: merges two observable sequences into one observable sequence by using a selector function whenever the first observable produces an element right before the second one.
  2. Without: merges an observable sequence into another observable sequence every time two items arrive in a row from the first observable without any item from the second one.

With these methods the problem is almost solved.

IObservable<TP> P = ...; // observable P
IObservable<TQ> Q = ...; // observable Q

var PP = P.Without((prev, next) => 0, Q);
var PQ = P.Before(Q, (p,q) => f(p,q)); // apply the function

var ResultSequence = PP.Merge(PQ);

And here are the two methods

public static class Observer
{
    /// <summary>
    /// Merges two observable sequences into one observable sequence by using the selector function
    /// whenever the first observable produces an element right before the second one.
    /// </summary>
    /// <param name="first"> First observable source.</param>
    /// <param name="second">Second observable source.</param>
    /// <param name="resultSelector">Function to invoke whenever the first observable produces an element right before the second one.</param>
    /// <returns>
    /// An observable sequence containing the result of combining elements of both sources 
    /// using the specified result selector function.
    /// </returns>
    public static IObservable<TResult> Before<TLeft, TRight, TResult>(this IObservable<TLeft> first, IObservable<TRight> second, Func<TLeft, TRight, TResult> resultSelector)
    {
        var result = new Subject<TResult>();

        bool firstCame = false;
        TLeft lastLeft = default(TLeft);

        first.Subscribe(item =>
        {
            firstCame = true;
            lastLeft = item;
        });

        second.Subscribe(item =>
        {
            if (firstCame)
                result.OnNext(resultSelector(lastLeft, item));

            firstCame = false;
        });

        return result;
    }

    /// <summary>
    /// Merges an observable sequence into one observable sequence by using the selector function 
    /// every time two items came from <paramref name="first"/> without any item of any observable
    /// in <paramref name="second"/>
    /// </summary>
    /// <param name="first"> Observable source to merge.</param>
    /// <param name="second"> Observable list to ignore.</param>
    /// <param name="resultSelector">Function to invoke whenever the first observable produces two elements without any of the observables in <paramref name="second"/> producing an element.</param>
    /// <returns>
    /// An observable sequence containing the result of combining elements
    /// using the specified result selector function.
    /// </returns>
    public static IObservable<TResult> Without<TLeft, TResult>(this IObservable<TLeft> first, Func<TLeft, TLeft, TResult> resultSelector, params IObservable<object>[] second)
    {
        var result = new Subject<TResult>();

        bool firstCame = false;
        TLeft lastLeft = default(TLeft);

        first.Subscribe(item =>
        {
            if (firstCame)
                result.OnNext(resultSelector(lastLeft, item));

            firstCame = true;
            lastLeft = item;
        });

        foreach (var observable in second)
            observable.Subscribe(item => firstCame = false);

        return result;
    }        
}