JBoss.orgCommunity Documentation

Chapter 7. Immutant Messaging

7.1. Introduction

7.1.1. HornetQ

Immutant encapsulates the JBoss HornetQ message broker. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, which provides auto-discovery, load-balancing, and failover, among other things.

The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. The broker mediates the relationships between the other actors.

7.2. Destinations

A destination represents a rendezvous point where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.

There are two main types of destinations: queues (point-to-point) and topics (publish-subscribe). All destinations allow multiple actors to send messages. The type of destination determines how the message is routed. A queue delivers the message to a single recipient (possibly one of many candidates). And a topic delivers the message to all interested recipients. In both cases, the message producers have no direct knowledge of the message consumers.

7.2.1. Durable Topic Subscribers

Typically, messages published to a topic are only delivered to consumers connected to the topic at that time. But if a consumer identifies itself with a unique name, then the broker will accumulate messages for that client when it's disconnected and deliver them in order whenever the client reconnects.

You implicitly create a durable topic subscriber by passing the :client-id option to the receive and listen functions, described below. As long as you pass the same :client-id in subsequent receive/listen calls, you'll receive every message published to that topic, whether you were connected at the time or not.

7.2.2. Deployment

Use the immutant.messaging/start function to define a messaging destination. A simple naming convention designates the destination as either a queue or a topic: if its name matches =#"^.?queue"=, it's a queue; if it matches #"^.?topic"=, it's a topic. It's common to separate sections of the queue name with =. or /, but is not required.

(require '[immutant.messaging :as msg])

;; to start queues - these are all valid names
(msg/start "/queue/work")
(msg/start ".queue.play")
(msg/start "queue.sing")
(msg/start "queuedance")

;; to start topics - these are all valid names
(msg/start "/topic/news")
(msg/start "topic/infotainment")
(msg/start ".topic")
(msg/start "topicality")

You can invoke start from anywhere in your application, but typically it's done in the immutant.clj initialization file.

While start has a complement, immutant.messaging/stop, you needn't call it directly. It will be invoked when your application is undeployed. And it's important to note that start is idempotent: if an endpoint has already been started, likely by a cooperating application, the call is effectively a no-op. Similarly, a call to stop will silently fail if the endpoint is in use by any other application.

7.3. Messages

The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.

7.3.1. Publishing

Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination or any of its consumers.

In Immutant, there is only one way to send a message, whether to a queue or a topic: via the immutant.messaging/publish function. It accepts two required parameters: the name of the destination and the message content, which can be just about anything.

If the message has any Clojure metadata attached, it will be passed as the JMS Message's properties, the names of which are subject to certain naming restrictions – they must be valid Java identifiers – since they may be used as expressions in selectors (see below). It's also possible to pass properties via the :properties option, which will override any matching keys in the payload metadata, if present.

The publish function accepts the following options:

OptionDefaultDescription
:encoding:clojureOne of :clojure, :json or :text
:priority4An integer (0-9) or one of :low, :normal, :high and :critical which correspond to 0, 4, 7 and 9, respectively
:ttl0An integer greater than 0, indicating the number of milliseconds after which the message is discarded if not consumed. A 0 indicates that the message should be held indefinitely.
:persistenttrueIf true, undelivered messages survive restarts.
:properties{}A hash of arbitrary metadata upon which JMS selector expressions may be constructed to filter received messages
:correlation-idnilUsed to set the JMSCorrelationID (see setJMSCorrelationID)
:hostnilA remote HornetQ host to connect to.
:portnil, or 5445 if :host is setA remote HornetQ port to connect to. Requires :host to be set.
:usernamenilThe username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set.
:passwordnilThe password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set.

The :json encoding is useful when the message consumers aren't written in Clojure. For example, TorqueBox Ruby processors will automatically convert json-encoded messages generated by a Clojure function into their analogous Ruby data structures, so as long as you limit the content of your messages to standard collections and types, they are transparently interoperable between Clojure and Ruby in either direction.

7.3.1.1. Some Examples

;; A simple string
(msg/publish "/queue/work" "simple string")
;; Notify everyone something interesting just happened
(msg/publish "topic/news" {:event "VISIT" :url "/sales-inquiry"})
;; Move this message to the front of the line
(msg/publish "/queue/work" some-message :priority :high :ttl 1000)
;; Make messages as complex as necessary
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]})
;; Make messages consumable by a Ruby app
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]} :encoding :json)
;; Publish to a remote broker
(msg/publish "queue.remote-work" "a message" :host "foo.example.com" :port 5445)
;; The received message's metadata => {:foo 42, :bar 1}
(msg/publish q (with-meta msg {:foo 42 :bar 0}) :properties {:bar 1})

