Rx Window, Join, GroupJoin?
I have generated/tested two Observables to be combined for executing a single query.
A user can have multiple ro开发者_如何学Goles. Whenever their role id changes the data needs to be updated. But the data should only update if the query is active (there is some control that currently needs the data).
A role Id change can also happen when a query is suspended. When the query becomes active again the data should also load.
//Tuple has the Id of the current Role and the time that the Id updated
IObservable<Tuple<Guid, DateTime>> idUpdate
//Tuple has the state of the query (true=active or false=suspended)
//and the time the state of the query updated
IObservable<Tuple<bool, DateTime>> queryStateUpdate
I would like to create
//A hot observable that pushes true whenever the query should execute
IObservable<bool> execute
I broke it down into two cases that could be merged but I cannot figure out how to create the case observables.
- case a) the role Id updated & the last state was Active
- case b) the state updated to Active && this is the first active state since the role Id updated
I have looked through the videos, lee campbells site, the beginners TOC, etc but I can't seem to find a good example for this rx join. Any ideas on how to create the execute or case observables?
Given the problem as described - which is a little vague as I don't see what the actual id (Guid
) is used for, nor the DateTime
values - I've got the following query which appears to solve your problem:
IObservable<bool> execute =
idUpdate
.Publish(_idUpdate =>
from qsu in queryStateUpdate
select qsu.Item1
? _idUpdate.Select(x => true)
: Observable.Empty<bool>())
.Switch();
I've tested this with the following idUpdate
& queryStateUpdate
observables.
var rnd = new Random();
IObservable<Tuple<Guid, DateTime>> idUpdate =
Observable
.Generate(
0,
n => n < 10000,
n => n + 1,
n => Tuple.Create(Guid.NewGuid(), DateTime.Now),
n => TimeSpan.FromSeconds(rnd.NextDouble() * 0.1));
IObservable<Tuple<bool, DateTime>> queryStateUpdate =
Observable
.Generate(
0,
n => n < 100,
n => n + 1,
n => n % 2 == 0,
n => TimeSpan.FromSeconds(rnd.NextDouble() * 2.0))
.StartWith(true)
.DistinctUntilChanged()
.Select(b => Tuple.Create(b, DateTime.Now));
If you can provide some clarification around your problem I will probably be able to provide a better answer to suit your needs.
EDIT: Added the "replay(1)" behaviour required when the Id changes when inactive.
Please note that I have gotten rid of the need to have tuples with DateTime
.
IObservable<Guid> idUpdate = ...
IObservable<bool> queryStateUpdate = ...
var replay = new ReplaySubject<Guid>(1);
var disposer = new SerialDisposable();
Func<bool, IObservable<bool>, IObservable<Guid>,
IObservable<Guid>> getSwitch = (qsu, qsus, iu) =>
{
if (qsu)
{
return replay.Merge(iu);
}
else
{
replay.Dispose();
replay = new ReplaySubject<Guid>(1);
disposer.Disposable = iu.TakeUntil(qsus).Subscribe(replay);
return Observable.Empty<Guid>();
}
};
var query =
queryStateUpdate
.DistinctUntilChanged()
.Publish(qsus =>
idUpdate
.Publish(ius =>
qsus
.Select(qsu =>
getSwitch(qsu, qsus, ius))))
.Switch();
I read the question as saying that there is a stream of notifications idUpdate
, which will be processed as long as queryStateUpdate
is set. When queryStateUpdate
isn't set, then the notifications should pause until queryStateUpdate
is set again.
In which case the join operator is not going to solve your problem.
I would suggest that you need some form of cache while queryStateUpdate
is unset, i.e.
List<Tuple<Guid,DateTime>> cache = new List<Tuple<Guid,DateTime>>();
Subject<Tuple<Guid,DateTime>> execute = new Subject<Tuple<Guid,DateTime>>();
idUpdate.Subscribe( x => {
if (queryStateUpdate.Last().Item1) //might be missing something here with Last, you might need to copy the state out
exeucte.OnNext(x);
else
cache.Add(x);
});
queryStateUpdate.Subscribe(x=> {
if (x.Item1)
{
//needs threadsafety
foreach(var x in cache)
execute.OnNext(x);
cache.Clear();
});
Thanks to Enigmativity and AlSki. Using a cache I came up with the answer.
var execute = new Subject<Guid>();
var cache = new Stack<Guid>();
idUpdate.CombineLatest(queryStateUpdate, (id, qs) => new { id, qs }).Subscribe( anon =>
{
var id = anon.id;
var queryState = anon.qs;
//The roleId updated after the queryState updated
if (id.Item2 > queryState.Item2)
{
//If the queryState is active, call execute
if (observationState.Item1)
{
cache.Clear();
execute.OnNext(roleId.Item1);
return;
}
//If the id updated and the state is suspended, cache it
cache.Push(id.Item1);
}
//The queryState updated after the roleId
else if (queryState.Item2 > roleId.Item2)
{
//If the queryState is active and a roleId update has been cached, call execute
if (queryState.Item1 && cache.Count > 0)
{
execute.OnNext(cache.Pop());
cache.Clear();
}
}});
精彩评论