How to detect a timeout when using asynchronous Socket.BeginReceive?
Writing an asynchronous Ping using Raw Sockets in F#, to enable parallel requests using as few threads as possible. Not using "System.Net.NetworkInformation.Ping", because it appears to allocate one thread per request. Am also interested in using F# async workflows.
The synchronous version below correctly times out when the target host does not exist/respond, but the asynchronous version hangs. Both work when the host does respond. Not sure if this is a .NET issue, or an F# one...
Any ideas?
(note: the process must run as Admin to allow Raw Socket access)
This throws a timeout:
let result = Ping.Ping ( IPAddress.Parse( "192.168.33.22" ), 1000 )
However, this hangs:
let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 )
|> Async.RunSynchronously
Here's the code...
module Ping
open System
open System.Net
open System.Net.Sockets
open System.Threading
//---- ICMP Packet Classes
/// Base ICMP message: a 4-byte header made of type, code and a 16-bit checksum.
type IcmpMessage (t : byte) =
    let mutable m_type = t
    let mutable m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP message type (first header byte).
    member this.Type
        with get() = m_type

    /// ICMP message code (second header byte).
    member this.Code
        with get() = m_code

    /// Checksum value currently stored in the header.
    member this.Checksum = m_checksum

    /// Serialized message. Derived classes override this to append their own
    /// fields. The checksum is written low byte first.
    abstract Bytes : byte array
    default this.Bytes
        with get() =
            [|
                m_type
                m_code
                byte(m_checksum)
                byte(m_checksum >>> 8)
            |]

    /// Computes the 16-bit one's-complement checksum over the current Bytes,
    /// including whatever value is presently stored in the checksum field.
    member this.GetChecksum() =
        let mutable sum = 0ul
        let bytes = this.Bytes
        let mutable i = 0
        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
            i <- i + 2
        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])
        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        sum <- ~~~sum
        uint16(sum)

    /// Recomputes and stores the checksum for the current message contents.
    member this.UpdateChecksum() =
        // The checksum must be calculated with the checksum field zeroed;
        // otherwise a second call would fold the previous checksum into the
        // sum and store a wrong value.
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()
/// ICMP message that extends the base header with a 16-bit identifier and a
/// 16-bit sequence number (both currently fixed at 0).
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)
    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    /// Identifier field of the message.
    member this.Identifier = m_identifier

    /// Sequence-number field of the message.
    member this.SequenceNumber = m_sequenceNumber

    /// Base header bytes followed by identifier and sequence number,
    /// each written low byte first.
    override this.Bytes
        with get() =
            Array.append (base.Bytes)
                [|
                    byte(m_identifier)
                    byte(m_identifier >>> 8)
                    byte(m_sequenceNumber)
                    byte(m_sequenceNumber >>> 8)
                |]
/// ICMP echo request (type 8) carrying a data payload.
/// The default payload is 32 bytes, each with the value 32.
type EchoMessage() =
    inherit InformationMessage( 8uy )
    let mutable m_data = Array.create 32 32uy
    // Store a checksum for the default payload as part of construction.
    do base.UpdateChecksum()

    /// Payload bytes; assigning a new payload refreshes the checksum.
    member this.Data
        with get() = m_data
        and set(d) =
            m_data <- d
            this.UpdateChecksum()

    /// Inherited header bytes followed by the payload.
    override this.Bytes
        with get() =
            let header = base.Bytes
            Array.append header (this.Data)
//---- Synchronous Ping
/// Synchronous ping: sends one echo request to host over a raw ICMP socket and
/// blocks for the reply. Returns the receive buffer (request length + 20 bytes).
/// Raises SocketException when nothing is sent or received; the socket is
/// closed in every case once the send has been attempted.
let Ping (host : IPAddress, timeout : int ) =
    let target = new IPEndPoint( host, 0 )
    let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
    let request = EchoMessage().Bytes
    try
        if socket.SendTo( request, target ) <= 0 then
            raise (SocketException())
        // Reply buffer: request size plus 20 extra bytes.
        let reply : byte array = Array.zeroCreate (request.Length + 20)
        let mutable sender = target :> EndPoint
        if socket.ReceiveFrom( reply, &sender ) <= 0 then
            raise (SocketException())
        reply
    finally
        socket.Close()
