JBoss.orgCommunity Documentation

Chapter 7. Immutant Messaging

7.1. Introduction

The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. The broker mediates the relationships between the other actors.

7.1.1. HornetQ

Immutant encapsulates the JBoss HornetQ message broker. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, which provides auto-discovery, load-balancing, and failover, among other things.

7.2. Destinations

A destination represents a rendezvous point where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.

There are two main types of destinations: queues (point-to-point) and topics (publish-subscribe). All destinations allow multiple actors to send messages. The type of destination determines how the message is routed. A queue delivers the message to a single recipient (possibly one of many candidates). And a topic delivers the message to all interested recipients. In both cases, the message producers have no direct knowledge of the message consumers.

7.2.1. Durable Topic Subscribers

Typically, messages published to a topic are only delivered to consumers connected to the topic at that time. But if a consumer identifies itself with a unique name, then the broker will accumulate messages for that client when it's disconnected and deliver them in order whenever the client reconnects.

You implicitly create a durable topic subscriber by passing the :client-id option to the receive and listen functions, described below. As long as you pass the same :client-id in subsequent receive/listen calls, you'll receive every message published to that topic, whether you were connected at the time or not.

7.2.2. Deployment

Use the immutant.messaging/start function to define a messaging destination. A simple naming convention designates the destination as either a queue or a topic: if its name contains "queue", it's a queue; if it contains "topic", it's a topic. If you need to use queue or topic names that don't match the above convention, you can wrap the name in a call to immutant.messaging/as-queue or immutant.messaging/as-topic respectively. If you do wrap a destination name, you'll need to pass the wrapped version to any immutant.messaging function that takes a destination name. It's common to separate sections of the destination name with . or /, but is not required.

The start function accepts the following options:

OptionDefaultDescription
:durabletrueWhether messages persist across restarts
:selectornilA JMS expression used to filter messages by their metadata/properties

If a :selector is provided, then only messages with metadata/properties matching that expression will be accepted for delivery.

Some examples:

(require '[immutant.messaging :as msg])

;; to start queues - these are all valid names
(msg/start "/queue/work")
(msg/start ".queue.play")
(msg/start "queue.sing")
(msg/start "dancequeued")

;; wrap an ambiguous queue name in as-queue
(msg/start (msg/as-queue "no-q-word"))

;; to start topics - these are all valid names
(msg/start "/topic/news")
(msg/start "topic/infotainment")
(msg/start ".topic")
(msg/start "topicality")
(msg/start "topicality")
(msg/start "some.kinda.topic")

;; wrap an ambiguous topic name in as-topic
(msg/start (msg/as-topic "no-t-word"))

;; only blue messages will be queued for delivery
(msg/start "/queue/blue" :selector "color = 'blue'")
;; like this one...
(msg/publish "/queue/blue" "success" :properties {:color "blue"})

Starting a destination is dynamic, and can occur anywhere in your application code. You can invoke start anytime during the lifecycle of your application.

While start has a complement, immutant.messaging/stop, you needn't call it directly. It will be invoked when your application is undeployed. And it's important to note that start is idempotent: if an endpoint has already been started, likely by a cooperating application, the call is effectively a no-op. Similarly, a call to stop will silently fail if the endpoint is in use by any other application.

7.2.3. Accessing Destinations Controllers

Each messaging destination has associated controllers that can be used to see message counts, list & remove messages, and perform other operations. Two different controllers are provided by HornetQ for queues, and one for topics, each with slightly different controls. All are available via JMX or from immutant.messaging.hornetq/destination-controller.

For a given destination name (queue.example and topic.example in this case), you can access the MBeans via JMX with the following addresses:

# to access the JMS Queue MBean
org.hornetq:module=JMS,type=Queue,name="queue.example"

# to access the JMS Topic MBean
org.hornetq:module=JMS,type=Topic,name="topic.example"

# to access the HornetQ 'core' Queue MBean
org.hornetq:module=Core,type=Queue,address="jms.queue.queue.example",name="jms.queue.queue.example"

or via code with:

(require '[immutant.messaging.hornetq :as hq])

;; for the JMS queue controller
(hq/destination-controller "queue.example")

;; for the JMS topic controller
(hq/destination-controller "queue.topic")

;; for the HornetQ 'core' queue controller
(hq/destination-controller "queue.example" :core)

The returned controller depends on the type of the given destination and, for queues, the requested control-type (which defaults to :jms):

destinationcontrol-typecontroller
Queue:jmsorg.hornetq.api.jms.management.JMSQueueControl
Queue:coreorg.hornetq.core.management.impl.QueueControl
Topicignoredorg.hornetq.api.jms.management.TopicControl

7.3. Messages

The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.

7.3.1. Publishing

Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination or any of its consumers.

In Immutant, there is only one way to send a message, whether to a queue or a topic: via the immutant.messaging/publish function. It accepts two required parameters: the name of the destination and the message content, which can be just about anything.

If the message has any Clojure metadata attached, it will be passed as the JMS Message's properties, the names of which are subject to certain naming restrictions (they must be valid Java identifiers) since they may be used as expressions in selectors (see below). It's also possible to pass properties via the :properties option, which will override any matching keys in the payload metadata, if present.

The publish function accepts the following options:

OptionDefaultDescription
:encoding:ednOne of :clojure, :edn, :json, or :text
:priority4An integer (0-9) or one of :low, :normal, :high and :critical which correspond to 0, 4, 7 and 9, respectively
:ttl0An integer greater than 0, indicating the number of milliseconds after which the message is discarded if not consumed. A 0 indicates that the message should be held indefinitely.
:persistenttrueIf true, undelivered messages survive restarts.
:properties{}A hash of arbitrary metadata upon which JMS selector expressions may be constructed to filter received messages
:correlation-idnilUsed to set the JMSCorrelationID (see setJMSCorrelationID)
:hostnilA remote HornetQ host to connect to.
:portnil, or 5445 if :host is setA remote HornetQ port to connect to. Requires :host to be set.
:usernamenilThe username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set.
:passwordnilThe password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set.

The :json and :edn encodings are useful when the message consumers aren't written in Clojure. For example, TorqueBox Ruby processors will automatically convert edn-encoded messages generated by a Clojure function into their analogous Ruby data structures, so as long as you limit the content of your messages to standard collections and types, they are transparently interoperable between Clojure and Ruby in either direction.

7.3.1.1. Some Examples

;; A simple string
(msg/publish "/queue/work" "simple string")
;; Notify everyone something interesting just happened
(msg/publish "topic/news" {:event "VISIT" :url "/sales-inquiry"})
;; Move this message to the front of the line
(msg/publish "/queue/work" some-message :priority :high :ttl 1000)
;; Make messages as complex as necessary
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]})
;; Make messages consumable by a Ruby app
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]} :encoding :json)
;; Publish to a remote broker
(msg/publish "queue.remote-work" "a message" :host "foo.example.com" :port 5445)
;; The received message's metadata => {:foo 42, :bar 1}
(msg/publish q (with-meta msg {:foo 42 :bar 0}) :properties {:bar 1})

