Tutorial: Messaging
In this tutorial, we'll explore the messaging features available to your Clojure applications when deployed on Immutant. Because Immutant is built atop JBoss AS7, it includes the excellent HornetQ messaging service baked right in. Hence, there is nothing extra to install or configure in order for your applications to benefit from asynchronous messaging.
Destinations are either Queues or Topics
Two types of message destinations, or endpoints, are supported: queues and topics. A queue exhibits point-to-point semantics: a message sent to a queue will be delivered to a single recipient. A topic provides publish-subscribe semantics: messages sent to a topic will be delivered to all subscribed recipients. In both cases, the message producers have no direct knowledge of the message consumers.
Use the start
function to create a messaging destination. A simple
naming convention designates an endpoint as either a queue or a topic:
if its name contains queue
, it's a queue; if it contains
topic
, it's a topic.
(require '[immutant.messaging :as msg]) (msg/start "queue.work") ; to start a queue (msg/start "topic.news") ; to start a topic
You can invoke start
from anywhere in your application, e.g. the
src/immutant/init.clj
initialization file, as described in
the deployment tutorial.
Note that you will need to call start
for any destination you want
to use, even if start
has already been called in another
coordinating application. Also note that start
is idempotent -
calling it more than once has no effect.
While start
has a complement, stop
, you needn't call it
directly. It will be invoked when your application is undeployed. A
call to stop
will silently fail if the endpoint is in use by any
other application. The last to leave will turn the lights out.
Only One Way to Produce Messages
publish
Messages are sent to a destination, whether queue or topic, via a
single function, publish
, to which is passed the destination name
and the message content, which can be just about anything. A number
of optional key-value parameters may be passed as well.
:encoding
may be either:clojure
(the default),:json
(useful with non-clojure consumers),:fressian
(an efficient binary encoding) or:text
(no encoding):priority
may be an integer between 0-9, inclusive. Convenient keyword values:low
,:normal
,:high
and:critical
correspond to 0, 4, 7 and 9, respectively. 4 is the default.:ttl
time-to-live may be specified in milliseconds, after which time the message is discarded if not consumed. Default is 0, i.e. forever.:properties
is a hash of arbitrary message metadata upon which JMS selector expressions may be constructed to filter received messages.
Some examples:
;; A simple string (msg/publish "queue.work" "simple string") ;; Notify everyone something interesting just happened (msg/publish "topic.news" {:event "VISIT" :url "/sales-inquiry"}) ;; Move this message to the front of the line (msg/publish "queue.work" some-message :priority :high :ttl 1000) ;; Make messages as complex as necessary (msg/publish "queue.work" {:a "b" :c [1 2 3 {:foo 42}]}) ;; Make messages consumable by a Ruby app (msg/publish "queue.work" {:a "b" :c [1 2 3 {:foo 42}]} :encoding :json)
Three Ways to Consume Messages
receive
Block on a call to receive
, passing a destination name and
optionally, the following:
:timeout
an expiration in milliseconds, after which nil is returned. Default is 0, i.e. wait forever:selector
a JMS expression used to filter messages according to the values of arbitrary:properties
. For documentation on JMS selector syntax please see the javadoc for javax.jms.Message.
listen
Pass a destination name and function to listen
and the decoded
content of a message sent to that destination will be passed to the
function. Options include:
:concurrency
the maximum number of listening threads that can simultaneouly call the function. Default is 1.:selector
same as:receive
message-seq
Create a lazy sequence of messages via message-seq
, which accepts
the same options as receive
.
Some examples:
;; Wait on a task (let [task (msg/receive "queue.work")] (perform task)) ;; Case-sensitive work queues? (msg/listen "/queue/lower" #(msg/publish "/queue/upper" (.toUpperCase %))) ;; Contrived laziness (let [messages (message-seq queue)] (doseq [i (range 4)] (publish queue i)) (= (range 4) (take 4 messages)))
Synchronous request/respond
Immutant provides an implementation of the request/response pattern, a popular means of synchronous work distribution. Clients can publish a message, i.e. make a request, and then block awaiting a response without knowing exactly which consumer returns the response. For example,
(require '[immutant.messaging :as msg]) ;; setup a responder (msg/respond "queue.work" (partial apply +)) ;; send a request (let [result (msg/request "queue.work" [1 2 3])] (println @result)) ;; => 6
See the manual for more options and examples.