//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)
type Async with
    /// Four-argument overload: binds arg1..arg4 into beginAction and delegates
    /// to the argument-less Async.FromBeginEnd.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
        let start = fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,callback,state)
        Async.FromBeginEnd(start, endAction, ?cancelAction=cancelAction)

    /// Five-argument overload: binds arg1..arg5 into beginAction and delegates
    /// to the argument-less Async.FromBeginEnd.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
        let start = fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,callback,state)
        Async.FromBeginEnd(start, endAction, ?cancelAction=cancelAction)
//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom
// Extension members that expose Socket's BeginSendTo/EndSendTo and
// BeginReceiveFrom/EndReceiveFrom pairs as F# async computations.
type System.Net.Sockets.Socket with
// Wraps BeginSendTo/EndSendTo through the five-argument Async.FromBeginEnd
// overload; the async result is whatever EndSendTo returns.
member this.AsyncSendTo( buffer, offset, size, socketFlags, remoteEP ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
this.BeginSendTo,
this.EndSendTo )
// Wraps BeginReceiveFrom/EndReceiveFrom. The same remoteEP value is handed to
// the begin call and to EndReceiveFrom.
// NOTE(review): no cancel action and no timeout are supplied here — confirm
// that callers can cope with a receive that never completes.
member this.AsyncReceiveFrom( buffer, offset, size, socketFlags, remoteEP ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
this.BeginReceiveFrom,
(fun asyncResult -> this.EndReceiveFrom(asyncResult, remoteEP) ) )
//---- Asynchronous Ping
/// Asynchronous ping: sends one echo request to host over a raw ICMP socket
/// and awaits the reply without blocking a thread. Yields the receive buffer
/// (request length + 256 bytes); raises SocketException when nothing is sent
/// or received.
// NOTE(review): SendTimeout/ReceiveTimeout are set below, but socket timeouts
// are documented as applying to the synchronous calls; the Begin/End based
// receive may therefore never complete when the host stays silent — confirm.
let AsyncPing (host : IPAddress, timeout : int ) =
    async {
        let target = IPEndPoint( host, 0 )
        use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
        let request = EchoMessage().Bytes
        try
            let! sent = socket.AsyncSendTo( request, 0, request.Length, SocketFlags.None, target )
            if sent <= 0 then
                raise (SocketException())
            // ReceiveFrom reports the sender through a ref cell.
            let sender = ref (target :> EndPoint)
            let reply = Array.create (request.Length + 256) 0uy
            let! received = socket.AsyncReceiveFrom( reply, 0, reply.Length, SocketFlags.None, sender )
            if received <= 0 then
                raise (SocketException())
            return reply
        finally
            socket.Close()
    }
