Rx Window, Join, GroupJoin?

I have generated/tested two Observables to be combined for executing a single query.

A user can have multiple roles. Whenever their role id changes the data needs to be updated. But the data should only update if the query is active (there is some control that currently needs the data).

A role Id change can also happen when a query is suspended. When the query becomes active again the data should also load.

//Tuple has the Id of the current Role and the time that the Id updated
IObservable<Tuple<Guid, DateTime>> idUpdate

//Tuple has the state of the query (true=active or false=suspended)
//and the time the state of the query updated
IObservable<Tuple<bool, DateTime>> queryStateUpdate 

I would like to create

//A hot observable that pushes true whenever the query should execute
IObservable<bool> execute 

I broke it down into two cases that could be merged but I cannot figure out how to create the case observables.

  • case a) the role Id updated and the last state was Active
  • case b) the state updated to Active and this is the first Active state since the role Id updated

I have looked through the videos, Lee Campbell's site, the beginners TOC, etc. but I can't seem to find a good example for this Rx join. Any ideas on how to create the execute or case observables?


Given the problem as described - which is a little vague as I don't see what the actual id (Guid) is used for, nor the DateTime values - I've got the following query which appears to solve your problem:

IObservable<bool> execute =
    idUpdate
        .Publish(_idUpdate =>
            from qsu in queryStateUpdate
            select qsu.Item1
                ? _idUpdate.Select(x => true) 
                : Observable.Empty<bool>())
        .Switch();

I've tested this with the following idUpdate & queryStateUpdate observables.

var rnd = new Random();

IObservable<Tuple<Guid, DateTime>> idUpdate =
    Observable
        .Generate(
            0,
            n => n < 10000,
            n => n + 1,
            n => Tuple.Create(Guid.NewGuid(), DateTime.Now),
            n => TimeSpan.FromSeconds(rnd.NextDouble() * 0.1));

IObservable<Tuple<bool, DateTime>> queryStateUpdate =
    Observable
        .Generate(
            0,
            n => n < 100,
            n => n + 1,
            n => n % 2 == 0,
            n => TimeSpan.FromSeconds(rnd.NextDouble() * 2.0))
        .StartWith(true)
        .DistinctUntilChanged()
        .Select(b => Tuple.Create(b, DateTime.Now));

If you can provide some clarification around your problem I will probably be able to provide a better answer to suit your needs.


EDIT: Added the "replay(1)" behaviour required when the Id changes when inactive.

Please note that I have gotten rid of the need to have tuples with DateTime.

IObservable<Guid> idUpdate = ...
IObservable<bool> queryStateUpdate = ...

var replay = new ReplaySubject<Guid>(1);
var disposer = new SerialDisposable();
Func<bool, IObservable<bool>, IObservable<Guid>,
    IObservable<Guid>> getSwitch = (qsu, qsus, iu) =>
{
    if (qsu)
    {
        return replay.Merge(iu);
    }
    else
    {
        replay.Dispose();
        replay = new ReplaySubject<Guid>(1);
        disposer.Disposable = iu.TakeUntil(qsus).Subscribe(replay);
        return Observable.Empty<Guid>();
    }
};

var query =
    queryStateUpdate
        .DistinctUntilChanged()
        .Publish(qsus =>
            idUpdate
            .Publish(ius =>
                qsus
                    .Select(qsu =>
                        getSwitch(qsu, qsus, ius))))
        .Switch();


I read the question as saying that there is a stream of notifications idUpdate, which will be processed as long as queryStateUpdate is set. When queryStateUpdate isn't set, then the notifications should pause until queryStateUpdate is set again.

In which case the join operator is not going to solve your problem.

I would suggest that you need some form of cache while queryStateUpdate is unset, i.e.

var cache = new List<Tuple<Guid, DateTime>>();
var execute = new Subject<Tuple<Guid, DateTime>>();
var gate = new object();   // guards cache and isActive
var isActive = false;      // latest query state, copied out of queryStateUpdate

idUpdate.Subscribe(id =>
{
    lock (gate)
    {
        if (isActive)
            execute.OnNext(id);
        else
            cache.Add(id);
    }
});

queryStateUpdate.Subscribe(state =>
{
    lock (gate)
    {
        isActive = state.Item1;
        if (!isActive) return;

        // Flush everything that arrived while the query was suspended.
        foreach (var id in cache)
            execute.OnNext(id);
        cache.Clear();
    }
});


Thanks to Enigmativity and AlSki. Using a cache I came up with the answer.

var execute = new Subject<Guid>();
var cache = new Stack<Guid>();
idUpdate.CombineLatest(queryStateUpdate, (id, qs) => new { id, qs }).Subscribe(anon =>
{
    var id = anon.id;
    var queryState = anon.qs;
    //The roleId updated after the queryState updated
    if (id.Item2 > queryState.Item2)
    {
        //If the queryState is active, call execute
        if (queryState.Item1)
        {
            cache.Clear();
            execute.OnNext(id.Item1);
            return;
        }
        //If the id updated and the state is suspended, cache it
        cache.Push(id.Item1);
    }
    //The queryState updated after the roleId
    else if (queryState.Item2 > id.Item2)
    {
        //If the queryState is active and a roleId update has been cached, call execute
        if (queryState.Item1 && cache.Count > 0)
        {
            execute.OnNext(cache.Pop());
            cache.Clear();
        }
    }
});
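
For anyone who wants to check the rule on its own, here is a minimal sketch of the same decision logic without Rx and without the DateTime comparison. ExecuteGate and its two methods are hypothetical names made up for this illustration; they are not part of Rx. It assumes only the latest role Id matters while the query is suspended, and that both methods are called from one thread.

```csharp
using System;

// Hypothetical helper (not part of Rx): decides when the query should execute.
public sealed class ExecuteGate
{
    private bool _isActive;   // last known query state (true = active)
    private Guid? _pending;   // latest role Id seen while suspended, if any

    // Case a) the role Id updated and the last state was Active.
    // Returns the Id to execute with, or null if the update was only cached.
    public Guid? OnRoleIdChanged(Guid roleId)
    {
        if (_isActive)
            return roleId;

        _pending = roleId;    // overwrite: only the newest Id is kept
        return null;
    }

    // Case b) the state updated to Active and a role Id update is waiting.
    // Returns the cached Id once, then clears it; otherwise returns null.
    public Guid? OnQueryStateChanged(bool isActive)
    {
        _isActive = isActive;
        if (!isActive)
            return null;

        var id = _pending;
        _pending = null;
        return id;
    }
}
```

In an Rx pipeline the two methods could be called from the idUpdate and queryStateUpdate subscriptions, forwarding each non-null result to execute.OnNext. If the two sources can fire on different threads, the calls would need a shared lock.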