How to implement an atomic switch from one IObserver to another?
I have an `IObservable<byte[]>` that I transform into an `IObservable<XDocument>` using some intermediate steps:
var observedXDocuments =
from b in observedBytes
// Lot of intermediate steps to transform byte arrays into XDocuments
select xDoc;
At some point in time, I'm interested in the observed `XDocument`s, so I subscribe an `IObserver<XDocument>`. At a later point in time, I would like to subscribe another `IObserver<XDocument>` and dispose of the old one.
How can I do this in one atomic operation, without losing any observed `XDocument`? I could do something like:
oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);
I'm worried, though, that between these two calls I could lose an `XDocument`. If I switch the two calls, it could happen that I receive the same `XDocument` twice.
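To make the two failure modes concrete, here is a small single-threaded sketch that replays both orderings. `HotSource<T>`, `Recorder` and `SwitchRaceDemo` are hypothetical names invented for the illustration, strings stand in for `XDocument`s, and the sketch assumes that "disposing" means disposing the subscription returned by `Subscribe`. The item emitted between the two calls plays the role of a notification arriving from another thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal hot source standing in for observedXDocuments.
public sealed class HotSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Subscription(() => observers.Remove(observer));
    }

    // Pushes a value to every observer that is subscribed right now.
    public void Emit(T value)
    {
        foreach (var observer in observers.ToArray()) observer.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action unsubscribe;
        public Subscription(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose() => unsubscribe();
    }
}

// Hypothetical observer that records the values it sees.
public sealed class Recorder : IObserver<string>
{
    public List<string> Seen { get; } = new List<string>();
    public void OnNext(string value) => Seen.Add(value);
    public void OnCompleted() { }
    public void OnError(Exception error) { }
}

public static class SwitchRaceDemo
{
    // Dispose first, subscribe second: "doc2" arrives in the gap and is lost.
    public static string DisposeThenSubscribe()
    {
        var source = new HotSource<string>();
        var oldObserver = new Recorder();
        var newObserver = new Recorder();
        var oldSubscription = source.Subscribe(oldObserver);
        source.Emit("doc1");
        oldSubscription.Dispose();
        source.Emit("doc2");
        source.Subscribe(newObserver);
        source.Emit("doc3");
        return string.Join(",", oldObserver.Seen) + " | " + string.Join(",", newObserver.Seen);
    }

    // Subscribe first, dispose second: "doc2" arrives in the overlap and is seen twice.
    public static string SubscribeThenDispose()
    {
        var source = new HotSource<string>();
        var oldObserver = new Recorder();
        var newObserver = new Recorder();
        var oldSubscription = source.Subscribe(oldObserver);
        source.Emit("doc1");
        source.Subscribe(newObserver);
        source.Emit("doc2");
        oldSubscription.Dispose();
        source.Emit("doc3");
        return string.Join(",", oldObserver.Seen) + " | " + string.Join(",", newObserver.Seen);
    }

    public static void Main()
    {
        Console.WriteLine(DisposeThenSubscribe());  // doc1 | doc3
        Console.WriteLine(SubscribeThenDispose());  // doc1,doc2 | doc2,doc3
    }
}
```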
I'd probably add a layer of indirection. Write a class called ExchangeableObserver, subscribe it to your observable, and keep it permanently subscribed. The job of ExchangeableObserver is to delegate everything to a given sub-observer. But the programmer is allowed to change the sub-observer being delegated to at any time. In my example I have an Exchange() method. Something like:
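using System;
using System.Threading;

public class ExchangeableObserver<T> : IObserver<T> {
    // The sub-observer currently being delegated to.
    private IObserver<T> inner;

    public ExchangeableObserver(IObserver<T> inner) {
        this.inner = inner;
    }

    // Atomically swaps the sub-observer and returns the previous one.
    public IObserver<T> Exchange(IObserver<T> newInner) {
        return Interlocked.Exchange(ref inner, newInner);
    }

    // Every notification is forwarded to whichever sub-observer is current.
    public void OnNext(T value) {
        inner.OnNext(value);
    }

    public void OnCompleted() {
        inner.OnCompleted();
    }

    public void OnError(Exception error) {
        inner.OnError(error);
    }
}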
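A usage sketch might look like the following. It repeats the class in compact form so that it compiles on its own, and it reads the field through `Volatile.Read`, which is my own addition rather than part of the code above. `RecordingObserver<T>` and `ExchangeDemo` are hypothetical helpers, strings stand in for `XDocument`s, and the source is simulated by calling `OnNext` directly where real code would call `observedXDocuments.Subscribe(exchangeable)` once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Compact copy of the wrapper, repeated so the sketch is self-contained.
public sealed class ExchangeableObserver<T> : IObserver<T>
{
    private IObserver<T> inner;

    public ExchangeableObserver(IObserver<T> inner) { this.inner = inner; }

    // Atomically swaps the sub-observer and returns the previous one.
    public IObserver<T> Exchange(IObserver<T> newInner) =>
        Interlocked.Exchange(ref inner, newInner);

    // Volatile.Read is an assumption added here so each call sees the latest swap.
    public void OnNext(T value) => Volatile.Read(ref inner).OnNext(value);
    public void OnCompleted() => Volatile.Read(ref inner).OnCompleted();
    public void OnError(Exception error) => Volatile.Read(ref inner).OnError(error);
}

// Hypothetical observer that records what it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public void OnNext(T value) => Received.Add(value);
    public void OnCompleted() { }
    public void OnError(Exception error) { }
}

public static class ExchangeDemo
{
    public static string Run()
    {
        var oldObserver = new RecordingObserver<string>();
        var newObserver = new RecordingObserver<string>();
        var exchangeable = new ExchangeableObserver<string>(oldObserver);

        // Simulated source; the wrapper itself stays subscribed the whole time.
        exchangeable.OnNext("doc1");
        exchangeable.OnNext("doc2");
        exchangeable.Exchange(newObserver);
        exchangeable.OnNext("doc3");

        return string.Join(",", oldObserver.Received) + " | " + string.Join(",", newObserver.Received);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // doc1,doc2 | doc3
    }
}
```

Each item reaches exactly one of the two observers because there is only ever one subscription. Note that the swap does not wait for an `OnNext` call that is already running on the old observer, so the old observer may still be finishing one item just after `Exchange` returns.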
You can use a lock (a `ReaderWriterLockSlim` in the pseudocode that follows) that makes sure that no observer change takes place while a value from the `IObservable<byte[]>` is being prepared and passed on to the `IObservable<XDocument>`.
Pseudocode for how this could be done (not tested):
System.Threading.ReaderWriterLockSlim criticalSection
    = new System.Threading.ReaderWriterLockSlim(...);

// ... converting from IObservable<byte[]> to IObservable<XDocument>
criticalSection.EnterReadLock();
// Call IObservable<XDocument>
criticalSection.ExitReadLock();

// ... replacing IObservable<XDocument>
criticalSection.EnterWriteLock();
// Call change IObservable<XDocument>
criticalSection.ExitWriteLock();
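One way the pseudocode could be turned into compilable code is sketched below. It is only an interpretation: `LockedObserverSwitch<T>`, `Replace`, `Recorder` and `LockDemo` are hypothetical names, the lock is created with its parameterless constructor, and I assume that "Call" means forwarding a notification to the current observer while "Call change" means swapping that observer. Strings again stand in for `XDocument`s.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical wrapper; the name and members are illustrative only.
public sealed class LockedObserverSwitch<T> : IObserver<T>, IDisposable
{
    private readonly ReaderWriterLockSlim criticalSection = new ReaderWriterLockSlim();
    private IObserver<T> current;

    public LockedObserverSwitch(IObserver<T> initial) { current = initial; }

    // Notifications are forwarded while holding the read lock.
    public void OnNext(T value) => Forward(o => o.OnNext(value));
    public void OnCompleted() => Forward(o => o.OnCompleted());
    public void OnError(Exception error) => Forward(o => o.OnError(error));

    // The swap takes the write lock, so it waits for in-flight forwards to finish.
    public IObserver<T> Replace(IObserver<T> next)
    {
        criticalSection.EnterWriteLock();
        try
        {
            var previous = current;
            current = next;
            return previous;
        }
        finally { criticalSection.ExitWriteLock(); }
    }

    public void Dispose() => criticalSection.Dispose();

    private void Forward(Action<IObserver<T>> notify)
    {
        criticalSection.EnterReadLock();
        try { notify(current); }
        finally { criticalSection.ExitReadLock(); }
    }
}

// Hypothetical observer that records the values it sees.
public sealed class Recorder : IObserver<string>
{
    public List<string> Seen { get; } = new List<string>();
    public void OnNext(string value) => Seen.Add(value);
    public void OnCompleted() { }
    public void OnError(Exception error) { }
}

public static class LockDemo
{
    public static string Run()
    {
        var oldObserver = new Recorder();
        var newObserver = new Recorder();
        using (var switcher = new LockedObserverSwitch<string>(oldObserver))
        {
            switcher.OnNext("doc1");
            switcher.Replace(newObserver);
            switcher.OnNext("doc2");
        }
        return string.Join(",", oldObserver.Seen) + " | " + string.Join(",", newObserver.Seen);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // doc1 | doc2
    }
}
```

Unlike a plain atomic reference swap, `Replace` here returns only after any `OnNext` that is currently being forwarded has completed. With the default recursion policy, an observer that calls `Replace` from inside its own `OnNext` on the same thread would be trying to take the write lock while holding the read lock, which `ReaderWriterLockSlim` rejects with a `LockRecursionException`, so the swap should be triggered from outside the notification path.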
Edit, regarding `Call IObservable<XDocument>`:
> What exactly do you mean with the line `Call IObservable<XDocument>`?
I interpret your sentence
> I have an `IObservable<byte[]>` that I transform
> into an `IObservable<XDocument>` using some intermediate steps...
to mean that you have registered an event handler for the `IObservable<byte[]>` that creates an `XDocument` from the `byte[]` and then calls something that triggers an event for the `IObservable<XDocument>`.
`Call IObservable<XDocument>` means the code that triggers the follow-up event.