James, your own accepted answer has a problem I wanted to point out. You only allocate one timer, which makes the async object returned by AsyncReceiveEx a stateful one-time-use object. Here's a similar example that I trimmed down:
// Begin/End/Cancel functions derived from Async.Sleep; they play the role of a
// Begin/End operation pair in this trimmed-down example.
let b,e,c = Async.AsBeginEnd(Async.Sleep)
// Demonstrates the problem: the async returned by AsyncReceiveEx can only be
// run once.
type Example() =
// No-op Close; invoked when the timer elapses before completion, or on cancel.
member this.Close() = ()
// The flags and the timer below are created once, when this member is called,
// and are captured by the returned async — every run of that async shares them.
member this.AsyncReceiveEx( sleepTime, (timeoutMS:int) ) =
let timedOut = ref false
let completed = ref false
let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)
// Timer callback: flag the timeout and call Close unless already completed.
timer.Elapsed.Add( fun _ ->
lock timedOut (fun () ->
timedOut := true
if not !completed
then this.Close()
)
)
// Stops and disposes the single timer and marks the operation as completed.
let complete() =
lock timedOut (fun () ->
timer.Stop()
timer.Dispose()
completed := true
)
Async.FromBeginEnd( sleepTime,
// Begin: start the underlying operation, then start the watchdog timer.
(fun st ->
let result = b(st)
timer.Start()
result
),
// End: prints "err" if the timer fired, otherwise finishes via e.
(fun result ->
complete()
if !timedOut
then printfn "err"; ()
else e(result)
),
// Cancel: tear down the timer and call Close.
(fun () ->
complete()
this.Close()
)
)
let ex = new Example()
let a = ex.AsyncReceiveEx(3000, 1000)
Async.RunSynchronously a
printfn "ok..."
// The second run throws ObjectDisposedException: the only Timer that was
// allocated has already been disposed by complete() during the first run.
Async.RunSynchronously a
Ideally you want every 'run' of the async returned by AsyncReceiveEx to behave the same, which means each run needs its own timer and set of ref flags. This is easy to fix thusly:
// Begin/End/Cancel functions derived from Async.Sleep; they play the role of a
// Begin/End operation pair in this trimmed-down example.
let b,e,c = Async.AsBeginEnd(Async.Sleep)

/// Re-runnable variant: all state is created inside the async block, so each
/// run of the returned async gets its own timer and flags.
type Example() =
    /// No-op Close; invoked when the timer elapses before completion, or on cancel.
    member this.Close() = ()

    member this.AsyncReceiveEx( sleepTime, (timeoutMS:int) ) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer(double(timeoutMS), AutoReset=false)

            // Timer callback: flag the timeout and call Close unless already completed.
            let onElapsed _ =
                lock timedOut (fun () ->
                    timedOut := true
                    if not !completed then this.Close())
            timer.Elapsed.Add onElapsed

            // Stops and disposes this run's timer and marks the run as completed.
            let complete() =
                lock timedOut (fun () ->
                    timer.Stop()
                    timer.Dispose()
                    completed := true)

            // Begin: start the underlying operation, then the watchdog timer.
            let beginOp st =
                let pending = b(st)
                timer.Start()
                pending

            // End: prints "err" if the timer fired, otherwise finishes via e.
            let endOp result =
                complete()
                if !timedOut then
                    printfn "err"
                    ()
                else
                    e(result)

            // Cancel: tear down the timer and call Close.
            let cancelOp () =
                complete()
                this.Close()

            return! Async.FromBeginEnd( sleepTime, beginOp, endOp, cancelOp )
        }

let ex = new Example()
let a = ex.AsyncReceiveEx(3000, 1000)
Async.RunSynchronously a
printfn "ok..."
Async.RunSynchronously a
The only change is to put the body of AsyncReceiveEx inside async { ... } and have the last line use return!.
The docs clearly state that the timeout only applies to the sync versions:
http://msdn.microsoft.com/en-us/library/system.net.sockets.socketoptionname.aspx
After some thought, came up with the following. This code adds an AsyncReceiveEx member to Socket, which includes a timeout value. It hides the details of the watchdog timer inside the receive method... very tidy and self contained. Now THIS is what I was looking for!
See the complete async ping example, further below.
Not sure if the locks are necessary, but better safe than sorry...
/// Extension members exposing Socket's Begin/End Send and Receive pairs as F#
/// async computations, plus a receive with a watchdog timeout.
type System.Net.Sockets.Socket with

    /// Async send over BeginSend/EndSend; err receives the socket error code.
    /// Cancelling the async closes the socket.
    member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginSend,
                            this.EndSend,
                            this.Close )

    /// Async receive over BeginReceive/EndReceive; err receives the socket
    /// error code. Cancelling the async closes the socket.
    member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginReceive,
                            this.EndReceive,
                            this.Close )

    /// Async receive guarded by a one-shot watchdog timer. If no data arrives
    /// within timeoutMS milliseconds the socket is closed, err is set to
    /// SocketError.TimedOut and the async yields 0; otherwise it yields the
    /// result of EndReceive. All state lives inside the async block, so every
    /// run gets its own timer and flags.
    member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
            timer.Elapsed.Add( fun _ ->
                lock timedOut (fun () ->
                    // An Elapsed callback can still be delivered after the
                    // receive has completed; only report a timeout (and close
                    // the socket) when the timer really won the race.
                    if not !completed then
                        timedOut := true
                        this.Close()))
            let complete() =
                lock timedOut (fun () ->
                    timer.Stop()
                    timer.Dispose()
                    completed := true)
            return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                        (fun (b,o,s,sf,e,st,uo) ->
                            let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                            // The receive may already have completed (and the
                            // timer been disposed) by the time we get here;
                            // never start a disposed timer.
                            lock timedOut (fun () ->
                                if not !completed then timer.Start())
                            result),
                        (fun result ->
                            complete()
                            // Once complete() has run, timedOut can no longer change.
                            if !timedOut then
                                err := SocketError.TimedOut
                                0
                            else
                                this.EndReceive( result, err )),
                        (fun () ->
                            complete()
                            this.Close()))
        }
