How to throttle a producer in a producer/consumer situation on a TChan in Haskell?
We have something dumping values on a TChan, and then a consumer processing those. But the consumer can't keep up, so memory usage grows as the producer keeps dumping items on the channel. Is there a straightforward way to have the producer block once the channel queue reaches a certain size, so that it waits for the consumer to catch up?
Like John's answer, I'd suggest just building a bounded TChan yourself. My code is different because it:
- adds abstraction (makes BTChan an ADT)
- removes the corner case due to his reading the current size in IO.
- tries not to build thunks in the size TVar when reading (it's less important when writing because the thunk can only be "one deep" - the next operation always needs to evaluate the size).
- is now on hackage: http://hackage.haskell.org/package/bounded-tchan
NOTE: Honestly, if I were you I'd ignore all these answers and just go with the code ephemient linked to in his comment (unless it turns out to be bad code). I'd bet it does the same I do here but with more thought.
{-# LANGUAGE BangPatterns #-}
module BTChan
    ( BTChan
    , newBTChanIO
    , newBTChan
    , writeBTChan
    , readBTChan
    , sizeOfBTChan
    ) where

import Control.Concurrent.STM

-- | The maximum size, the underlying channel, and the current size.
data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar Int)

-- | `newBTChanIO m` makes a new bounded TChan of max size `m`, in IO.
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m
    | m < 1 = error "BTChan's can not have a maximum <= 0!"
    | otherwise = do
        szTV <- newTVarIO 0
        c    <- newTChanIO
        return (BTChan m c szTV)

-- | `newBTChan m` makes a new bounded TChan of max size `m`, in STM.
newBTChan :: Int -> STM (BTChan a)
newBTChan m
    | m < 1 = error "BTChan's can not have a maximum <= 0!"
    | otherwise = do
        szTV <- newTVar 0
        c    <- newTChan
        return (BTChan m c szTV)

-- | Blocks (via retry) while the channel is full.
writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
    sz <- readTVar szTV
    if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x

-- | Blocks while the channel is empty; the size is forced before it is stored.
readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
    x  <- readTChan c
    sz <- readTVar szTV
    let !sz' = sz - 1
    writeTVar szTV sz'
    return x

sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV
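For a feel of how a bounded channel throttles a producer, here is a minimal usage sketch. To keep it self-contained it does not use the module above; it swaps in TBQueue from the stm package, which also blocks writers when full. The name `demo`, the bound, and the item count are made up for illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM)

-- | Push n items through a queue that holds at most `bound` elements.
-- The producer blocks inside writeTBQueue whenever the queue is full,
-- so at most `bound` items are waiting at any time.
-- (`demo` is a hypothetical helper, not part of any library.)
demo :: Int -> Int -> IO [Int]
demo bound n = do
  q <- newTBQueueIO (fromIntegral bound)
  -- Producer thread: writes 1..n, waiting whenever the queue is full.
  _ <- forkIO $ forM_ [1 .. n] $ \i -> atomically (writeTBQueue q i)
  -- Consumer (this thread): reads n items in FIFO order.
  replicateM n (atomically (readTBQueue q))
```

With a single producer and a single consumer, something like `demo 4 100` should hand back the items in the order they were written, however slowly the consumer runs.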
Some things of note for the STM programmer:
- Explicitly calling retry will yield, putting your Haskell thread in a blocked state waiting for the state of one of the TVars or TChans to change so it can retry. This is how you avoid checking values in IO and using the yield function.
- Like MVars, TVars can refer to thunks, which usually isn't what you want. Perhaps someone should make a hackage package defining STVar, STChan, SBTChan and BTChan (Strict and/or bounded TVars and TChans).
- It is actually necessary to write newBTChanIO instead of leveraging newBTChan because the implementations of new{TVar,TChan}IO are made to work even when under unsafePerformIO, which atomically can not do.
EDIT: you can actually get 2-5 times better performance (depending on the bound you use) by separating out the TVar into one for the reader and one for the writer, thus reducing contention. Verified using criterion. The improved version, 0.2.1, is already on hackage.
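One way the split might look is sketched below. This is not the code from the 0.2.1 release, only an illustration of the general idea under my own assumptions; `SplitChan` and every function name here are hypothetical. Readers only touch a "freed slots" counter, writers normally only touch an "available slots" counter, and a writer looks at the reader's counter only when it has run out of slots.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM)

-- Hypothetical type: the channel, slots freed by readers, and slots
-- still available to writers.
data SplitChan a = SplitChan (TChan a) (TVar Int) (TVar Int)

newSplitChanIO :: Int -> IO (SplitChan a)
newSplitChanIO mx = SplitChan <$> newTChanIO <*> newTVarIO 0 <*> newTVarIO mx

writeSplitChan :: SplitChan a -> a -> STM ()
writeSplitChan (SplitChan c freedTV availTV) x = do
  avail <- readTVar availTV
  if avail > 0
    then writeTVar availTV $! avail - 1
    else do
      -- Out of writer-side slots: claim whatever the readers have freed.
      freed <- readTVar freedTV
      if freed > 0
        then do
          writeTVar freedTV 0
          writeTVar availTV $! freed - 1
        else retry
  writeTChan c x

readSplitChan :: SplitChan a -> STM a
readSplitChan (SplitChan c freedTV _) = do
  x <- readTChan c
  freed <- readTVar freedTV
  writeTVar freedTV $! freed + 1
  return x

-- | One producer, one consumer, n items through a channel of size `bound`.
demoSplit :: Int -> Int -> IO [Int]
demoSplit bound n = do
  ch <- newSplitChanIO bound
  _ <- forkIO $ forM_ [1 .. n] $ \i -> atomically (writeSplitChan ch i)
  replicateM n (atomically (readSplitChan ch))
```

In the common case a read and a write touch different counters, so their transactions conflict less often; whether that reproduces the speedup quoted above would need its own benchmark.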
Probably the easiest solution is to add a TVar indicating the number of elements in the channel:
import Control.Concurrent (yield)
import Control.Concurrent.STM

-- (channel, current size, maximum size)
type BoundedChan a = (TChan a, TVar Int, Int)

writeBoundedChan :: BoundedChan a -> a -> IO ()
writeBoundedChan bc@(tchan, tsz, maxsz) x = do
    cursz' <- readTVarIO tsz
    if cursz' >= maxsz
        then yield >> writeBoundedChan bc x   -- full: let other threads run, then try again
        else atomically $ do
            writeTChan tchan x
            cursz <- readTVar tsz
            writeTVar tsz (cursz+1)

readBoundedChan :: BoundedChan a -> IO a
readBoundedChan (tchan, tsz, _) = atomically $ do
    x <- readTChan tchan
    cursz <- readTVar tsz
    writeTVar tsz (cursz-1)
    return x
Note that the max size can be slightly exceeded if you have multiple producers, because the cursz value can change between the two reads.
I know this is a little late to the game, but you could alternatively implement a Skip Channel, which allows non-blocking writes to the channel, but "overwrites" the old value that has not been seen by any reader.
import Control.Concurrent.MVar

data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChan :: IO (SkipChan a)
newSkipChan = do
    sem <- newEmptyMVar
    main <- newMVar (undefined, [sem])
    return (SkipChan main sem)

putSkipChan :: SkipChan a -> a -> IO ()
putSkipChan (SkipChan main _) v = do
    (_, sems) <- takeMVar main
    putMVar main (v, [])
    mapM_ (\sem -> putMVar sem ()) sems

getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan main sem) = do
    takeMVar sem
    (v, sems) <- takeMVar main
    putMVar main (v, sem:sems)
    return v

dupSkipChan :: SkipChan a -> IO (SkipChan a)
dupSkipChan (SkipChan main _) = do
    sem <- newEmptyMVar
    (v, sems) <- takeMVar main
    putMVar main (v, sem:sems)
    return (SkipChan main sem)
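To make the "overwrite" behaviour concrete, here is a small self-contained sketch. It restates the three definitions it needs so that it runs on its own (the shared MVar is called `cell` here), and `demoSkip` is a made-up name.

```haskell
import Control.Concurrent.MVar

data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChan :: IO (SkipChan a)
newSkipChan = do
  sem <- newEmptyMVar
  cell <- newMVar (undefined, [sem])
  return (SkipChan cell sem)

putSkipChan :: SkipChan a -> a -> IO ()
putSkipChan (SkipChan cell _) v = do
  (_, sems) <- takeMVar cell
  putMVar cell (v, [])
  -- Wake every reader that was waiting for a fresh value.
  mapM_ (\sem -> putMVar sem ()) sems

getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan cell sem) = do
  takeMVar sem
  (v, sems) <- takeMVar cell
  putMVar cell (v, sem : sems)
  return v

-- | Three writes with no reader in between, then a single read.
-- None of the writes block, and the read sees only the latest value.
demoSkip :: IO Int
demoSkip = do
  ch <- newSkipChan
  mapM_ (putSkipChan ch) [1, 2, 3]
  getSkipChan ch
```

Here `demoSkip` returns 3: the values 1 and 2 were overwritten before any reader looked. That bounds memory, but only suits consumers that can afford to miss intermediate values.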
There is BoundedChan on hackage, but that uses MVars, not STM. You could use it to learn how to write your own - it is only about a page of code.