How to throttle a producer in a producer/consumer situation on a TChan in Haskell?

We have something dumping values on a TChan, and a consumer processing them. But the consumer can't keep up, so memory usage keeps growing as the producer dumps more and more values on the channel. Is there a straightforward way to have the producer block once the channel queue reaches a certain size, so the producer waits for the consumer to catch up?


Like John's answer, I'd suggest just building a bounded TChan yourself. My code is different because it:

  1. adds abstraction (makes BTChan an ADT)
  2. removes the corner case due to his reading the current size in IO.
  3. tries not to build thunks in the size TVar when reading (it's less important when writing because the thunk can only be "one deep" - the next operation always needs to evaluate the size).
  4. is now on hackage: http://hackage.haskell.org/package/bounded-tchan

NOTE: Honestly, if I were you I'd ignore all these answers and just go with the code ephemient linked to in his comment (unless it turns out to be bad code). I'd bet it does the same thing I do here, but with more thought put into it.

{-# LANGUAGE BangPatterns #-}
module BTChan
        ( BTChan
        , newBTChanIO
        , newBTChan
        , writeBTChan
        , readBTChan
        , sizeOfBTChan
        ) where

import Control.Concurrent.STM

data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar  Int)

-- | `newBTChanIO m` makes a new bounded TChan of max size `m` in IO.
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m
        | m < 1 = error "BTChans cannot have a maximum size below 1!"
        | otherwise = do
            szTV <- newTVarIO 0
            c    <- newTChanIO
            return (BTChan m c szTV)

-- | `newBTChan m` is the STM version of `newBTChanIO`.
newBTChan :: Int -> STM (BTChan a)
newBTChan m
        | m < 1 = error "BTChans cannot have a maximum size below 1!"
        | otherwise = do
            szTV <- newTVar 0
            c    <- newTChan
            return (BTChan m c szTV)

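-- | Write a value, blocking (via retry) while the channel is full.
writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
        sz <- readTVar szTV
        if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x

-- | Read a value, blocking while the channel is empty.
readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
        x <- readTChan c
        sz <- readTVar szTV
        let !sz' = sz - 1   -- force the new size so no thunk is stored
        writeTVar szTV sz'
        return x

-- | Number of elements currently in the channel.
sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV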

Some things of note for the STM programmer:

  • Explicitly calling retry yields, putting your Haskell thread in a blocked state until one of the TVars or TChans the transaction read changes, at which point the transaction is retried. This is how you avoid checking values in IO and using the yield function.
  • Like MVars, TVars can refer to thunks, which usually isn't what you want. Perhaps someone should make a Hackage package defining STVar, STChan, SBTChan and BTChan (strict and/or bounded TVars and TChans).
  • It is actually necessary to write newBTChanIO rather than leveraging newBTChan, because the implementations of new{TVar,TChan}IO are made to work even under unsafePerformIO, where atomically cannot be used.
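
To make the first two points concrete, here is a minimal sketch. It is my own illustration, not code from the package, and the names claimSlot, releaseSlot and demo are made up for the example. claimSlot blocks with retry until a slot is free, and both functions write the counter strictly so the TVar never ends up holding a thunk.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM

-- Block (via retry) until the counter is below the limit, then claim a slot.
-- ($!) forces the new count before it is stored in the TVar.
claimSlot :: Int -> TVar Int -> STM ()
claimSlot limit tv = do
  n <- readTVar tv
  if n >= limit
    then retry                  -- sleeps until another thread writes tv
    else writeTVar tv $! n + 1

-- Release a slot; modifyTVar' is the strict variant of modifyTVar.
releaseSlot :: TVar Int -> STM ()
releaseSlot tv = modifyTVar' tv (subtract 1)

-- Hypothetical demo: a worker waits for a slot that the caller frees later.
demo :: IO Int
demo = do
  tv   <- newTVarIO 1           -- the single slot starts out taken
  done <- newEmptyMVar
  _ <- forkIO $ do
         atomically (claimSlot 1 tv)   -- blocks until releaseSlot has run
         putMVar done ()
  atomically (releaseSlot tv)
  takeMVar done
  readTVarIO tv                 -- freed once and claimed once
```

No polling loop and no yield are involved: the runtime wakes the worker only when tv is written.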

EDIT: you can actually get 2-5 times better performance (depending on the bound you use) by separating out the TVar into one for the reader and one for the writer, thus reducing contention. Verified using criterion. The improved version, 0.2.1, is already on hackage.
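
A rough sketch of what splitting the counter could look like. This is my guess at the general idea, not the code that is actually in bounded-tchan 0.2.1, and the names BTChan2, newBTChan2IO, writeBTChan2, readBTChan2 and demo are hypothetical. The writer keeps its own count and only looks at the reader's count when the channel appears full, so in the common case the two sides touch different TVars.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM, when)

-- Bound, channel, writer-side count, reader-side count.
-- Invariant: writer count minus reader count = items in the channel.
data BTChan2 a = BTChan2 !Int (TChan a) (TVar Int) (TVar Int)

newBTChan2IO :: Int -> IO (BTChan2 a)
newBTChan2IO m
  | m < 1     = error "bound must be at least 1"
  | otherwise = BTChan2 m <$> newTChanIO <*> newTVarIO 0 <*> newTVarIO 0

-- The fast path touches only the writer's TVar. The reader's TVar is
-- consulted only when the writer's own count says the channel looks full.
writeBTChan2 :: BTChan2 a -> a -> STM ()
writeBTChan2 (BTChan2 mx c wrTV rdTV) x = do
  wr <- readTVar wrTV
  if wr < mx
    then writeTVar wrTV $! wr + 1
    else do
      rd <- readTVar rdTV
      let wr' = wr - rd           -- credit the reads made so far
      when (wr' >= mx) retry      -- genuinely full: block
      writeTVar rdTV 0
      writeTVar wrTV $! wr' + 1
  writeTChan c x

-- The reader only bumps its own counter.
readBTChan2 :: BTChan2 a -> STM a
readBTChan2 (BTChan2 _ c _ rdTV) = do
  x <- readTChan c
  modifyTVar' rdTV (+ 1)
  return x

-- Hypothetical demo: a fast producer throttled by a bound of 2.
demo :: IO [Int]
demo = do
  c <- newBTChan2IO 2
  _ <- forkIO $ forM_ [1 .. 10] (atomically . writeBTChan2 c)
  replicateM 10 (atomically (readBTChan2 c))
```

Whether this gives the speedup quoted above will depend on the bound and the workload, so measure it (for example with criterion) before relying on it.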


Probably the easiest solution is to add a TVar indicating the number of elements in the channel:

import Control.Concurrent (yield)
import Control.Concurrent.STM

type BoundedChan a = (TChan a, TVar Int, Int)

writeBoundedChan :: BoundedChan a -> a -> IO ()
writeBoundedChan bc@(tchan, tsz, maxsz) x = do
  cursz' <- readTVarIO tsz
  if cursz' >= maxsz
    then yield >> writeBoundedChan bc x   -- full: let other threads run, then try again
    else atomically $ do
      writeTChan tchan x
      cursz <- readTVar tsz
      writeTVar tsz (cursz+1)

readBoundedChan :: BoundedChan a -> IO a
readBoundedChan (tchan, tsz, _) = atomically $ do
  x <- readTChan tchan
  cursz <- readTVar tsz
  writeTVar tsz (cursz-1)
  return x

Note that the max size can be slightly exceeded if you have multiple producers, because the size is first checked with readTVarIO outside the transaction and can change before the atomically block reads it again.
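
If the overshoot matters, one way around it is to do the size check inside the same transaction as the write and use retry instead of yield. A minimal sketch using the same tuple type; maxObservedSize is a hypothetical helper I added only to show that the bound holds with several producers.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM)

type BoundedChan a = (TChan a, TVar Int, Int)

-- The check and the write share one transaction, so no other producer can
-- slip in between them; retry replaces the yield-and-loop.
writeBoundedChan :: BoundedChan a -> a -> IO ()
writeBoundedChan (tchan, tsz, maxsz) x = atomically $ do
  cursz <- readTVar tsz
  if cursz >= maxsz
    then retry
    else do
      writeTChan tchan x
      writeTVar tsz $! cursz + 1

readBoundedChan :: BoundedChan a -> IO a
readBoundedChan (tchan, tsz, _) = atomically $ do
  x <- readTChan tchan
  modifyTVar' tsz (subtract 1)
  return x

-- Hypothetical check: run several producers against one consumer and
-- report the largest size the consumer ever saw.
maxObservedSize :: Int -> Int -> Int -> IO Int
maxObservedSize bound producers perProducer = do
  tchan <- newTChanIO
  tsz   <- newTVarIO 0
  let bc = (tchan, tsz, bound)
  forM_ [1 .. producers] $ \p ->
    forkIO $ forM_ [1 .. perProducer] $ \i -> writeBoundedChan bc (p, i)
  sizes <- replicateM (producers * perProducer) $ do
    sz <- readTVarIO tsz
    _  <- readBoundedChan bc
    return sz
  return (maximum sizes)
```

With this version the size TVar can never go past the bound, no matter how many producers there are.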


I know this is a little late to the game, but you could alternatively implement a Skip Channel, which allows non-blocking writes to the channel, but "overwrites" the old value that has not been seen by any reader.

import Control.Concurrent.MVar

data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChan :: IO (SkipChan a)
newSkipChan = do
    sem <- newEmptyMVar
    main <- newMVar (undefined, [sem])
    return (SkipChan main sem)

putSkipChan :: SkipChan a -> a -> IO ()
putSkipChan (SkipChan main _) v = do
    (_, sems) <- takeMVar main
    putMVar main (v, [])
    mapM_ (\sem -> putMVar sem ()) sems

getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan main sem) = do
    takeMVar sem
    (v, sems) <- takeMVar main
    putMVar main (v, sem:sems)
    return v

dupSkipChan :: SkipChan a -> IO (SkipChan a)
dupSkipChan (SkipChan main _) = do
    sem <- newEmptyMVar
    (v, sems) <- takeMVar main
    putMVar main (v, sem:sems)
    return (SkipChan main sem)
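
A small usage sketch to show the "overwrite" behaviour. The three definitions are repeated from the code above (with the MVar renamed to mainVar) so the block compiles on its own; demo is a hypothetical name.

```haskell
import Control.Concurrent.MVar

data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChan :: IO (SkipChan a)
newSkipChan = do
    sem     <- newEmptyMVar
    mainVar <- newMVar (undefined, [sem])
    return (SkipChan mainVar sem)

putSkipChan :: SkipChan a -> a -> IO ()
putSkipChan (SkipChan mainVar _) v = do
    (_, sems) <- takeMVar mainVar
    putMVar mainVar (v, [])
    mapM_ (\sem -> putMVar sem ()) sems   -- wake every waiting reader

getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan mainVar sem) = do
    takeMVar sem                          -- wait until something new was put
    (v, sems) <- takeMVar mainVar
    putMVar mainVar (v, sem:sems)
    return v

-- Hypothetical demo: two writes happen before any read.
demo :: IO (Int, Int)
demo = do
    c <- newSkipChan
    putSkipChan c 1       -- never observed by the reader
    putSkipChan c 2       -- replaces 1 without blocking
    a <- getSkipChan c    -- sees only the latest value
    putSkipChan c 3
    b <- getSkipChan c
    return (a, b)
```

Keep in mind this drops data by design, so it only fits cases where the consumer needs just the most recent value.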


There is BoundedChan on hackage, but that uses MVars, not STM. You could use it to learn how to write your own - it is only about a page of code.