Here is a complete Ping example. To avoid running out of source ports and to prevent getting too many replies at once, it scans one class-c subnet at a time.
module Ping
open System
open System.Net
open System.Net.Sockets
open System.Threading
//---- ICMP Packet Classes
/// Base ICMP message: a 4-byte header made of type, code and a 16-bit checksum.
type IcmpMessage (t : byte) =
    let mutable m_type = t
    let mutable m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP message type (first header byte).
    member this.Type
        with get() = m_type

    /// ICMP message code (second header byte).
    member this.Code
        with get() = m_code

    /// Checksum value currently stored in the header.
    member this.Checksum = m_checksum

    /// Serialized message. Derived classes override this to append their own
    /// fields. The checksum is written low byte first.
    abstract Bytes : byte array
    default this.Bytes
        with get() =
            [|
                m_type
                m_code
                byte(m_checksum)
                byte(m_checksum >>> 8)
            |]

    /// Computes the 16-bit one's-complement checksum over the current Bytes,
    /// including whatever value is presently stored in the checksum field.
    member this.GetChecksum() =
        let mutable sum = 0ul
        let bytes = this.Bytes
        let mutable i = 0
        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
            i <- i + 2
        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])
        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        sum <- ~~~sum
        uint16(sum)

    /// Recomputes and stores the checksum for the current message contents.
    member this.UpdateChecksum() =
        // The checksum must be calculated with the checksum field zeroed;
        // otherwise a second call would fold the previous checksum into the
        // sum and store a wrong value.
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()
/// ICMP message that extends the base header with a 16-bit identifier and a
/// 16-bit sequence number (both currently fixed at 0).
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)
    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    /// Identifier field of the message.
    member this.Identifier = m_identifier

    /// Sequence-number field of the message.
    member this.SequenceNumber = m_sequenceNumber

    /// Base header bytes followed by identifier and sequence number,
    /// each written low byte first.
    override this.Bytes
        with get() =
            let header = base.Bytes
            let idAndSequence =
                [ m_identifier; m_sequenceNumber ]
                |> List.collect (fun field -> [ byte(field); byte(field >>> 8) ])
                |> Array.ofList
            Array.append header idAndSequence
/// ICMP echo request (type 8) carrying a data payload.
/// The default payload is 32 bytes, each with the value 32.
type EchoMessage() =
    inherit InformationMessage( 8uy )
    let mutable m_data = Array.create 32 32uy
    // Store a checksum for the default payload as part of construction.
    do base.UpdateChecksum()

    /// Payload bytes; assigning a new payload refreshes the checksum.
    member this.Data
        with get() = m_data
        and set(d) =
            m_data <- d
            this.UpdateChecksum()

    /// Inherited header bytes followed by the payload.
    override this.Bytes
        with get() =
            let header = base.Bytes
            Array.append header (this.Data)
