Observable.FromAsyncPattern with UDPClient.EndReceive and ref remote endpoint Parameter
I'm learning about Reactive extensions and trying to re-factor some of my code.
UDPClient.EndReceive
takes a ref IPEndPoint
parameter, so I currently have this working:
UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));
IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.A开发者_运维知识库SCII.GetString(x)));
What if my subscribers need access to the remote IPEndPoint? In my current incarnation I'm using events, and passing back a custom class which wraps byte[]
and IPEndPoint
. I cannot for the life of me, work out how to do this with Rx.
If you've already created a wrapper class for byte[]
and IPEndPoint
why not return that as the sequence using Select
:
private IObservable<RemoteData> GetRemoteDataAsync()
{
return Observable.Defer(() =>
{
UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket,
SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));
IPEndPoint ep = null;
return Observable.FromAsyncPattern<byte[]>(
receiverUDP.BeginReceive,
(i) => receiverUDP.EndReceive(i, ref ep)
)()
.Select(bytes => new RemoteData(bytes, ep));
});
}
For anyone else looking, there's a slightly simpler, and more modern way to do this using ReceiveAsync
:
public static IObservable<UdpReceiveResult> UdpStream(IPEndPoint endpoint)
{
return Observable.Using(() => new UdpClient(endpoint),
udpClient => Observable.Defer(() =>
udpClient.ReceiveAsync().ToObservable()).Repeat());
}
You can call it with IPAddress.Any:
var stream = UdpStream(new IPEndPoint(IPAddress.Any, 514));
and then use Select
to project the stream to whatever type you want.
精彩评论