F#, System.IO.IOException: All pipe instances are busy
I have an F# application that communicates with a Java application via a named pipe, where F# acts as the server and Java acts as the client. The application works for the most part, except that the F# side occasionally runs into a "System.IO.IOException: All pipe instances are busy" error. Below are the full stack trace of the exception and code snippets from both the F# and Java sides. Any help in resolving this issue is appreciated.
Thanks, Sudaly
Full stack trace:
Unhandled Exception: System.IO.IOException: All pipe instances are busy.
at Microsoft.FSharp.Control.CancellationTokenOps.Start@1143-1.Invoke(Exception e)
at <StartupCode$FSharp-Core>.$Control.loop@413-38(Trampoline this, FSharpFunc`2 action)
at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
at <StartupCode$FSharp-Core>.$Control.-ctor@473-1.Invoke(Object state)
at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()
F# Code:
[<DataContract>]
type Quote = {
    [<field: DataMember(Name="securityIdentifier") >]
    RicCode:string
    [<field: DataMember(Name="madeOn") >]
    MadeOn:DateTime
    [<field: DataMember(Name="closePrice") >]
    Price:int
}
let globalPriceCache = new Dictionary<string, Quote>()
let ParseQuoteString (quoteString:string) =
    let data = Encoding.Unicode.GetBytes(quoteString)
    let stream = new MemoryStream()
    stream.Write(data, 0, data.Length);
    stream.Position <- 0L
    let ser = Json.DataContractJsonSerializer(typeof<Quote array>)
    let results:Quote array = ser.ReadObject(stream) :?> Quote array
    results
let RefreshCache quoteList =
    globalPriceCache.Clear()
    quoteList
    |> Array.iter(fun result->globalPriceCache.Add(result.RicCode, result))
let EstablishConnection() =
    let pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 4)
    pipeServer.WaitForConnection()
    try
        Some(new StreamReader(pipeServer))
    with e ->
        None
let rec MarketPriceCache() =
    match EstablishConnection() with
    |Some(sr) ->
        // Read request from the stream.
        let m_cache =
            sr.ReadLine()
            |> ParseQuoteString
            |> RefreshCache
        MarketPriceCache()
    | _ -> ()
[<EntryPoint>]
let main args=
    try
        async {
            MarketPriceCache()
        } |> Async.Start
        while true do
            if globalPriceCache.Count > 0 then
                //Business logic
                System.Threading.Thread.Sleep(1000 * 50)
            else
                ignore(logInfo(sprintf "%s" "Price Cache is empty"))
                System.Threading.Thread.Sleep(1000 * 65)
    with e ->
        ignore(logError e.Message)
        ignore(logError e.StackTrace)
    0
Java Code:
public void WatchForPrice()
{
    while (true)
    {
        try
        {
            Map<String, SecurityQuoteCacheEntry> priceMap = getPriceCache().getCacheMap();
            List<LocalSecurityQuote> localSecurityQuotes = new ArrayList<LocalSecurityQuote>();
            if(priceMap != null)
            {
                Set<String> keySet = priceMap.keySet();
                System.out.println("Key Size: " + keySet.size());
                for(String key : keySet)
                {
                    SecurityQuote quote = priceMap.get(key).getQuote();
                    if(quote != null)
                    {
                        LocalSecurityQuote localSecurityQuote = new LocalSecurityQuote();
                        localSecurityQuote.setClosePrice(quote.getClosePrice());
                        localSecurityQuote.setMadeOn(quote.getMadeOn());
                        localSecurityQuote.setSecurityIdentifier(key);
                        localSecurityQuotes.add(localSecurityQuote);
                    }
                }
                JSONSerializer serializer = new JSONSerializer();
                String jsonString = serializer.serialize(localSecurityQuotes);
                // Connect to the pipe
                RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw");
                if (pipe != null )
                {
                    // write to pipe
                    pipe.write(jsonString.getBytes());
                    pipe.close();
                }
                else
                    System.out.println("Pipe not found");
                doPeriodicWait();
            }
            else
                System.out.println("No Price data found");
        }
        catch (Exception e)
        {
            e.printStackTrace();
            System.out.println(e.getMessage());
            doPeriodicWait();
        }
    }
}
This is a hunch, but maybe the issue is that you are not closing your pipe stream reader?
let rec MarketPriceCache() =
    match EstablishConnection() with
    |Some(sr) ->
        // Read request from the stream.
        try
            sr.ReadLine()
            |> ParseQuoteString
            |> RefreshCache
        finally
            sr.Close()
        MarketPriceCache()
    | _ -> ()
(The m_cache variable is not needed, since you are not using it anywhere.)
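Closing only the reader should be enough here, because StreamReader.Close also closes the stream it wraps (unless the reader was constructed with leaveOpen set to true). A minimal sketch of that behaviour follows. The MemoryStream is only a stand-in for the pipe (an assumption made so the snippet runs without a client), and the names stream, reader and line are illustrative.

```fsharp
open System.IO

// MemoryStream stands in for the NamedPipeServerStream (assumption for this demo).
let stream = new MemoryStream([| 104uy; 105uy; 10uy |])   // the bytes of "hi\n"
let reader = new StreamReader(stream)

let line = reader.ReadLine()
reader.Close()   // closes the reader and the stream it wraps

// After Close, the wrapped stream reports that it can no longer be read.
printfn "read %s, stream still readable: %b" line stream.CanRead
```

Run as a script, this should report that the stream is no longer readable, which is the same thing that happens to the pipe instance when sr.Close() is called.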
You have to dispose of the NamedPipeServerStream every time you create one. The easiest way to do this in your code is to dispose of the StreamReader inside MarketPriceCache by putting a use statement around it:
let rec MarketPriceCache() =
    match EstablishConnection() with
    | Some(sr) ->
        // Read request from the stream.
        use reader = sr in
        (
            reader.ReadLine()
            |> ParseQuoteString
            |> RefreshCache
        )
        MarketPriceCache()
    | _ -> ()
The use ... in syntax is there so that the scope of reader ends before the recursive call to MarketPriceCache, rather than after it.
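If you would rather not rely on where a use scope ends inside a match branch, another option is to move the per-connection work into its own function, so that everything is disposed when that function returns. This is only a sketch under some assumptions: handleOneConnection and serveForever are hypothetical names, and handle stands for whatever you do with the line (ParseQuoteString >> RefreshCache in the question).

```fsharp
open System.IO
open System.IO.Pipes

// Serve exactly one client. Both `use` bindings are disposed when this
// function returns, so the pipe instance is released before the next one
// is created.
let handleOneConnection (pipeName: string) (handle: string -> unit) =
    use pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 4)
    pipeServer.WaitForConnection()
    use reader = new StreamReader(pipeServer)
    match reader.ReadLine() with
    | null -> ()            // client disconnected without sending anything
    | line -> handle line

// The recursive call is outside any `use` or try/finally, so it stays a
// tail call and the loop can run indefinitely.
let rec serveForever (pipeName: string) (handle: string -> unit) : unit =
    handleOneConnection pipeName handle
    serveForever pipeName handle
```

With the question's code this would be called as serveForever "testpipe" (ParseQuoteString >> RefreshCache). Since only one instance is alive at a time, the limit of 4 instances should never be reached.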