//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)
type Async with
    /// Four-argument overload: binds arg1..arg4 into beginAction and delegates
    /// to the argument-less Async.FromBeginEnd.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
        let start = fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,callback,state)
        Async.FromBeginEnd(start, endAction, ?cancelAction=cancelAction)

    /// Five-argument overload: binds arg1..arg5 into beginAction and delegates
    /// to the argument-less Async.FromBeginEnd.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
        let start = fun (callback,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,callback,state)
        Async.FromBeginEnd(start, endAction, ?cancelAction=cancelAction)
//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom
/// Extension members exposing Socket's Begin/End Send and Receive pairs as F#
/// async computations, plus a receive with a watchdog timeout.
type System.Net.Sockets.Socket with

    /// Async send over BeginSend/EndSend; err receives the socket error code.
    /// Cancelling the async closes the socket.
    member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginSend,
                            this.EndSend,
                            this.Close )

    /// Async receive over BeginReceive/EndReceive; err receives the socket
    /// error code. Cancelling the async closes the socket.
    member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginReceive,
                            this.EndReceive,
                            this.Close )

    /// Async receive guarded by a one-shot watchdog timer. If no data arrives
    /// within timeoutMS milliseconds the socket is closed, err is set to
    /// SocketError.TimedOut and the async yields 0; otherwise it yields the
    /// result of EndReceive. All state lives inside the async block, so every
    /// run gets its own timer and flags.
    member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
            timer.Elapsed.Add( fun _ ->
                lock timedOut (fun () ->
                    // An Elapsed callback can still be delivered after the
                    // receive has completed; only report a timeout (and close
                    // the socket) when the timer really won the race.
                    if not !completed then
                        timedOut := true
                        this.Close()))
            let complete() =
                lock timedOut (fun () ->
                    timer.Stop()
                    timer.Dispose()
                    completed := true)
            return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                        (fun (b,o,s,sf,e,st,uo) ->
                            let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                            // The receive may already have completed (and the
                            // timer been disposed) by the time we get here;
                            // never start a disposed timer.
                            lock timedOut (fun () ->
                                if not !completed then timer.Start())
                            result),
                        (fun result ->
                            complete()
                            // Once complete() has run, timedOut can no longer change.
                            if !timedOut then
                                err := SocketError.TimedOut
                                0
                            else
                                this.EndReceive( result, err )),
                        (fun () ->
                            complete()
                            this.Close()))
        }
//---- Asynchronous Ping
/// Asynchronously pings ip over a raw ICMP socket, waiting at most timeout
/// milliseconds for the reply. Yields (ip, elapsed send+receive time, isAlive),
/// where isAlive is true only when an ICMP echo reply (type 0, code 0) arrived
/// before the timeout. Raises SocketException on send or receive failures
/// other than the timeout.
let AsyncPing (ip : IPAddress, timeout : int ) =
    async {
        use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.Connect( IPEndPoint( ip, 0 ) )
        let pingTime = System.Diagnostics.Stopwatch()
        let packet = EchoMessage()
        let outbuffer = packet.Bytes
        let err = ref (SocketError())
        let isAlive = ref false
        try
            pingTime.Start()
            let! result = socket.AsyncSend( outbuffer, 0, outbuffer.Length, SocketFlags.None, err )
            pingTime.Stop()
            if result <= 0 then
                raise (SocketException(int(!err)))
            let inbuffer = Array.create (outbuffer.Length + 256) 0uy
            pingTime.Start()
            let! reply = socket.AsyncReceiveEx( inbuffer, 0, inbuffer.Length, SocketFlags.None, err, timeout )
            pingTime.Stop()
            let timedOut = (!err = SocketError.TimedOut)
            // Check the RECEIVE count here; the send count was validated above.
            if reply <= 0 && not timedOut then
                raise (SocketException(int(!err)))
            // A raw IPv4 socket delivers the IP header in front of the ICMP
            // message. Its length in bytes is the low nibble of byte 0 (IHL)
            // times 4; the ICMP type and code are the two bytes right after it.
            let ipHeaderLength = int (inbuffer.[0] &&& 0x0Fuy) * 4
            let isEchoReply =
                reply >= ipHeaderLength + 2
                && inbuffer.[ipHeaderLength] = 0uy       // Type 0 = echo reply
                && inbuffer.[ipHeaderLength + 1] = 0uy   // Code 0 = echo reply
            isAlive := not timedOut && isEchoReply
        finally
            socket.Close()
        return (ip, pingTime.Elapsed, !isAlive )
    }