7.3.1.2. A note about encodings

None of the built-in encodings can encode every java object, so you need to pay attention to the payloads you publish. For example, none of the encodings can handle an OutputStream.

7.3.2. Receiving

Any component that waits for messages to be delivered to it by the message broker is consider a consumer. Typically, a consumer is unaware of the producer or any other consumers.

If the published message payload contains metadata, the received message should have it, too, transferred in the form of JMS properties, subject to any overridden values passed in the :properties option (see above). If the payload cannot accept metadata, the message properties can be converted to a convenient Clojure hash using immutant.messaging.core/get-properties.

Immutant features three functions for consuming messages.

Both receive and message-seq expect the destination name as the first parameter, and optionally, the following key/value pairs:

OptionDefaultDescription
:timeout10000An expiration in milliseconds, after which nil is returned; a value of 0 means wait forever, a value of -1 means don't wait at all
:selectornilA JMS expression used to filter messages according to the values of arbitrary :properties
:decode?trueIf true, the decoded message body is returned. Otherwise, the javax.jms.Message object is returned
:client-idnilIdentifies a durable topic subscriber; ignored for queues
:hostnilA remote HornetQ host to connect to.
:portnil, or 5445 if :host is setA remote HornetQ port to connect to. Requires :host to be set.
:usernamenilThe username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set.
:passwordnilThe password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set.

