开发者

Transform this Clojure call into a lazy sequence

I'm working with a messaging toolkit (it happens to be Spread but I don't know that the details matter). Receiving messages from this toolkit requires some boilerplate:

  1. Create a connection to the daemon.
  2. Join a group.
  3. Receive one or more messages.
  4. Leave the group.
  5. Disconnect from the daemon.

Following some idioms that I've seen used elsewhere, I was able to cook up some working functions using Spread's Java API and Clojure's interop forms:

(defn connect-to-daemon
  "Open a connection"
  [daemon-spec]
  (let [connection (SpreadConnection.)
        {:keys [host port user]} daemon-spec]
    (doto connection
      (.connect (InetAddress/getByName host) port user false false))))

(defn join-group
  "Join a group on a connection"
  [cxn group-name]
  (doto (SpreadGroup.)
    (.join cxn group-name)))

(defn with-daemon*
  "Execute a function with a connection to the specified 开发者_如何学Cdaemon"
  [daemon-spec func]
  (let [daemon (merge *spread-daemon* daemon-spec)
        cxn (connect-to-daemon daemon-spec)]
    (try
     (binding [*spread-daemon* (assoc daemon :connection cxn)]
       (func))
     (finally
      (.disconnect cxn)))))

(defn with-group*
  "Execute a function while joined to a group"
  [group-name func]
  (let [cxn (:connection *spread-daemon*)
        grp (join-group cxn group-name)]
    (try
     (binding [*spread-group* grp]
       (func))
     (finally
      (.leave grp)))))

(defn receive-message
  "Receive a single message. If none are available, this will block indefinitely."
  []
  (let [cxn (:connection *spread-daemon*)]
    (.receive cxn)))

(Basically the same idiom as with-open, just that the SpreadConnection class uses disconnect instead of close. Grr. Also, I left out some macros that aren't relevant to the structural question here.)

This works well enough. I can call receive-message from inside of a structure like:

(with-daemon {:host "localhost" :port 4803}
  (with-group "aGroup"
    (... looping ...
      (let [msg (receive-message)] 
        ...))))

It occurs to me that receive-message would be cleaner to use if it were an infinite lazy sequence that produces messages. So, if I wanted to join a group and get messages, the calling code should look something like:

(def message-seq (messages-from {:host "localhost" :port 4803} "aGroup"))
(take 5 message-seq)

I've seen plenty of examples of lazy sequences without cleanup, that's not too hard. The catch is steps #4 and 5 from above: leaving the group and disconnecting from the daemon. How can I bind the state of the connection and group into the sequence and run the necessary cleanup code when the sequence is no longer needed?


This article describes how to do exactly that using clojure-contrib fill-queue. Regarding cleanup - the neat thing about fill-queue is that you can supply a blocking function that cleans itself up if there is an error or some condition reached. You can also hold a reference to the resource to control it externally. The sequence will just terminate. So depending on your semantic requirement you'll have to choose the strategy that fits.


Try this:

(ns your-namespace
  (:use clojure.contrib.seq-utils))

(defn messages-from [daemon-spec group-name]
  (let [cnx (connect-to-deamon daemon-spec))
        group (connect-to-group cnx group-name)]
    (fill-queue (fn [fill]
                  (if done? 
                      (do
                        (.leave group)
                        (.disconnect cnx)
                        (throw (RuntimeException. "Finished messages"))
                      (fill (.receive cnx))))))

Set done? to true when you want to terminate the list. Also, any exceptions thrown in (.receive cnx) will also terminate the list.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