JBoss.org Community Documentation

Chapter 8. Immutant Messaging

8.1. Introduction

8.1.1. HornetQ

Immutant encapsulates the JBoss HornetQ message broker. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, which provides auto-discovery, load-balancing, and failover, among other things.

The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. The broker mediates the relationships between the other actors.

8.2. Destinations

A destination represents a rendezvous point where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.

There are two main types of destinations: queues (point-to-point) and topics (publish-subscribe). All destinations allow multiple actors to send messages. The type of destination determines how the message is routed: a queue delivers the message to a single recipient (possibly one of many candidates), whereas a topic delivers the message to all interested recipients. In both cases, the message producers have no direct knowledge of the message consumers.
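
As a rough illustration of the routing difference, the following sketch models the two destination types in plain Clojure. It does not use Immutant or HornetQ. The function route, the consumer names, and the round-robin choice of a single queue recipient are all assumptions made for the example, not a description of how the broker actually selects a consumer.

```clojure
;; Hypothetical model of destination routing; not part of Immutant.
(defn route
  "Returns the consumers that would receive the n-th message sent to
  a destination of the given type (:queue or :topic)."
  [dest-type consumers n]
  (case dest-type
    ;; A queue hands each message to exactly one candidate.
    ;; Round-robin is an assumption; a real broker may choose differently.
    :queue [(nth consumers (mod n (count consumers)))]
    ;; A topic hands each message to every interested consumer.
    :topic (vec consumers)))

(route :queue ["a" "b" "c"] 4) ; a single recipient
(route :topic ["a" "b" "c"] 4) ; all three recipients
```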

8.2.1. Deployment

Use the immutant.messaging/start function to define a messaging destination. A simple naming convention designates the destination as either a queue or a topic: if its name begins with /queue, it's a queue; if it begins with /topic, it's a topic.

(require '[immutant.messaging :as msg])

(msg/start "/queue/work")   ; to start a queue
(msg/start "/topic/news")   ; to start a topic

You can invoke start from anywhere in your application, but typically it's done in the immutant.clj initialization file.

While start has a complement, immutant.messaging/stop, you needn't call it directly. It will be invoked when your application is undeployed. And it's important to note that start is idempotent: if an endpoint has already been started, likely by a cooperating application, the call is effectively a no-op. Similarly, a call to stop will silently fail if the endpoint is in use by any other application.
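
The naming convention used by start can be sketched as a small classifier in plain Clojure. The helper destination-type is hypothetical and only mirrors the prefix rule described in this section; returning nil for any other name is an assumption, since the behavior for such names is not covered here.

```clojure
;; Hypothetical helper mirroring the /queue and /topic prefix rule;
;; it is not part of the immutant.messaging namespace.
(defn destination-type
  "Classifies a destination name as :queue or :topic by its prefix."
  [^String dest-name]
  (cond
    (.startsWith dest-name "/queue") :queue
    (.startsWith dest-name "/topic") :topic
    ;; Assumption: names with any other prefix are left unclassified.
    :else nil))

(destination-type "/queue/work") ; classified as a queue
(destination-type "/topic/news") ; classified as a topic
```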

8.3. Messages

The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.

8.3.1. Publishing

Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination or any of its consumers.

In Immutant, there is only one way to send a message, whether to a queue or a topic: via the immutant.messaging/publish function. It accepts two required parameters: the name of the destination and the message content, which can be just about anything.

A number of optional key-value parameters may be passed as well:

Option       Default    Description
:encoding    :clojure   One of :clojure, :json or :text
:priority    4          An integer (0-9) or one of :low, :normal, :high and :critical, which correspond to 0, 4, 7 and 9, respectively
:ttl         0          An integer greater than 0, indicating the number of milliseconds after which the message is discarded if not consumed
:properties  {}         A hash of arbitrary metadata upon which JMS selector expressions may be constructed to filter received messages
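
The correspondence between the named priorities and their integer values can be sketched in plain Clojure. The names priority-names and priority->int are hypothetical, and throwing an exception for an unrecognized value is an assumption; how publish treats invalid priorities is not covered here.

```clojure
;; Hypothetical helper mirroring the documented :priority values;
;; it is not part of the immutant.messaging namespace.
(def priority-names {:low 0 :normal 4 :high 7 :critical 9})

(defn priority->int
  "Converts a :priority option (keyword or integer 0-9) to an integer."
  [p]
  (cond
    (keyword? p)
    (or (priority-names p)
        ;; Assumption: unknown names are rejected.
        (throw (IllegalArgumentException. (str "Unknown priority: " p))))

    (and (integer? p) (<= 0 p 9)) p

    ;; Assumption: out-of-range values are rejected.
    :else (throw (IllegalArgumentException. (str "Invalid priority: " p)))))

(priority->int :high) ; the integer for :high
(priority->int 3)     ; integers in range pass through unchanged
```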

The :json encoding is useful when the message consumers aren't written in Clojure. For example, TorqueBox Ruby processors will automatically convert JSON-encoded messages generated by a Clojure function into their analogous Ruby data structures. As long as you limit the content of your messages to standard collections and types, they are transparently interoperable between Clojure and Ruby in either direction.

Some Examples

;; A simple string
(msg/publish "/queue/work" "simple string")
;; Notify everyone something interesting just happened
(msg/publish "/topic/news" {:event "VISIT" :url "/sales-inquiry"})
;; Move this message to the front of the line
(msg/publish "/queue/work" some-message :priority :high :ttl 1000)
;; Make messages as complex as necessary
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]})
;; Make messages consumable by a Ruby app
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]} :encoding :json)

8.3.2. Receiving

Any component that waits for messages to be delivered to it by the message broker is considered a consumer. Typically, a consumer is unaware of the producer or any other consumers.

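Immutant features three functions for consuming messages: receive, message-seq, and listen. receive returns a single message, message-seq returns a lazy sequence of messages, and listen registers a function that is called with each message as it arrives.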

Both receive and message-seq expect the destination name as the first parameter, and optionally, the following key/value pairs:

Option     Default  Description
:timeout   10000    An expiration in milliseconds, after which nil is returned; a value of 0 means wait forever
:selector  (none)   A JMS expression used to filter messages according to the values of arbitrary :properties
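
The :timeout behavior can be modelled without a broker. The sketch below uses a java.util.concurrent.LinkedBlockingQueue as a stand-in for a destination; receive-sketch is a hypothetical function that reproduces only the documented timeout semantics (nil on expiry, 0 meaning wait forever) and is not how Immutant implements receive.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical stand-in for receive, modelling only the :timeout option.
(defn receive-sketch
  [^LinkedBlockingQueue q & {:keys [timeout] :or {timeout 10000}}]
  (if (zero? timeout)
    ;; A timeout of 0 blocks until a message is available.
    (.take q)
    ;; Otherwise wait up to timeout milliseconds, returning nil on expiry.
    (.poll q (long timeout) TimeUnit/MILLISECONDS)))

(let [q (LinkedBlockingQueue.)]
  (.put q "task")
  [(receive-sketch q :timeout 50)    ; the queued message
   (receive-sketch q :timeout 50)])  ; nil, because nothing else arrives
```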

The listen function takes two parameters: the destination name and a function accepting one parameter, to which the decoded message will be passed. The following key/value options are also available:

Option        Default  Description
:concurrency  1        The maximum number of listening threads that can simultaneously call the function
:selector     (none)   Same as receive

Some Examples

;; Wait on a task
(let [task (msg/receive "/queue/work")]
  (perform task))

;; Case-sensitive work queues?
(msg/listen "/queue/lower" #(msg/publish "/queue/upper" (.toUpperCase %)))

;; Contrived laziness (queue is assumed to hold the name of a started queue)
(let [messages (msg/message-seq queue)]
  (doseq [i (range 4)] (msg/publish queue i))
  (= (range 4) (take 4 messages)))

Immutant 0.1.0