Threading.Tasks analog in Rx extensions in .NET 3.5?
Is it possible to use Reactive开发者_运维百科 Extensions (Rx) to create applications in .NET 3.5 that perform parallelization or is it limited in some way? I downloaded Rx from here http://www.microsoft.com/download/en/confirmation.aspx?id=26649 and after creating a simple project with referenced reactive assemblies, I failed to find any classes corresponding to Tasks in .NET 4.0. I'm trying to find classes for 'Task', but alas I can not find any. Am I doing something wrong?
As some here mentioned, you can find a backported System.Threading.dll
in an old Rx
installation.
For an easier integration, I have made a nuget (called TaskParallelLibrary) out of it.
You can get it from http://nuget.org/packages/TaskParallelLibrary.
To the best of my knowledge, the last version of the Reactive Extensions to include "the back ported standalone DLL named System.Threading.dll" is Reactive Extensions (Rx) v1.0.2856.0 released 2/11/2011 (in Rx_Net35.msi).
(I actually got it as Rx_Net35.v1.0.2856.0.msi, downloaded in May, I think from CodePlex; can't find that any more... it installed as C:\Program Files (x86)\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net35\System.Threading.dll
on my 64-bit system.)
System.Threading, the assembly for the TPL used to be included in the Rx releases, but no longer is.
You may be able to find it in older versions.
That said, an IObservable<T>
is analogous to a Task<T>
with the key distinction that a Task<T>
can have only 1 result, an IObservable<T>
is a stream of 0 or more results.
Maybe the samples from the rx wiki can help you.
The simplest background task would be:
var o = Observable.Start
(
() =>
{
Console.WriteLine("Calculating...");
Thread.Sleep(3000);
Console.WriteLine("Done.");
}
);
o.First(); // subscribe and wait for completion of background operation
But you may also want to look at the forkjoin examples.
Rx in .NET 3.5 can't use Tasks, it uses the .NET 3.5 Thread Pool - however, as Scott says, IObservable works like Task when the IObservable is one item long.
Please correct me if i am wrong any where.
Observables and observers are mainly related to task - Observe (Wait for data) & Subscribe (for Observers to get data which is pushed).
Observable provides two operators to improve performance of the above two task.
1). Observable.ObserveOn - Asynchronously notify observer on specified schedular. The task can be performed in better way using Schedular where it takes argument of either
- Scheduler.CurrentThread (Observe on current running thread)
- Scheduler.NewThread (Observe everytime on new thread)
- Scheduler.TaskPool (Observe using TaskPool mechanism, i think you might be interested.)
- Scheduler.ThreadPool (Observe using ThreadPool mechanism)
same goes for the second....
2). Observable.SubscribeOn - Asynchronously subscribe - unsubscribe observers on specified schedulers.
It also has same options as above to schedule the subscription.
Thus Rx provides inbuilt capabilities to schedule your process to give fast results.
精彩评论