/// Scans 192.168.0.0 – 192.168.255.255 one /24 subnet at a time: pings all 256
/// addresses of a subnet in parallel (1000 ms timeout each) and prints the
/// address and round-trip time of every host that answered.
let main() =
    let pings net =
        seq {
            for node in 0..255 do
                let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
                yield Ping.AsyncPing( ip, 1000 )
        }
    for net in 0..255 do
        pings net
        |> Async.Parallel
        |> Async.RunSynchronously
        |> Seq.filter ( fun (_,_,alive) -> alive )
        |> Seq.iter ( fun (ip, time, _) ->
            // TotalMilliseconds is the whole elapsed time; Milliseconds would
            // only be the 0-999 component and drop any full seconds.
            printfn "%A %dms" ip (int time.TotalMilliseconds) )

main()
// Keep the console window open until a key is pressed.
System.Console.ReadKey() |> ignore
A couple things...
First, it's possible to adapt the .NET FooAsync/FooCompleted pattern into an F# async. The FSharp.Core library does this for WebClient; I think you can use the same pattern here. Here's the WebClient code:
// Adapts WebClient's DownloadStringAsync / DownloadStringCompleted event pair
// into an F# async that yields the downloaded string.
type System.Net.WebClient with
member this.AsyncDownloadString (address:Uri) : Async<string> =
let downloadAsync =
Async.FromContinuations (fun (cont, econt, ccont) ->
// Fresh token passed to DownloadStringAsync and compared against
// args.UserState so the handler only reacts to this request.
let userToken = new obj()
// Recursive value: the handler refers to itself so it can unsubscribe.
let rec handler =
System.Net.DownloadStringCompletedEventHandler (fun _ args ->
if userToken = args.UserState then
this.DownloadStringCompleted.RemoveHandler(handler)
// Route the outcome to the cancel, error or success continuation.
if args.Cancelled then
ccont (new OperationCanceledException())
elif args.Error <> null then
econt args.Error
else
cont args.Result)
// Subscribe before starting the download.
this.DownloadStringCompleted.AddHandler(handler)
this.DownloadStringAsync(address, userToken)
)
async {
// Registers CancelAsync to run if the async workflow is cancelled.
use! _holder = Async.OnCancel(fun _ -> this.CancelAsync())
return! downloadAsync
}
and I think you can do the same for SendAsync/SendAsyncCancel/PingCompleted (I have not thought it through carefully).
Second, name your method AsyncPing, not PingAsync. F# async methods are named AsyncFoo, whereas methods with the event pattern are named FooAsync.
I didn't look carefully through your code to try to find where the error may lie.
Here is a version using Async.FromContinuations.
However, this is NOT an answer to my problem, because it does not scale. The code may be useful to someone, so posting it here.
The reason this is not an answer is that System.Net.NetworkInformation.Ping appears to use one thread per Ping and quite a bit of memory (likely due to thread stack space). Attempting to ping an entire class-B network will run out of memory and use hundreds of threads, whereas the code using raw sockets uses only a few threads and under 10 MB.
type System.Net.NetworkInformation.Ping with
    /// Sends one ping to address through SendAsync (1000 ms timeout) and
    /// surfaces the PingCompleted event as an F# async yielding the PingReply.
    member this.AsyncPing (address:IPAddress) : Async<PingReply> =
        let waitForCompletion =
            Async.FromContinuations (fun (onReply, onError, onCancelled) ->
                // Fresh token so that only the completion of this request is handled.
                let token = new obj()
                // Recursive value: the handler refers to itself so it can unsubscribe.
                let rec completed =
                    PingCompletedEventHandler (fun _ args ->
                        if token = args.UserState then
                            this.PingCompleted.RemoveHandler(completed)
                            match args.Cancelled, args.Error with
                            | true, _ -> onCancelled (new OperationCanceledException())
                            | false, null -> onReply args.Reply
                            | false, error -> onError error)
                this.PingCompleted.AddHandler(completed)
                this.SendAsync(address, 1000, token))
        async {
            // Registers SendAsyncCancel to run if the async workflow is cancelled.
            use! _holder = Async.OnCancel(fun _ -> this.SendAsyncCancel())
            return! waitForCompletion
        }
