
F#, System.IO.IOException: All pipe instances are busy

I have an F# application that communicates with a Java application via a named pipe, where F# acts as the server and Java as the client. The application works for the most part, except that the F# side occasionally runs into a "System.IO.IOException: All pipe instances are busy" error. Below are the full stack trace of the exception and code snippets from both the F# and Java sides. Any help in resolving this issue is appreciated.

Thanks, Sudaly

Full stack trace:

Unhandled Exception: System.IO.IOException: All pipe instances are busy.
   at Microsoft.FSharp.Control.CancellationTokenOps.Start@1143-1.Invoke(Exception e)
   at <StartupCode$FSharp-Core>.$Control.loop@413-38(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.-ctor@473-1.Invoke(Object state)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

F# Code:

[<DataContract>] 
type Quote = { 
    [<field: DataMember(Name="securityIdentifier") >] 
    RicCode:string
    [<field: DataMember(Name="madeOn") >] 
    MadeOn:DateTime
    [<field: DataMember(Name="closePrice") >] 
    Price:int 
    }

let globalPriceCache = new Dictionary<string, Quote>()

let ParseQuoteString (quoteString:string) =
    let data = Encoding.Unicode.GetBytes(quoteString)
    let stream = new MemoryStream() 
    stream.Write(data, 0, data.Length); 
    stream.Position <- 0L 
    let ser = Json.DataContractJsonSerializer(typeof<Quote array>) 
    let results:Quote array = ser.ReadObject(stream) :?> Quote array
    results

let RefreshCache quoteList =
    globalPriceCache.Clear() 
    quoteList 
    |> Array.iter(fun result->globalPriceCache.Add(result.RicCode, result)) 

let EstablishConnection() =
    let pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 4)
    pipeServer.WaitForConnection()
    try
        Some(new StreamReader(pipeServer))
    with e -> 
        None

let rec MarketPriceCache() =
    match EstablishConnection() with
    |Some(sr) ->
        // Read request from the stream.
        let m_cache = 
            sr.ReadLine()  
            |>  ParseQuoteString  
            |>  RefreshCache

        MarketPriceCache()
    | _ -> () 


[<EntryPoint>]
let main args=
    try
        async { 
            MarketPriceCache() 
        } |> Async.Start

        while true do
            if globalPriceCache.Count > 0 then
                //Business logic
                System.Threading.Thread.Sleep(1000 * 50)
            else
                ignore(logInfo(sprintf "%s" "Price Cache is empty"))
                System.Threading.Thread.Sleep(1000 * 65)

    with e ->
        ignore(logError e.Message)
        ignore(logError e.StackTrace)    
    0

Java Code:

public void WatchForPrice()
 {
  while (true)
  {
   try 
   {
    Map<String, SecurityQuoteCacheEntry> priceMap = getPriceCache().getCacheMap();
    List<LocalSecurityQuote> localSecurityQuotes = new ArrayList<LocalSecurityQuote>();
    if(priceMap != null)
    {

     Set<String> keySet = priceMap.keySet();
     System.out.println("Key Size: " + keySet.size());
     for(String key : keySet)
     {
      SecurityQuote quote =  priceMap.get(key).getQuote();
      if(quote != null)
      {
       LocalSecurityQuote localSecurityQuote = new LocalSecurityQuote();
       localSecurityQuote.setClosePrice(quote.getClosePrice());
       localSecurityQuote.setMadeOn(quote.getMadeOn());     
       localSecurityQuote.setSecurityIdentifier(key);
       localSecurityQuotes.add(localSecurityQuote);
      }

     }

     JSONSerializer serializer = new JSONSerializer();
     String jsonString = serializer.serialize(localSecurityQuotes);

     // Connect to the pipe
     RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw");
     if (pipe != null )
     {
      // write to pipe
      pipe.write(jsonString.getBytes());
      pipe.close();

     }
     else
      System.out.println("Pipe not found");
     doPeriodicWait();
    }
    else 
     System.out.println("No Price data found");
   }
   catch (Exception e) 
   {
    e.printStackTrace();
    System.out.println(e.getMessage());
    doPeriodicWait();
   }
  }
 }


This is a hunch, but maybe the issue is that you are not closing your pipe stream reader?

let rec MarketPriceCache() =
    match EstablishConnection() with
    | Some(sr) ->
        // Read request from the stream.
        try
            sr.ReadLine()
            |> ParseQuoteString
            |> RefreshCache
        finally
            // Closing the reader also closes the underlying pipe stream.
            sr.Close()
        MarketPriceCache()
    | _ -> ()

(The m_cache variable is not needed; you are not using it anywhere.)


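You have to dispose of the NamedPipeServerStream every time you create one. Your pipe is created with a maximum of 4 server instances, so once four undisposed instances exist, creating another one throws "All pipe instances are busy". The easiest way to fix this in your code is to dispose of the StreamReader inside MarketPriceCache with a use binding, since disposing the reader also disposes the underlying pipe stream: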

let rec MarketPriceCache() =
    match EstablishConnection() with
    | Some(sr) ->
        // Read request from the stream.
        use reader = sr in
        (
            reader.ReadLine()
            |>  ParseQuoteString
            |>  RefreshCache
        )
        MarketPriceCache()
    | _ -> ()

The use ... in syntax is there so that the reader's scope ends before the recursive call to MarketPriceCache, not after it.
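
The same close-on-every-path rule is worth applying on the Java client, where pipe.close() is currently skipped if pipe.write throws. This is only a minimal sketch and does not by itself fix the server-side exception. PipeWriter and sendLine are hypothetical names, not part of the original client. The trailing newline and the explicit UTF-8 encoding are assumptions, chosen to match the server's StreamReader.ReadLine and its default encoding.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original client code.
final class PipeWriter {
    private PipeWriter() {}

    // Opens the pipe (or any file path), writes one newline-terminated
    // UTF-8 line, and closes the handle even if the write throws.
    static void sendLine(String pipePath, String payload) throws IOException {
        byte[] bytes = (payload + "\n").getBytes(StandardCharsets.UTF_8);
        // try-with-resources closes the RandomAccessFile on every exit path.
        try (RandomAccessFile pipe = new RandomAccessFile(pipePath, "rw")) {
            pipe.write(bytes);
        }
    }
}
```

In WatchForPrice this would replace the open/write/close lines with something like PipeWriter.sendLine("\\\\.\\pipe\\testpipe", jsonString), leaving the existing catch block to handle any IOException.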
