Reactive Extensions seem very slow - am I doing something wrong?
I'm evaluating Rx for a trading platform project that will need to process thousands of messages a second. The existing platform has a complex event routing system (multicast delegates) that responds to these messages and performs a lot of subsequent processing.
I've looked at Reactive Extensions for the obvious benefits but noticed it's considerably slower, usually about 100 times slower.
I've created a unit test to demonstrate this, which runs a simple increment 1 million times, using various Rx flavours and a straight-out-of-the-box delegate "control" test.
Here are the results:
Delegate - (1000000) - 00:00:00.0410000
Observable.Range() - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher - (1000000) - 00:00:03.0360000
As you can see, all the Rx methods are ~100 times slower than a delegate equivalent. Obviously Rx is doing a lot under the covers that will be of use in a more complex example, but this just seems incredibly slow.
Is this normal, or are my testing assumptions invalid? The NUnit code for the above is below:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;
namespace RxTests
{
[TestFixture]
class ReactiveExtensionsBenchmark_Tests
{
private int counter = 0;
[Test]
public void ReactiveExtensionsPerformanceComparisons()
{
int iterations = 1000000;
Action<int> a = (i) => { counter++; };
DelegateSmokeTest(iterations, a);
ObservableRangeTest(iterations, a);
SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
}
public void ObservableRangeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Observable.Range(0, iterations).Subscribe(action);
OutputTestDuration("Observable.Range()", start);
}
public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var eventSubject = new Subject<int>();
var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
events.Subscribe(action);
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => eventSubject.OnNext(1)
);
OutputTestDuration("Subject.Subscribe() - " + mode, start);
}
public void DelegateSmokeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => action(1)
);
OutputTestDuration("Delegate", start);
}
/// <summary>
/// Output helper
/// </summary>
/// <param name="test"></param>
/// <param name="duration"></param>
public void OutputTestDuration(string test, long duration)
{
Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
}
/// <summary>
/// Test timing helper
/// </summary>
/// <param name="elapsedTicks"></param>
/// <returns></returns>
public string ElapsedDuration(long elapsedTicks)
{
return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
}
}
}
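One testing assumption worth checking is the timer itself. The test above measures with DateTime.Now.Ticks, whose resolution can be coarse (often several milliseconds, depending on the system clock), and the first call through each code path also pays JIT compilation cost. The following is a minimal sketch, not part of the original test, of a timing helper based on System.Diagnostics.Stopwatch with one untimed warm-up run. The names TimingSketch and Measure are hypothetical.

```csharp
using System;
using System.Diagnostics;

public static class TimingSketch
{
    // Hypothetical helper: runs `body` once untimed so the JIT has compiled it,
    // then times a second run with the high-resolution Stopwatch.
    public static TimeSpan Measure(Action body)
    {
        body();                                   // warm-up run, not measured
        Stopwatch stopwatch = Stopwatch.StartNew();
        body();                                   // measured run
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    public static void Main()
    {
        const int iterations = 1000000;
        int counter = 0;
        Action<int> action = i => { counter++; };

        TimeSpan elapsed = Measure(() =>
        {
            counter = 0;                          // reset so the warm-up run is not counted
            for (int i = 0; i < iterations; i++)
            {
                action(i);
            }
        });

        // Same output layout as OutputTestDuration in the test above.
        Console.WriteLine("{0, -40} - ({1}) - {2}", "Delegate", counter, elapsed);
    }
}
```

The same Measure call could wrap the Subject and Observable.Range loops, so that every variant is timed in the same way.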
My guess is that the Rx team focuses on building the functionality first and doesn't care about performance optimization yet.
Use a profiler to determine bottlenecks and replace slow Rx classes with your own optimized versions.
Below are two examples.
Results:
Delegate - (1000000) - 00:00:00.0368748
Simple - NewThread - (1000000) - 00:00:00.0207676
Simple - CurrentThread - (1000000) - 00:00:00.0214599
Simple - Immediate - (1000000) - 00:00:00.0162026
Simple - ThreadPool - (1000000) - 00:00:00.0169848
FastSubject.Subscribe() - NewThread - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool - (1000000) - 00:00:00.0529137
First of all, it seems to matter a lot how the observable is implemented. Here's an observable that cannot be unsubscribed from, but it's fast:
private IObservable<int> CreateFastObservable(int iterations)
{
return Observable.Create<int>(observer =>
{
new Thread(_ =>
{
for (int i = 0; i < iterations; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
}).Start();
return () => { };
});
}
Test:
public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var start = Stopwatch.StartNew();
var observable = CreateFastObservable(iterations);
observable.SubscribeOn(scheduler).Run(action);
OutputTestDuration("Simple - " + mode, start);
}
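The fast observable above returns an empty unsubscribe action, so a subscriber cannot stop the producer thread. As a rough sketch of how unsubscription might be added back cheaply, the following uses only the BCL IObservable<T> and IObserver<T> interfaces (no Rx types) and checks a volatile flag on each iteration. All class names here (CancellableRange, Subscription, CountingObserver, CancellableRangeDemo) are hypothetical, and the per-item flag check will cost a little throughput compared with the version above.

```csharp
using System;
using System.Threading;

// Hypothetical observable: pushes 0..count-1 from a worker thread and stops
// early once the returned subscription has been disposed.
public sealed class CancellableRange : IObservable<int>
{
    private readonly int count;

    public CancellableRange(int count) { this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        var subscription = new Subscription();
        new Thread(() =>
        {
            for (int i = 0; i < count; i++)
            {
                if (subscription.IsDisposed) return;   // unsubscribed: stop quietly
                observer.OnNext(i);
            }
            if (!subscription.IsDisposed) observer.OnCompleted();
        }).Start();
        return subscription;
    }

    private sealed class Subscription : IDisposable
    {
        private volatile bool disposed;
        public bool IsDisposed { get { return disposed; } }
        public void Dispose() { disposed = true; }
    }
}

// Hypothetical observer that counts items and signals when the stream ends.
public sealed class CountingObserver : IObserver<int>
{
    public int Count;
    public readonly ManualResetEventSlim Done = new ManualResetEventSlim();

    public void OnNext(int value) { Count++; }
    public void OnError(Exception error) { Done.Set(); }
    public void OnCompleted() { Done.Set(); }
}

public static class CancellableRangeDemo
{
    public static void Main()
    {
        var observer = new CountingObserver();
        using (new CancellableRange(1000000).Subscribe(observer))
        {
            observer.Done.Wait();                      // block until OnCompleted
        }
        Console.WriteLine(observer.Count);
    }
}
```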
Subjects add a lot of overhead. Here's a subject that is stripped of much of the functionality expected from a subject, but it's fast:
class FastSubject<T> : ISubject<T>
{
private event Action onCompleted;
private event Action<Exception> onError;
private event Action<T> onNext;
public FastSubject()
{
onCompleted += () => { };
onError += error => { };
onNext += value => { };
}
public void OnCompleted()
{
this.onCompleted();
}
public void OnError(Exception error)
{
this.onError(error);
}
public void OnNext(T value)
{
this.onNext(value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
this.onCompleted += observer.OnCompleted;
this.onError += observer.OnError;
this.onNext += observer.OnNext;
return Disposable.Create(() =>
{
this.onCompleted -= observer.OnCompleted;
this.onError -= observer.OnError;
this.onNext -= observer.OnNext;
});
}
}
Test:
public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var start = Stopwatch.StartNew();
var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();
observable.SubscribeOn(scheduler).Run(action);
OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}
Update for Rx 2.0: I ran the code from the original post in (almost) the latest LINQPad beta, 4.42.04 (well, there's a 06, but anyway), adjusted slightly to use the new Rx v2 scheduler syntax:
public void ReactiveExtensionsPerformanceComparisons()
{
int iterations = 1000000;
Action<int> a = (i) => { counter++; };
DelegateSmokeTest(iterations, a);
ObservableRangeTest(iterations, a);
SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread");
SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread");
SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate");
SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool");
// I *think* this is the same as the ThreadPool scheduler in my case
SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");
// doesn't work, as LINQPad has no Dispatcher attached to the GUI thread; maybe there's a workaround. The Instance property on it is obsolete.
//SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "Dispatcher");
}
Note: results vary wildly. In rare cases ThreadPool beats NewThread, but in most cases NewThread has a slight edge over the schedulers below it in the list:
Delegate - (1000000) - 00:00:00.0440025
Observable.Range() - (1000000) - 00:00:01.9251101
Subject.Subscribe() - NewThread - (1000000) - 00:00:00.0400023
Subject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0530030
Subject.Subscribe() - Immediate - (1000000) - 00:00:00.0490028
Subject.Subscribe() - ThreadPool - (1000000) - 00:00:00.0490028
Subject.Subscribe() - Default - (1000000) - 00:00:00.0480028
So it seems they worked pretty hard on performance.
Remember that your delegate doesn't guarantee any thread safety: it is invoked on whatever thread calls it. When you call Observable.ObserveOn to marshal notifications to other threads, Rx.NET has to do locking to make sure it does what you think it does.
So delegates might move super fast, but if you want to build something practical with them, you're going to end up building synchronization by hand, which will slow you down. That being said, Rx, just like LINQ, is an abstraction: if you need it to be ridiculously fast, you have to start writing ugly code.
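To illustrate the thread-safety point, here is a small sketch (not from the original posts) that invokes two delegates from two threads at once. One increments a counter the way the benchmark's delegate does; the other uses Interlocked.Increment as a stand-in for hand-built synchronization. The class and member names are hypothetical.

```csharp
using System;
using System.Threading;

public static class ThreadSafetySketch
{
    private static int unsafeCounter;
    private static int safeCounter;

    public static int UnsafeCounter { get { return unsafeCounter; } }
    public static int SafeCounter { get { return safeCounter; } }

    // Runs both delegates from two threads concurrently.
    public static void Run(int iterationsPerThread)
    {
        unsafeCounter = 0;
        safeCounter = 0;

        // Plain increment, as in the benchmark's delegate: not atomic.
        Action<int> unsafeAction = i => { unsafeCounter++; };
        // Hand-built synchronization: an atomic increment.
        Action<int> safeAction = i => { Interlocked.Increment(ref safeCounter); };

        ThreadStart work = () =>
        {
            for (int i = 0; i < iterationsPerThread; i++)
            {
                unsafeAction(i);
                safeAction(i);
            }
        };

        var first = new Thread(work);
        var second = new Thread(work);
        first.Start();
        second.Start();
        first.Join();
        second.Join();
    }

    public static void Main()
    {
        const int iterationsPerThread = 1000000;
        Run(iterationsPerThread);
        Console.WriteLine("Expected:      {0}", 2 * iterationsPerThread);
        Console.WriteLine("Unsynchronized: {0}", UnsafeCounter);   // may fall short: lost updates
        Console.WriteLine("Interlocked:    {0}", SafeCounter);
    }
}
```

The unsynchronized total can come out below the expected value because concurrent increments overwrite each other, while the Interlocked total always matches. The atomic version is also slower per call, which is the trade-off described above.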