7.3.2. Receiving

Any component that waits for messages to be delivered to it by the message broker is consider a consumer. Typically, a consumer is unaware of the producer or any other consumers.

If the published message payload contains metadata, the received message should have it, too, transferred in the form of JMS properties, subject to any overridden values passed in the :properties option (see above). If the payload cannot accept metadata, the message properties can be converted to a convenient Clojure hash using immutant.messaging.core/get-properties.

Immutant features three functions for consuming messages.

Both receive and message-seq expect the destination name as the first parameter, and optionally, the following key/value pairs:

OptionDefaultDescription
:timeout10000An expiration in milliseconds, after which nil is returned; a value of 0 means wait forever
:selectornilA JMS expression used to filter messages according to the values of arbitrary :properties
:decode?trueIf true, the decoded message body is returned. Otherwise, the javax.jms.Message object is returned
:client-idnilIdentifies a durable topic subscriber; ignored for queues
:hostnilA remote HornetQ host to connect to.
:portnil, or 5445 if :host is setA remote HornetQ port to connect to. Requires :host to be set.
:usernamenilThe username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set.
:passwordnilThe password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set.

For more details on message selectors, see javax.jms.Message.

The listen method takes two parameters: the destination name and a function accepting one parameter which will be applied to any received message. The following key/value options are also available:

OptionDefaultDescription
:concurrency1The maximum number of listening threads that can simultaneouly call the function
:selectornilSame as receive
:decode?trueIf true, the decoded message body is passed to f. Otherwise, the javax.jms.Message object is passed
:client-idnilIdentifies a durable topic subscriber; ignored for queues
:hostnilA remote HornetQ host to connect to.
:portnil, or 5445 if :host is setA remote HornetQ port to connect to. Requires :host to be set.
:usernamenilThe username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set.
:passwordnilThe password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set.

By default, message handlers are transactional, so the function invoked in response to a message effectively demarcates a transaction that will be automatically committed if no exceptions are raised in the handler, and otherwise rolled back.

Any messages published within the handler automatically become part of its transaction, by default. So they won't be delivered until that transaction commits. To override this behavior, wrap your handler inside the immutant.xa.transaction/not-supported macro.

See Distributed Transactions for more details.

7.3.2.1. Some Examples

;; Wait on a task
(let [task (msg/receive "/queue/work")]
  (perform task))

;; Case-sensitive work queues?
(msg/listen ".queue.lower" #(msg/publish "/queue/upper" (.toUpperCase %)))

;; Listen to a remote queue
(msg/listen "queue/remote" #(do-someting %) :host "foo.example.com" :port 5445)

;; Contrived laziness
(let [messages (message-seq queue)]
  (doseq [i (range 4)] (publish queue i))
  (= (range 4) (take 4 messages)))

7.4. Request/Response

Immutant also provides an implementation of the request/response pattern for synchronous work distribution. This feature is provided by two cleverly named functions: request and respond. Since they leverage the existing messaging subsystem, the work is automatically distributed across multiple workers within the same JVM or across multiple nodes if in a cluster.

7.4.1. Request

The immutant.messaging/request function takes a queue, a message, and an optional list of options. It publishes the message to the queue, marking it as a synchronous message and returns a delay that will receive the response from the worker initiated by the respond function. It accepts the same options as publish, with one additional option:

OptionDefaultDescription
:timeout10000Time in ms for the delayed receive to wait once it it is dereferenced, after which nil is returned

7.4.2. Respond

The immutant.messaging/respond method takes a queue, a function, and an optional list of options. It sets up a listener (via the listen function) that applies the given function to any received message and publishes the result back to the queue for the delay returned by request to receive. It accepts the same options as listen.

7.4.3. Some Examples

A basic example:

(require '[immutant.messaging :as msg])

;; setup a responder
(msg/respond "/queue/work" (partial apply +))

;; send a request
(let [result (msg/request "/queue/work" [1 2 3])]
  (println @result)) ;; => 6

An example of using properties and selectors to segment work on the same queue:

(require '[immutant.messaging :as msg])

;; respond to 'add' messages
(msg/respond "/queue/work" (partial apply +) :selector "operation='add'")

;; respond to 'multiply' messages
(msg/respond "/queue/work" (partial apply *) :selector "operation='multiply'")

@(msg/request "/queue/work" [1 2 3 4] :properties {"operation" "add"}) ;; => 9

@(msg/request "/queue/work" [1 2 3 4] :properties {"operation" "multiply"}) ;; => 24
Immutant 0.4.0