开发者

Producer consumer with qualifications

I am new to clojure and am trying to understand how to properly use its concurrency features, so any critique/suggestions is appreciated. So I am trying to write a small test program in clojure that works as follows:

  1. there 5 producers and 2 consumers
  2. a producer waits for a random time and then pushes a number onto a shared queue.
  3. a consumer should pull a number off the queue as soon as the queue is nonempty and then sleep for a short time to simulate doing work
  4. the consumers should block when the queue is empty
  5. producers should block when the queue has more than 4 items in it to prevent it from growing huge

Here is my plan for each step above:

  1. the producers and consumers will be agents that don't really care for their state (just nil values or something); i just use the agents to send-off a "consumer" or "producer" function to do at some time. Then the shared queue will be (def queue (ref [])). Perhaps this should be an atom though?
  2. in the "producer" agent function, simply (Thread/sleep (rand-int 1000)) and then (dosync (alter queue conj (rand-int 100))) to push onto the queue.
  3. I am thinking to make the consumer agents watch the queue for changes with add-watcher. Not sure about this though..it wil开发者_如何转开发l wake up the consumers on any change, even if the change came from a consumer pulling something off (possibly making it empty) . Perhaps checking for this in the watcher function is sufficient. Another problem I see is that if all consumers are busy, then what happens when a producer adds something new to the queue? Does the watched event get queued up on some consumer agent or does it disappear?
  4. see above
  5. I really don't know how to do this. I heard that clojure's seque may be useful, but I couldn't find enough doc on how to use it and my initial testing didn't seem to work (sorry don't have the code on me anymore)


Here's my take on it. I made a point of only using Clojure data structures to see how that would work out. Note that it would have been perfectly usual and idiomatic to take a blocking queue from the Java toolbox and use it here; the code would be easy to adapt, I think. Update: I actually did adapt it to java.util.concurrent.LinkedBlockingQueue, see below.

clojure.lang.PersistentQueue

Call (pro-con) to start a test run; then have a look at the contents of output to see if anything happened and queue-lengths to see if they stayed within the given bound.

Update: To explain why I felt the need to use ensure below (I was asked about this on IRC), this is to prevent write skew (see the Wikipedia article on Snapshot isolation for a definition). If I substituted @queue for (ensure queue), it would become possible for two or more producers to check the length of the queue, find that it is less than 4, then place additional items on the queue and possibly bring the total length of the queue above 4, breaking the constraint. Similarly, two consumers doing @queue could accept the same item for processing, then pop two items off the queue. ensure prevents either of these scenarios from happening.

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
  (future
   (while @go-on?
     (if (dosync (let [l (count (ensure queue))]
                   (when (< l *max-queue-length*)
                     (alter queue conj tag)
                     true)))
       (Thread/sleep (rand-int 2000))))))

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     (when-let [item (dosync (let [item (first (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))

java.util.concurrent.LinkedBlockingQueue

A version of the above written using LinkedBlockingQueue. Note how the general outline of the code is basically the same, with some details actually being slightly cleaner. I removed queue-lengths from this version, as LBQ takes care of that constraint for us.

(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; note that this has the
     ;; side effect that nulls = nils on the queue will not
     ;; be handled; there's a number of other ways to go about
     ;; this if this is a problem, see docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