By default, the dynamic variable, clojure.core/*read-eval* is set to false when decoding messages. To override this, you should set :decode? to false and bind *read-eval* to true before passing the encoded message to immutant.messaging.codecs/decode-with-metadata yourself.

For more details on message selectors, see javax.jms.Message.

The listen function takes two parameters: the destination name and a function accepting one parameter which will be applied to any received message. All of the above options except :timeout are supported, plus listen also accepts the following:

OptionDefaultDescription
:concurrency1The maximum number of listening threads that can simultaneouly call the function
:retry-intervalThe period in milliseconds between subsequent reconnection attempts.
:retry-interval-multiplierA multiplier to apply to the time since the last retry to compute the time to the next retry.
:max-retry-interval2000The max retry interval that will be used.
:reconnect-attempts0Total number of reconnect attempts to make before giving up and shutting down. (-1: unlimited)

listen is asynchronous - if you need to synchronize on its completion, you should deref the result.

By default, message handlers are transactional, so the function invoked in response to a message effectively demarcates a transaction that will be automatically committed if no exceptions are raised in the handler, and otherwise rolled back.

Any messages published within the handler automatically become part of its transaction, by default. So they won't be delivered until that transaction commits. To override this behavior, wrap your handler inside the immutant.xa.transaction/not-supported macro.

See Distributed Transactions for more details.

7.3.2.1. Some Examples

;; Wait on a task
(let [task (msg/receive "/queue/work")]
  (perform task))

;; Case-sensitive work queues?
(msg/listen ".queue.lower" #(msg/publish "/queue/upper" (.toUpperCase %)))

;; Listen to a remote queue
(msg/listen "queue/remote" #(do-someting %) :host "foo.example.com" :port 5445)

;; Contrived laziness
(let [messages (message-seq queue)]
  (doseq [i (range 4)] (publish queue i))
  (= (range 4) (take 4 messages)))

The complement of listen is immutant.messaging/unlisten, to which you pass the value returned by listen to cease the flow of messages to that handler.

Queues and topics behave differently when you map a handler to the same destination. For queues, the current handler, if any, is replaced, effectively making the listen call idempotent. Multiple listen calls for topics are idempotent, too, but only if the parameters are exactly the same. If you call listen for a certain topic with different handlers, they are additive. For example:

(listen "queue" #(println (inc %)))
(listen "queue" #(println (dec %)))
(publish "queue" 42)
=> 41

(listen "topic" #(println (inc %)))
(listen "topic" #(println (dec %)))
(publish "topic" 42)
=> 43
=> 41

Note that even if the contents within #() are identical, the actual functions are still different. If you want idempotent topic listeners, you should pass the same var to each. And even then, during development, you may inadvertently redefine the var and create multiple, redundant versions of the topic listener. Hijinks may ensue.

7.3.2.2. Accessing Listeners via JMX

Each message listener has a MBean exposed via JMX. Currently, you can only stop and start the listener from the MBean.

The MBean name is derived from URL-encoded concatenation of destination name and the :selector, if any. If the destination is a topic, the :client-id and the handler function name will be included as well.

The names are so gross-looking that I'm loathe to include any examples at this time.

7.4. Request/Response

Immutant also provides an implementation of the request/response pattern for synchronous work distribution. This feature is provided by two cleverly named functions: request and respond. Since they leverage the existing messaging subsystem, the work is automatically distributed across multiple workers within the same JVM or across multiple nodes if in a cluster.

7.4.1. Request

The immutant.messaging/request function takes a queue, a message, and an optional list of options. It publishes the message to the queue, marking it as a synchronous message and returns a delay that will receive the response from the worker initiated by the respond function. It accepts the same options as publish.

7.4.2. Respond

The immutant.messaging/respond method takes a queue, a function, and an optional list of options. It sets up a listener (via the listen function) that applies the given function to any received message and publishes the result back to the queue for the delay returned by request to receive. It accepts the same options as listen.

7.4.3. Some Examples

A basic example:

(require '[immutant.messaging :as msg])

;; setup a responder
(msg/respond "/queue/work" (partial apply +))

;; send a request
(let [result (msg/request "/queue/work" [1 2 3])]
  (println (deref result 1000 nil)) ;; => 6

An example of using properties and selectors to segment work on the same queue:

(require '[immutant.messaging :as msg])

;; respond to 'add' messages
(msg/respond "/queue/work" (partial apply +) :selector "operation='add'")

;; respond to 'multiply' messages
(msg/respond "/queue/work" (partial apply *) :selector "operation='multiply'")

(deref
 (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "add"})
 1000 nil) ;; => 9

(deref
 (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "multiply"})
 1000 nil) ;; => 24

7.5. Pipelines

Immutant provides a tool called a pipeline. A pipeline is basically a composition of functions (steps), where each function is passed the result of the previous function, dereferenced if needed. It is built on top of the messaging subsystem, allowing each step to have multiple processing threads, and to be automatically load balanced across a cluster. The pipeline functions are available via the immutant.pipeline namespace.

Since messaging is used to pass the data between step functions, the data has to be in a format that can be encoded as clojure via pr. See the above note about encodings.

Pipeline support is currently in an alpha state, and may be subject to massive changes.

7.5.1. Creating a pipeline

You create a pipeline with the immutant.pipeline/pipeline function. The pipeline function takes a unique (within the scope of the application) name, one or more single-arity functions, and optional keyword argument options, returning a function that acts as an entry point into the pipeline:

(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    another-function))

7.5.1.1. Pipeline options

pipeline can take the following options, passed as keyword arguments after the step functions:

OptionDefaultDescription
:concurrency1The number of threads to use for each step. Can be overridden on a per-step basis (see below).
:error-handlernilA function to call when a step function throws an exception. Receives the exception and the data passed to the step function. Can be overriden on a per-step basis (see below).
:result-ttl1 hourThe time-to-live for the final pipeline result, in ms. Set to 0 for "forever", -1 to disable returning the result via a delay
:step-deref-timeout10 secondsThe amount of time to wait when dereferencing the result of a step that returns a delay, in ms
(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    another-function
    :concurrency 2))

7.5.1.2. Per-step options

Each function can optionally be wrapped with metadata via the immutant.pipeline/step function, providing settings for how that particular function is handled within the pipeline:

OptionDefaultDescription
:namethe index of the fnA name to use for the step
:concurrency1The number of threads to use, overriding the pipeline setting
:error-handlernilAn error handler function that can override the pipeline setting
:step-deref-timeout10 secondsThe amount of time to wait when dereferencing the result of a step that returns a delay, in ms. Overrides the pipeline setting
(require '[immutant.pipeline :as pl])

(pl/pipeline "foo"
  function-that-does-something
  (pl/step another-function :concurrency 10)
  :concurrency 2)

7.5.2. Using a pipeline

The function returned by pipeline acts as an entry function, placing its argument onto the pipeline when called, returning a delay around the end of the pipeline (by default):

(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    another-function))

(deref (foo-pipeline {:ham :biscuit}) 10000 :timeout!)

Pipelines store the result of an execution by default, allowing it to be retrieved by dereferencing the delay returned by the pipeline-fn call. To prevent results that may not be retrieved from being stored indefinitely, they have a default time-to-live of 1 hour. You can control the retention time by passing a :result-ttl option to pipeline. It is specified in milliseconds, with a value of 0 indicating that the result should be saved indefinitely, and -1 indicating that the results should be discarded immediately. If you set the :result-ttl to -1, any attempt to dereference the returned delay will raise an error.

If the result from a step is reference, it will be dereferenced before being passed to the next step. This allows you to use a pipeline within a pipeline. The amount of time to wait for the deref is controlled by the :step-deref-timeout option, and defaults to 10 seconds. Setting it to 0 will cause it to wait forever, which will tie up a thread

(require '[immutant.pipeline :as pl])

(defonce pipeline-x
  (pl/pipeline :x
    function-that-does-something
    another-function))

(defonce pipeline-y
  (pl/pipeline :y
    yet-another-function
    pipeline-x
    and-another
    :step-deref-timeout 60000))

By default, the pipeline entry function places its argument onto the front of the pipeline. You can insert the data into the pipeline at any step by passing a :step keyword argument. The step name would be the name you provided as an option for that step using the step function, or the index of the step in the list of steps if you haven't provided a name:

(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    (pl/step another-function :name :another)
    a-third-function))

;; insert at head
(foo-pipeline {:ham :biscuit})

;; skip the first step
(foo-pipeline {:ham :biscuit} :step :another)

;; insert at the last step 
(foo-pipeline {:ham :biscuit} :step 2)

7.5.3. Available bindings

The following vars have bound values inside a step or error-handler invocation:

VarValue
*pipeline*The pipeline entry function for the currently active pipeline.
*current-step*The name of the currently executing step.
*next-step*The name of the next step in the pipeline.

7.5.4. Error handling

When an exception occurs in a step function, an error-handler function will be invoked if provided for the pipeline or for the particular step. This function will be passed the exception and the original data passed to the step function, and have all of the above bindings available:

;; a naive error handler that sleeps then retries on a network error,
;; logging and discarding otherwise
(defn error-handler [ex data]
  (if (instance? NoRouteToHostException ex)
    (do
      (Thread/sleep 1000)
      (pl/*pipeline* data :step pl/*current-step*))
    (println "ERROR:" ex)))

(require '[immutant.pipeline :as pl])

(pl/pipeline "foo"
  connects-to-foo
  connects-to-bar
  :error-handler error-handler)

If no error-handler function is provided, the error handling semantics provided my HornetQ are used, which causes the offending step to be retried up to ten times before giving up.

7.5.5. Halting the pipeline for a particular message

If, in a step function, you determine that the data requires no further processing, you can halt that particular pipeline execution by returning a special flag symbol - immutant.pipeline/halt:

(require '[immutant.pipeline :as pl])

;; halt the pipeline at the second step, causing another-function to
;; not be called
(pl/pipeline "foo"
  function-that-does-something
  #(if (:some-done-condition %)
     pl/halt
     %)
  another-function)

7.5.6. Stopping the pipeline

When your application is undeployed, Immutant will automatically shut down the pipeline. If you need to stop the pipeline at runtime, use the immutant.pipeline/stop function:

(require '[immutant.pipeline :as pl])

(let [pipeline (pl/pipeline "foo" ...)]
  ...
  (pl/stop pipeline))
Immutant 1.x.incremental.878