merge multiple observables to an observable array
Hi I am trying to merge a number of observables to an observable array. Here an example that works in fsi. (sorry that it is lengthy)
#r "./bin/Debug/System.Reactive.dll"
open System
open System.Reactive.Linq
/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) =
observable.Subscribe(
(fun x -> next x),
(fun e -> error e),
(fun () -> completed()))
/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable =
subscribeComplete next error (fun () -> ()) observable
/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable =
subscribeWithError next ignore observable
/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
(resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)
//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
Observable.Select(observable, Func<_,_>(f))
/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> =
Observable.CombineLatest(
obs1, obs2, Func<_,_,_>(fun a b -> a, b))
/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> =
let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))
obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))
/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> =
let obsNew = combineLatest3 obs1 obs2 obs3
obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))
// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
combineLatest obs1 obs2
|> obsMap (fun (a, b) -> [a; b] |> List.toArray)
let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
combineLatest3 obs1 obs2 obs3
|> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)
let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
combineLatest4 obs1 obs2 obs3 obs4
|> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)
let combineLatestListToArray (list: IObservable<'T> List) =
match list.Length with
| 2 -> combineLatestArray list.[0] list.[1]
| 3 -> combineLatest3Array list.[0] list.[1] list.[2]
| 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
| _ -> failwith "combine latest on unsupported list size"
type FooType =
{ NameVal : string
IdVal : int
RetVal : float }
member x.StringKey() =
x.NameVal.ToString() + ";" + x.IdVal.ToString()
// example code starts here
let rnd = System.Random()
let fooListeners = Collections.Generic.Dictionary()
let AddAFoo (foo : FooType) =
let fooId = foo.StringKey()
if fooListeners.ContainsKey(fooId)
then fooListeners.[fooId]
else
let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
fooListeners.Add(fooId,myObs)
myObs
let fooInit = [6..9]
|> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})
|> List.map (fun foo -> AddAFoo foo)
let fooValuesArray = fooInit
|> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
|> combineLatestListToArray
let mySub =
fooValuesArray
|> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)
//execute until here to start example
// execute this last line to unsubscribe
mySub.Dispose()
I have two questions about this code:
Is there a smarter way of merging the observables to arrays? (it gets very lengthy as I need to merge larger arrays)
I want to throttle the updates. What I mean by that is that I want all updates that occur within (say) the same half a second window to be handled as one update on the array. Ideally, I want this window to open only when a first update comes in, i.e if no updates arrive in 2 seconds, then one update arrives, then we wait and include further updates for 0.5 seconds and then trigger the observable. I don't want it to publish periodically every 0.5 seconds although no observables are triggered. I hope this description is clear enough.
update: I have decided to accept one of the F# answ开发者_开发技巧ers, but I haven't done the C# answers justice yet. I hope to be able to check them out properly soon.
For 1, an application of List.fold
and List.toArray
and a few Observable
operators should work nicely. Something like:
let combineLatest observables =
Observable.Select(
(observables
|> List.fold (fun ol o
-> Observable.CombineLatest(o, ol, (fun t tl -> t :: tl))
) (Observable.Return<_>([]))
),
List.toArray)
Due to the nesting, you may end up with performance issues if you have a large list of Observables, but it's worth at least trying before you resort to writing it by hand.
For 2, I would agree with the other answers to apply Throttling to the result.
I'm sorry this isn't F# - I wish I had time to learn it - but here's a possible answer in C#.
Here are a set of extension methods that will combine the latest from an IEnumerable<IObservable<T>>
to an IObservable<IEnumerable<T>>
:
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
{
if (first == null) throw new ArgumentNullException("first");
if (second == null) throw new ArgumentNullException("second");
return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
{
if (firsts == null) throw new ArgumentNullException("firsts");
if (second == null) throw new ArgumentNullException("second");
return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
{
if (sources == null) throw new ArgumentNullException("sources");
return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
{
if (first == null) throw new ArgumentNullException("first");
if (seconds == null) throw new ArgumentNullException("seconds");
return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
{
if (firsts == null) throw new ArgumentNullException("firsts");
if (seconds == null) throw new ArgumentNullException("seconds");
return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
}
private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
{
if (sources == null) throw new ArgumentNullException("sources");
if (any == null) throw new ArgumentNullException("any");
if (none == null) throw new ArgumentNullException("none");
return Observable.Defer(() => sources.Any() ? any() : none());
}
They may not be very efficient, but they do handle any number of observables that need to be combined.
I'd be keen to see these methods converted to F#.
As for your second question, I'm not sure I can answer with what you've said so far because CombineLatest
and Throttle
both lose values so it is probably prudent to understand your use case in more detail before attempting an answer.
Although Gideon Engelberth has answered your question with one of the possible way to solve the problem. Other possible way could be something like below, it doesn't use nesting.
let combineLatestToArray (list : IObservable<'T> list) =
let s = new Subject<'T array>()
let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
let cb (i:int,v:'T) =
arr.[i] <- v
s.OnNext(arr |> Array.toList |> List.toArray)
let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
|> Observable.Merge
main.Subscribe(new Action<int * 'T>(cb)
,new Action<exn>(fun e -> s.OnError(e))
,new Action(fun () -> s.OnCompleted()) ) |> ignore
s :> IObservable<'T array>
Let me know if this solved your problem as I haven't testing it much :) NOTE: This is for the first part, for second part everyone has already mentioned what you need to do
UPDATE: Another implementation :
let combineLatestToArray (list : IObservable<'T> list) =
let s = new Subject<'T array>()
let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
|> Observable.Merge
async {
try
let se = main.ToEnumerable() |> Seq.scan (fun ar (i,t) -> Array.set ar i t; ar) arr
for i in se do
s.OnNext(i |> Array.toList |> List.toArray)
s.OnCompleted()
with
| :? Exception as ex -> s.OnError(ex)
} |> Async.Start
s :> IObservable<'T array>
Seems that
Observable.Merge()
which has overloads for variable number ofIObservables
is closer to what you want.Observable.Buffer()
with the time overloads will do what you want here. In the "no events" situation, Buffer will still OnNext() an empty list, letting you react to that stiuation.
This is the best I could come up with. I've been wanting to solve this for a while.
public static class Extensions
{
public static IObservable<IEnumerable<T>> CombineLatest<T>(this Observable observable, IEnumerable<IObservable<T>> observableCollection)
{
return observableCollection.CombineLatest();
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
{
return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
(
Observable.Return(Enumerable.Empty<T>()),
(o, n) => o.CombineLatest
(
n,
(list, t) => list.Concat(EnumerableEx.Return(t))
)
);
}
}
So an example usage would be:
var obs = new List<IObservable<bool>>
{
Observable.Return(true),
Observable.Return(false),
Observable.Return(true)
};
var result = obs.CombineLatest().Select(list => list.All(x => x));
result.Subscribe(Console.WriteLine);
Console.ReadKey();
You would have to operate on the knowledge, though, that the resulting IObservable<IEnumerable<T>>
will not fire until all observables have yielded a value. This is what I needed in my scenarios, but might not be appropriate for your scenario.
My worry with this is the performance of all of the .Concats. Might be more performant to deal in a mutable collection in the extension method. Not sure.
Sorry, I don't know F#. I'll get around to it one of these days.
Throttling is just done with the .Throttle
operator after you get your final observable.
Edit: This answer is the iterative Ying to Enigmativity's recursive Yang.
精彩评论