/// Pings every address in 192.168.0.0 – 192.168.255.255 in parallel using the
/// Ping.AsyncPing extension and prints each reply.
let AsyncPingTest() =
    let pings =
        seq {
            for net in 0..255 do
                for node in 0..255 do
                    let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
                    // Ping is IDisposable: create it inside the async and
                    // release it as soon as its reply (or error) is in, rather
                    // than leaving 65536 instances for the finalizer.
                    let pingOne =
                        async {
                            use ping = new Ping()
                            return! ping.AsyncPing( ip )
                        }
                    yield pingOne
        }
    pings
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Seq.iter ( fun result ->
        printfn "%A" result )
EDIT: code changed to working version.
James, I've modified your code and it seems to work as well as your version, but it uses a MailboxProcessor as the timeout-handling engine. The code is 4x slower than your version but uses 1.5x less memory.
// Ping variant that performs the send/receive inside a MailboxProcessor and
// bounds the wait with the timeout of PostAndAsyncReply. The returned async
// prints "<host>: ok" or "<host>: failed" and yields unit.
let AsyncPing (host: IPAddress) timeout =
// Agent whose only message is the reply channel on which it will answer with
// the socket and the receive buffer.
let guard =
MailboxProcessor<AsyncReplyChannel<Socket*byte array>>.Start(
fun inbox ->
async {
try
// The socket is created as soon as the agent starts, before any message arrives.
let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
try
let ep = IPEndPoint( host, 0 )
let packet = EchoMessage()
let outbuffer = packet.Bytes
// Wait for the caller's reply channel before sending anything.
let! reply = inbox.Receive()
let! result = socket.AsyncSendTo( outbuffer, 0, outbuffer.Length, SocketFlags.None, ep )
if result <= 0 then
raise (SocketException())
let epr = ref (ep :> EndPoint)
let inbuffer = Array.create (outbuffer.Length + 256) 0uy
// NOTE(review): if this receive never completes, the `finally` below does
// not run and the socket presumably stays open after the caller has timed
// out — confirm.
let! result = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr )
if result <= 0 then
raise (SocketException())
reply.Reply(socket,inbuffer)
return ()
finally
socket.Close()
finally
// Outer finally is a no-op.
()
})
async {
try
//#1: blocks thread and as result have large memory footprint and too many threads to use
//let socket,r = guard.PostAndReply(id,timeout=timeout)
//#2: suggested by Dmitry Lomov
let! socket,r = guard.PostAndAsyncReply(id,timeout=timeout)
printfn "%A: ok" host
// The agent also closes this socket in its own finally block.
socket.Close()
with
// Any exception (presumably including the reply timeout) is reported as a failure.
_ ->
printfn "%A: failed" host
()
}
//test it
//timeout is ms interval
//i.e. 10000 is equal to 10s
/// Pings 192.168.1.1 – 192.168.254.254 (nodes 1..254 of nets 1..254) in
/// parallel, giving each ping the supplied timeout, and blocks until all
/// pings have finished.
let AsyncPingTest timeout =
    let allPings =
        seq {
            for net in 1..254 do
                for node in 1..254 do
                    let address = sprintf "192.168.%d.%d" net node
                    yield AsyncPing (IPAddress.Parse( address )) timeout
        }
    Async.RunSynchronously (Async.Parallel allPings)
Possibly see also:
http://blogs.msdn.com/pfxteam/archive/2010/05/04/10007557.aspx
精彩评论