Reactive linq expression on 2 IObservable's

If I understand joins in Rx correctly, the following code should show these alerts:

  • West
  • Test
  • Test-West*
  • Done

I get 3 of the 4 alerts I expect... why aren't I receiving "Test-West" as well?

public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent(); 

        var loginInitial = new LoginInitial();
        var loginCheckList = new LoginCheckList();



        var result1 = from x in loginInitial.Status
                    from y in loginCheckList.Status
                    where x == "Test" && y == "West"
                    select new { x, y };

        result1.Subscribe(x => MessageBox.Show(x.x + "-" + x.y));

        var result2 = from x in loginInitial.Status
                      where x == "Test"
                      select x;

        result2.Subscribe(x => MessageBox.Show(x));

        var result3 = from x in loginCheckList.Status
                      where x == "West"
                      select x;

        result3.Subscribe(x => MessageBox.Show(x));

        var task1 = Task.Factory.StartNew(() =>
                                  {
                                      for (int i = 0; i < 10000000; i++)
                                      {
                                          if (i == 9000000)
                                              loginInitial.Status.Publish("9000000");
                                          if (i == 9000001)
                                              loginInitial.Status.Publish("Test");
                                      }
                                  });

        var task2 = Task.Factory.StartNew(() =>
                                  {
                                        for (int i = 0; i < 1000000; i++)
                                        {
                                            if (i == 800000)
                                                loginInitial.Status.Publish("800000");
                                            if (i == 800001)
                                                loginCheckList.Status.Publish("West");
                                        }
                                  });
        Task.WaitAll(task1, task2);

        MessageBox.Show("Done");
    }
}

public class LoginInitial
{
    public PublishObservable<string> Status = new PublishObservable<string>(); 
}

public class LoginCheckList
{
    public PublishObservable<string> Status = new PublishObservable<string>();
}

public class PublishObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _observers = new List<IObserver<T>>();

    public void Publish(T value)
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnNext(value);
            }
        }
    }

    public void Complete()
    {
        lock (_observers)
        {
            foreach (var observer in _observers)
            {
                observer.OnCompleted();
            }
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_observers)
        {
            _observers.Add(observer);
        }
        return null;
    }
}


When you use a from clause in Rx, you're saying that the rest of the query should run for every value the observable produces. Nested from clauses are translated to SelectMany, which means the query waits for the first observable to produce a value and only then subscribes to the second observable for that value (and it does the same thing in parallel for every later value of the first observable).

When you look at your example:

var result1 = 
  from x in loginInitial.Status
  from y in loginCheckList.Status
  where x == "Test" && y == "West"
  select new { x, y }; 

...this means that the query first waits for loginInitial.Status. Only when that produces a value does it start listening to loginCheckList.Status. If I understand your code correctly, the Initial observable produces "Test" after the CheckList observable has produced "West", so by the time the query starts listening to the second observable, "West" has already been published and you will not get it again.

I think a more appropriate operator in your case would be Observable.Zip or CombineLatest.
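To make the timing issue concrete, here is a minimal single-threaded sketch. It assumes the System.Reactive package and uses Subject<string> in place of the question's PublishObservable; the JoinDemo class and the log list are made up for illustration. "West" is published before "Test", as it seems to be in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class JoinDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var initial = new Subject<string>();
        var checkList = new Subject<string>();

        // Nested from clauses compile to SelectMany: checkList is only
        // subscribed to once initial has produced a value.
        var nested = from x in initial
                     from y in checkList
                     select "nested: " + x + "-" + y;

        // CombineLatest subscribes to both sources up front and remembers
        // the latest value seen from each of them.
        var combined = initial.CombineLatest(
            checkList, (x, y) => "combined: " + x + "-" + y);

        nested.Subscribe(s => log.Add(s));
        combined.Subscribe(s => log.Add(s));

        checkList.OnNext("West"); // nobody in the nested query is listening yet
        initial.OnNext("Test");   // nested query starts listening to checkList only now

        return log;
    }
}
```

With this ordering only the CombineLatest query should produce a value; the nested query subscribes to checkList too late to see "West".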


Tomas Petricek pretty much explains why this is happening. I'll just add a solution as an example.

As well as adjusting result1 to use CombineLatest (which also needs extension method syntax rather than LINQ query syntax), I've changed the implementation to use Subject, which removes the need to write your own implementation of IObservable. I've also replaced the multiple subscriptions with a single subscription by merging the result observables through Observable.Merge.

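using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using System.Windows;

public partial class MainWindow : Window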
{
    public MainWindow()
    {
        InitializeComponent(); 

        var loginInitial = new Subject<String>();
        var loginCheckList = new Subject<String>();

        var result1 = loginInitial.CombineLatest(loginCheckList, 
                (x, y) => new Tuple<string, string>(x, y))
            .Where(latest => latest.Item1 == "Test" && latest.Item2 == "West")
            .Select(latest => string.Format("{0} - {1}", latest.Item1, latest.Item2));

        var result2 = from x in loginInitial
                        where x == "Test"
                        select x;

        var result3 = from x in loginCheckList
                        where x == "West"
                        select x;

        Observable.Merge(result1, result2, result3)
            .Subscribe(Console.WriteLine);

        var task1 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10000000; i++)
            {
                if (i == 9000000)
                    loginInitial.OnNext("9000000");
                if (i == 9000001)
                    loginInitial.OnNext("Test");
            }
        });

        var task2 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 1000000; i++)
            {
                if (i == 800000)
                    loginInitial.OnNext("800000");
                if (i == 800001)
                    loginCheckList.OnNext("West");
            }
        });

        Task.WaitAll(task1, task2);

        Console.WriteLine("Done");
    }
}

Note 1 - I've used CombineLatest here but you could just as easily change it to use Zip depending on the behavior you need. Check out the marble diagrams on the RxAs pages for Zip and CombineLatest for a better idea of how each behaves.
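If the marble diagrams aren't to hand, a rough sketch like the following shows the difference between the two. It assumes the System.Reactive package; the ZipDemo class and the value names are made up for illustration. Zip pairs values by position, while CombineLatest emits whenever either source produces a value once both have produced at least one.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipDemo
{
    public static void Run(List<string> zipLog, List<string> latestLog)
    {
        var a = new Subject<string>();
        var b = new Subject<string>();

        // Zip: first value of a with first value of b, second with second, ...
        a.Zip(b, (x, y) => x + "-" + y)
         .Subscribe(s => zipLog.Add(s));

        // CombineLatest: latest value of a with latest value of b.
        a.CombineLatest(b, (x, y) => x + "-" + y)
         .Subscribe(s => latestLog.Add(s));

        a.OnNext("A1");
        a.OnNext("A2");
        b.OnNext("B1"); // Zip pairs with A1, CombineLatest pairs with A2
        b.OnNext("B2"); // Zip pairs with A2, CombineLatest pairs with A2 again
    }
}
```

Here zipLog should end up as A1-B1, A2-B2 and latestLog as A2-B1, A2-B2, so Zip suits "one from each, in order" while CombineLatest suits "current state of both".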

Note 2 - I would probably change result2 and result3 to use extension method syntax so that there isn't a mix of approaches in one method. Nothing wrong with it the way it is but I'd prefer the consistency of using one type of syntax where possible.
