
Observable.FromAsyncPattern with UdpClient.EndReceive and ref remote endpoint parameter

I'm learning about Reactive Extensions and trying to refactor some of my code.

UdpClient.EndReceive takes a ref IPEndPoint parameter, so I currently have this working:

UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));

What if my subscribers need access to the remote IPEndPoint? My current implementation uses events and passes back a custom class that wraps byte[] and IPEndPoint. I cannot, for the life of me, work out how to do this with Rx.


If you've already created a wrapper class for byte[] and IPEndPoint, why not return that as the sequence using Select?

private IObservable<RemoteData> GetRemoteDataAsync()
{
    return Observable.Defer(() => 
    {
        UdpClient receiverUDP = new UdpClient();
        receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, 
            SocketOptionName.ReuseAddress, true);
        receiverUDP.EnableBroadcast = true;
        receiverUDP.Client.ExclusiveAddressUse = false;
        receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

        IPEndPoint ep = null;
        return Observable.FromAsyncPattern<byte[]>(
                   receiverUDP.BeginReceive, 
                   (i) => receiverUDP.EndReceive(i, ref ep)
               )()
               .Select(bytes => new RemoteData(bytes, ep));
    });
}
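
The snippet above assumes a RemoteData wrapper already exists. The question mentions such a class but never shows it, so here is a minimal sketch of what it might look like. The property names Data and RemoteEndPoint are assumptions, not taken from the original code:

```csharp
using System.Net;

// Hypothetical wrapper: the question mentions a class like this but does not
// show it, so the member names here are assumptions.
public sealed class RemoteData
{
    public RemoteData(byte[] data, IPEndPoint remoteEndPoint)
    {
        Data = data;
        RemoteEndPoint = remoteEndPoint;
    }

    // Payload of the received datagram.
    public byte[] Data { get; }

    // Endpoint the datagram was sent from.
    public IPEndPoint RemoteEndPoint { get; }
}
```

With a class along these lines, a subscriber can read both values from each element, for example:

GetRemoteDataAsync().Subscribe(d => Console.WriteLine("{0}: {1}", d.RemoteEndPoint, Encoding.ASCII.GetString(d.Data)));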


For anyone else looking, there's a slightly simpler and more modern way to do this using ReceiveAsync:

public static IObservable<UdpReceiveResult> UdpStream(IPEndPoint endpoint)
{
    return Observable.Using(() => new UdpClient(endpoint),
        udpClient => Observable.Defer(() =>
            udpClient.ReceiveAsync().ToObservable()).Repeat());
}

You can call it with IPAddress.Any:

var stream = UdpStream(new IPEndPoint(IPAddress.Any, 514));

and then use Select to project the stream to whatever type you want.
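
As a rough illustration of that projection step: UdpReceiveResult exposes the payload as Buffer and the sender as RemoteEndPoint, so the remote endpoint is available without any ref parameter. The helper below is a hypothetical sketch (UdpProjections and ToMessage are made-up names), and it assumes the payload is ASCII text as in the question:

```csharp
using System.Net.Sockets;
using System.Text;

public static class UdpProjections
{
    // Hypothetical helper: formats one received datagram as "sender: text".
    // Assumes the payload is ASCII text, as in the question.
    public static string ToMessage(UdpReceiveResult result)
    {
        string text = Encoding.ASCII.GetString(result.Buffer);
        return result.RemoteEndPoint + ": " + text;
    }
}
```

It could then be plugged into the stream like this:

stream.Select(r => UdpProjections.ToMessage(r)).Subscribe(m => Console.WriteLine(m));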
