merge multiple observables to an observable array

Hi, I am trying to merge a number of observables into an observable array. Here is an example that works in fsi (sorry that it is lengthy).

#r "./bin/Debug/System.Reactive.dll"

open System
open System.Reactive.Linq

/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) = 
    observable.Subscribe(
        (fun x -> next x),
        (fun e -> error e),
        (fun () -> completed()))

/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable = 
    subscribeComplete next error (fun () -> ()) observable

/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable = 
    subscribeWithError next ignore observable

/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
        (resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
            Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)

//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
    Observable.Select(observable, Func<_,_>(f))

/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> = 
    Observable.CombineLatest(
        obs1, obs2, Func<_,_,_>(fun a b -> a, b))    

/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> = 
    let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))    
    obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))    

/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> = 
    let obsNew = combineLatest3 obs1 obs2 obs3
    obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))    

// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
    combineLatest obs1 obs2 
    |> obsMap (fun (a, b) -> [a; b] |> List.toArray)

let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
    combineLatest3 obs1 obs2 obs3 
    |> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)

let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
    combineLatest4 obs1 obs2 obs3 obs4
    |> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)

let combineLatestListToArray (list: IObservable<'T> List) =
    match list.Length with
    | 2 -> combineLatestArray list.[0] list.[1]
    | 3 -> combineLatest3Array list.[0] list.[1] list.[2]
    | 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
    | _ -> failwith "combine latest on unsupported list size"

type FooType = 
        {   NameVal :   string
            IdVal   :   int
            RetVal  :   float }

        member x.StringKey() =
            x.NameVal.ToString() + ";" + x.IdVal.ToString()


// example code starts here

let rnd = System.Random()

let fooListeners = Collections.Generic.Dictionary()

let AddAFoo (foo : FooType) =
    let fooId = foo.StringKey()

    if fooListeners.ContainsKey(fooId)
        then fooListeners.[fooId]
    else
        let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
        fooListeners.Add(fooId,myObs)
        myObs

let fooInit =   [6..9]
                |> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})     
                |> List.map (fun foo -> AddAFoo foo)

let fooValuesArray =    fooInit
                        |> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
                        |> combineLatestListToArray

let mySub =
    fooValuesArray
    |> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)

//execute until here to start example

// execute this last line to unsubscribe
mySub.Dispose()

I have two questions about this code:

  1. Is there a smarter way of merging the observables to arrays? (it gets very lengthy as I need to merge larger arrays)

  2. I want to throttle the updates: all updates that occur within the same window of (say) half a second should be handled as one update on the array. Ideally, the window should open only when a first update comes in. For example, if no updates arrive for 2 seconds and then one update arrives, we wait and include further updates for 0.5 seconds and then trigger the observable. I don't want it to publish every 0.5 seconds when no observables have been triggered. I hope this description is clear enough.

update: I have decided to accept one of the F# answers, but I haven't done the C# answers justice yet. I hope to be able to check them out properly soon.


For 1, an application of List.fold and List.toArray and a few Observable operators should work nicely. Something like:

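let combineLatest (observables: IObservable<'T> list) =
    // foldBack conses from the right, so the resulting array keeps the input order
    // (a left fold with t :: tl would produce the values in reverse order).
    let combined =
        List.foldBack
            (fun (o: IObservable<'T>) (ol: IObservable<'T list>) ->
                Observable.CombineLatest(o, ol, Func<_,_,_>(fun t tl -> t :: tl)))
            observables
            (Observable.Return<'T list>([]))
    Observable.Select(combined, Func<_,_>(List.toArray))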

Due to the nesting, you may end up with performance issues if you have a large list of Observables, but it's worth at least trying before you resort to writing it by hand.
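If the Rx version in use is 2.0 or later (an assumption, since the question does not say which version it references), the nesting can probably be avoided altogether: those versions have an Observable.CombineLatest overload that takes a sequence of observables and yields an IList of the latest values. A minimal sketch, where the name combineLatestArrayBuiltIn is made up for illustration:

```fsharp
open System
open System.Collections.Generic
open System.Reactive.Linq

/// Sketch only: relies on the CombineLatest overload that accepts a
/// sequence of observables (assumed available, Rx 2.0 or later).
let combineLatestArrayBuiltIn (sources: IObservable<'T> list) : IObservable<'T[]> =
    // Upcast the F# list so overload resolution picks the IEnumerable-based overload.
    let combined : IObservable<IList<'T>> =
        Observable.CombineLatest(sources :> seq<IObservable<'T>>)
    // Copy each IList snapshot into a fresh array.
    Observable.Select(combined, Func<IList<'T>, 'T[]>(fun xs -> Seq.toArray xs))
```

With the question's code this would be used as fooInit |> List.map (obsMap (fun x -> x.RetVal)) |> combineLatestArrayBuiltIn. Like the hand-written versions, it should emit nothing until every source has produced at least one value.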

For 2, I would agree with the other answers to apply Throttling to the result.
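One caveat: Rx's Throttle only emits after the source has been quiet for the given duration, so under a steady stream of updates it can hold values back for much longer than 0.5 seconds. The behaviour described in the question (open a window on the first update, then publish once the window elapses) might instead be sketched with Publish and a Buffer closing selector. This is an untested sketch; batchFromFirstUpdate is a made-up name, and timing between sources on different threads may need more care:

```fsharp
open System
open System.Collections.Generic
open System.Reactive.Linq

/// Sketch: after the first update arrives, collect for `window`,
/// then emit only the latest value seen in that window.
let batchFromFirstUpdate (window: TimeSpan) (source: IObservable<'T>) : IObservable<'T> =
    Observable.Publish(
        source,
        Func<IObservable<'T>, IObservable<'T>>(fun hot ->
            // Each buffer closes `window` after the first value that arrives while it is open.
            let closing = Func<IObservable<'T>>(fun () -> hot.Take(1).Delay(window))
            let buffers : IObservable<IList<'T>> = hot.Buffer(closing)
            buffers
                // A buffer can be empty if the source completes while nothing is pending.
                .Where(fun (b: IList<'T>) -> b.Count > 0)
                // Keep only the most recent array of values from the window.
                .Select(fun (b: IList<'T>) -> b.[b.Count - 1])))
```

Applied to the question's example this would read fooValuesArray |> batchFromFirstUpdate (TimeSpan.FromSeconds 0.5). Because a new buffer only starts its timer when a value arrives, nothing should be published during quiet periods.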


I'm sorry this isn't F# - I wish I had time to learn it - but here's a possible answer in C#.

Here are a set of extension methods that will combine the latest from an IEnumerable<IObservable<T>> to an IObservable<IEnumerable<T>>:

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
{
    if (first == null) throw new ArgumentNullException("first");
    if (second == null) throw new ArgumentNullException("second");
    return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (second == null) throw new ArgumentNullException("second");
    return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
{
    if (sources == null) throw new ArgumentNullException("sources");
    return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
{
    if (first == null) throw new ArgumentNullException("first");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
}

private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
{
    if (sources == null) throw new ArgumentNullException("sources");
    if (any == null) throw new ArgumentNullException("any");
    if (none == null) throw new ArgumentNullException("none");
    return Observable.Defer(() => sources.Any() ? any() : none());
}

They may not be very efficient, but they do handle any number of observables that need to be combined.

I'd be keen to see these methods converted to F#.

As for your second question, I'm not sure I can answer it from what you've said so far. CombineLatest and Throttle both lose values, so it is probably prudent to understand your use case in more detail before attempting an answer.


Gideon Engelberth has already answered your question with one possible way to solve the problem. Another possible way is something like the code below, which doesn't use nesting.

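// Requires: open System.Reactive.Subjects (for Subject) in addition to System.Reactive.Linq
let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    // Latest value per source; a slot holds the default value until its source first fires,
    // so unlike CombineLatest this emits before every source has produced a value.
    let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
    let cb (i:int,v:'T) = 
        arr.[i] <- v
        // Publish a copy so subscribers never observe later mutations of the shared array
        s.OnNext(Array.copy arr)
    // Tag each value with the index of its source, then merge everything into one stream
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    // Note: the subscription starts immediately and is never disposed
    main.Subscribe(new Action<int * 'T>(cb)
                   ,new Action<exn>(fun e -> s.OnError(e)) 
                   ,new Action(fun () -> s.OnCompleted()) ) |> ignore
    s :> IObservable<'T array>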

Let me know if this solves your problem, as I haven't tested it much :) NOTE: This is for the first part; for the second part, everyone has already mentioned what you need to do.

UPDATE: Another implementation :

let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    async {
        try
            let se = main.ToEnumerable() |> Seq.scan (fun ar (i,t) -> Array.set ar i t; ar) arr
            for i in se do
                // Publish a copy so subscribers never observe later mutations of the shared array
                s.OnNext(Array.copy i)
            s.OnCompleted()
        with
        | ex -> s.OnError(ex)
    } |> Async.Start
    s :> IObservable<'T array>


  1. It seems that Observable.Merge(), which has overloads for a variable number of IObservables, is closer to what you want.

  2. Observable.Buffer() with the time overloads will do what you want here. In the "no events" situation, Buffer will still OnNext() an empty list, letting you react to that situation.


This is the best I could come up with. I've been wanting to solve this for a while.

public static class Extensions
{
    public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
    {
        return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
        (
            Observable.Return(Enumerable.Empty<T>()),
            (o, n) => o.CombineLatest
            (
                n,
                (list, t) => list.Concat(EnumerableEx.Return(t))
            )
        );
    }
}

So an example usage would be:

var obs = new List<IObservable<bool>> 
{ 
    Observable.Return(true), 
    Observable.Return(false), 
    Observable.Return(true) 
};

var result = obs.CombineLatest().Select(list => list.All(x => x));
result.Subscribe(Console.WriteLine);
Console.ReadKey();

Keep in mind, though, that the resulting IObservable<IEnumerable<T>> will not fire until all observables have yielded a value. This is what I needed in my scenarios, but it might not be appropriate for yours.

My worry with this is the performance of all of the .Concat calls. It might be more performant to use a mutable collection inside the extension method, but I'm not sure.

Sorry, I don't know F#. I'll get around to it one of these days.

Throttling is just done with the .Throttle operator after you get your final observable.

Edit: This answer is the iterative Yin to Enigmativity's recursive Yang.
