Chapter 8. Immutant Messaging

8.1. Introduction

The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. The broker mediates the relationships between the other actors.

8.1.1. HornetQ

Immutant encapsulates the JBoss HornetQ message broker. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, which provides auto-discovery, load-balancing, and failover, among other things.

8.2. Destinations

A destination represents a rendezvous point where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.

There are two main types of destinations: queues (point-to-point) and topics (publish-subscribe). All destinations allow multiple actors to send messages. The type of destination determines how the message is routed. A queue delivers the message to a single recipient (possibly one of many candidates). And a topic delivers the message to all interested recipients. In both cases, the message producers have no direct knowledge of the message consumers.

8.2.1. Durable Topic Subscribers

Typically, messages published to a topic are only delivered to consumers connected to the topic at that time. But if a consumer identifies itself with a unique name, then the broker will accumulate messages for that client when it's disconnected and deliver them in order whenever the client reconnects.

You implicitly create a durable topic subscriber by passing the :client-id option to the receive and listen functions, described below. As long as you pass the same :client-id in subsequent receive/listen calls, you'll receive every message published to that topic, whether you were connected at the time or not.
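As a minimal sketch of the idea above, the following registers a durable subscriber through listen. The topic name and the "news-archiver" id are hypothetical; the only thing that matters is that the same :client-id is passed each time the subscriber reconnects.

```clojure
(require '[immutant.messaging :as msg])

;; Hypothetical topic used only for this illustration.
(msg/start "/topic/news")

;; Passing :client-id makes this a durable subscriber. If the application
;; restarts and calls listen again with the same :client-id, the broker should
;; deliver whatever was published to the topic in the meantime.
(msg/listen "/topic/news"
            (fn [message] (println "archiving" message))
            :client-id "news-archiver")
```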

8.2.2. Deployment

Use the immutant.messaging/start function to define a messaging destination. A simple naming convention designates the destination as either a queue or a topic: if its name contains "queue", it's a queue; if it contains "topic", it's a topic. If you need to use queue or topic names that don't match this convention, wrap the name in a call to immutant.messaging/as-queue or immutant.messaging/as-topic, respectively. If you do wrap a destination name, you must pass the wrapped version to any immutant.messaging function that takes a destination name. It's common to separate sections of the destination name with . or /, but this is not required.
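The naming convention can be modeled in a few lines of plain Clojure. This is only an illustration of the rule as stated, not Immutant's implementation: destination-type is a hypothetical helper, and checking "queue" before "topic" is an arbitrary choice, since the text does not say what happens when a name contains both words.

```clojure
;; Illustrative model of the naming rule; not part of the Immutant API.
(defn destination-type
  "Classifies a destination name by the substring it contains."
  [dest-name]
  (cond
    (re-find #"queue" dest-name) :queue
    (re-find #"topic" dest-name) :topic
    :else :ambiguous))

(destination-type "/queue/work")  ; => :queue
(destination-type "topicality")   ; => :topic
(destination-type "no-q-word")    ; => :ambiguous
```

A name classified as :ambiguous here is the kind that would need as-queue or as-topic.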

In addition to the destination name, the start function accepts the following options when starting a queue:

Option | Default | Description
:durable | true | Whether messages persist across restarts
:selector | nil | A JMS expression used to filter messages by their metadata/properties

If a :selector is provided, then only messages with metadata/properties matching that expression will be accepted for delivery.

Some examples:

(require '[immutant.messaging :as msg])

;; to start queues - these are all valid names
(msg/start "/queue/work")
(msg/start ".queue.play")
(msg/start "queue.sing")
(msg/start "dancequeued")

;; wrap an ambiguous queue name in as-queue
(msg/start (msg/as-queue "no-q-word"))

;; to start topics - these are all valid names
(msg/start "/topic/news")
(msg/start "topic/infotainment")
(msg/start ".topic")
(msg/start "topicality")
(msg/start "some.kinda.topic")

;; wrap an ambiguous topic name in as-topic
(msg/start (msg/as-topic "no-t-word"))

;; only blue messages will be queued for delivery
(msg/start "/queue/blue" :selector "color = 'blue'")
;; like this one...
(msg/publish "/queue/blue" "success" :properties {:color "blue"})

Starting a destination is dynamic, and can occur anywhere in your application code. You can invoke start anytime during the lifecycle of your application.

While start has a complement, immutant.messaging/stop, you needn't call it directly. It will be invoked when your application is undeployed. It's important to note that start is idempotent: if a destination has already been started, likely by a cooperating application, the call is effectively a no-op. Similarly, a call to stop will silently fail if the destination is in use by any other application. Note that in cases where a destination is started by another application, you will still need to start it locally for it to be visible to other messaging functions within your application.

8.2.3. Setting Advanced HornetQ Options

HornetQ provides some advanced settings that can be applied to a particular destination or destinations with names matching a string. Typically, these options are set via XML, but HornetQ does provide an API for setting them at runtime. Immutant wraps that API with immutant.messaging.hornetq/set-address-options, and, as a convenience, allows any of these settings to be passed to immutant.messaging/start as well.

The available settings are:

Option | Default | Description
:address-full-message-policy | :page | Specifies what should happen when an address reaches :max-size-bytes in undelivered messages. One of: :block, :drop, :fail, or :page. See below for more details.
:dead-letter-address | jms.queue.DLQ | If set, any messages that fail to deliver to their original destination will be delivered here.
:expiry-address | jms.queue.ExpiryQueue | If set, any messages with a :ttl that expires before delivery will be delivered here.
:expiry-delay | -1 | If > -1, this value (in ms) is used as the default :ttl for messages that don't have a :ttl > 0 set.
:last-value-queue | false | If true, only the most recent message for a last-value property will be retained.
:max-delivery-attempts | 10 | The number of times delivery will be attempted for a message before giving up. If :dead-letter-address is set, the message will be delivered there; otherwise it will be removed.
:max-size-bytes | 20MB | The maximum size (in bytes) of retained messages on an address before :address-full-message-policy is applied.
:page-cache-max-size | 5 | HornetQ will keep up to this many page files in memory to optimize IO.
:page-size-bytes | 10MB | The size (in bytes) of the page files created when paging.
:redelivery-delay | 0 | Specifies the delay (in ms) between redelivery attempts.
:redelivery-multiplier | 1.0 | Controls the backoff for redeliveries. The delay between redelivery attempts is calculated as :redelivery-delay * (:redelivery-multiplier ^ attempt-count).
:redistribution-delay | 1000 | Specifies the delay (in ms) to wait before redistributing messages from a node in a cluster to other nodes when the queue no longer has consumers on the current node.
:send-to-dla-on-no-route | false | If true, any message that can't be routed to its destination will be sent to :dead-letter-address.
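To make the :redelivery-multiplier formula concrete, here is a small worked calculation in plain Clojure. The helper name redelivery-delay-ms is hypothetical, and whether HornetQ counts the first redelivery as attempt 0 or attempt 1 is an assumption this sketch does not settle; it simply evaluates the formula as written.

```clojure
;; Evaluates :redelivery-delay * (:redelivery-multiplier ^ attempt-count).
;; Hypothetical helper for illustration only.
(defn redelivery-delay-ms
  [redelivery-delay redelivery-multiplier attempt-count]
  (long (* redelivery-delay (Math/pow redelivery-multiplier attempt-count))))

;; With a 1000 ms base delay and a multiplier of 2.0:
(map #(redelivery-delay-ms 1000 2.0 %) [0 1 2 3])
;; => (1000 2000 4000 8000)
```

With the default multiplier of 1.0 the delay stays constant at :redelivery-delay for every attempt.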

8.2.3.1. Options for :address-full-message-policy

The :address-full-message-policy takes one of the following four policy specifiers:

  • :block - publish calls will block until the current size drops below :max-size-bytes
  • :drop - new messages are silently dropped
  • :fail - new messages are dropped and an exception is thrown on publish
  • :page - new messages will be paged to disk

See the HornetQ documentation for more information about these policies.

Some examples, including HornetQ wildcard matching:

(require '[immutant.messaging :as msg]
         '[immutant.messaging.hornetq :as hornetq])

;; passing options to start
(msg/start "queue.foo"
  :dead-letter-address "queue.dead-letters"
  :max-delivery-attempts 20)

;; setting options for an existing queue
(hornetq/set-address-options "queue.foo"
  {:dead-letter-address "queue.dead-letters"
   :max-delivery-attempts 20})

;; setting options for all queues of a certain prefix
;; will match: queue.notifications.ham, queue.notifications.biscuits
(hornetq/set-address-options "queue.notifications.*"
  {:dead-letter-address "queue.dead-letters"
   :max-delivery-attempts 20})

;; match *all* destinations
(hornetq/set-address-options "#"
  {:dead-letter-address "queue.dead-letters"
   :max-delivery-attempts 20})

8.2.4. Accessing Destination Controllers

Each messaging destination has associated controllers that can be used to see message counts, list & remove messages, and perform other operations. Two different controllers are provided by HornetQ for queues, and one for topics, each with slightly different controls. All are available via JMX or from immutant.messaging.hornetq/destination-controller.

For a given destination name (queue.example and topic.example in this case), you can access the MBeans via JMX with the following addresses:

# to access the JMS Queue MBean
org.hornetq:module=JMS,type=Queue,name="queue.example"

# to access the JMS Topic MBean
org.hornetq:module=JMS,type=Topic,name="topic.example"

# to access the HornetQ 'core' Queue MBean
org.hornetq:module=Core,type=Queue,address="jms.queue.queue.example",name="jms.queue.queue.example"

or via code with:

(require '[immutant.messaging.hornetq :as hq])

;; for the JMS queue controller
(hq/destination-controller "queue.example")

;; for the JMS topic controller
(hq/destination-controller "topic.example")

;; for the HornetQ 'core' queue controller
(hq/destination-controller "queue.example" :core)

The returned controller depends on the type of the given destination and, for queues, the requested control-type (which defaults to :jms):

Destination | control-type | Controller
Queue | :jms | org.hornetq.api.jms.management.JMSQueueControl
Queue | :core | org.hornetq.core.management.impl.QueueControl
Topic | ignored | org.hornetq.api.jms.management.TopicControl
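Once you have a controller, you call its methods through ordinary Java interop. A minimal sketch, assuming queue.example has already been started and that the returned object is HornetQ's JMSQueueControl, which provides getMessageCount:

```clojure
(require '[immutant.messaging.hornetq :as hq])

;; Assumes "queue.example" was started earlier with immutant.messaging/start.
(let [controller (hq/destination-controller "queue.example")]
  ;; getMessageCount is a method of HornetQ's JMSQueueControl interface
  (println "messages currently in queue:" (.getMessageCount controller)))
```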

8.3. Messages

The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.

8.3.1. Publishing

Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination or any of its consumers.

In Immutant, there is only one way to send a message, whether to a queue or a topic: via the immutant.messaging/publish function. It accepts two required parameters: the name of the destination and the message content, which can be just about anything.

If the message has any Clojure metadata attached, it will be passed as the JMS Message's properties, the names of which are subject to certain naming restrictions (they must be valid Java identifiers) since they may be used as expressions in selectors (see below). It's also possible to pass properties via the :properties option, which will override any matching keys in the payload metadata, if present.
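The precedence rule can be pictured as a simple map merge. The helper below is a hypothetical model in plain Clojure, not Immutant code; it only shows which value wins when the payload metadata and the :properties option share a key.

```clojure
;; Hypothetical model: :properties entries override payload metadata.
(defn effective-properties
  [payload properties]
  (merge (meta payload) properties))

(effective-properties (with-meta {:task "resize"} {:foo 42 :bar 0})
                      {:bar 1})
;; => {:foo 42, :bar 1}
```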

The publish function accepts the following options:

Option | Default | Description
:encoding | :edn | One of :clojure, :edn, :fressian, :json, or :text
:priority | 4 | An integer (0-9) or one of :low, :normal, :high and :critical, which correspond to 0, 4, 7 and 9, respectively
:ttl | 0 | An integer greater than 0, indicating the number of milliseconds after which the message is discarded if not consumed. A 0 indicates that the message should be held indefinitely.
:persistent | true | If true, undelivered messages survive restarts (if the destination is durable).
:properties | {} | A map of arbitrary metadata upon which JMS selector expressions may be constructed to filter received messages
:correlation-id | nil | Used to set the JMSCorrelationID (see setJMSCorrelationID)
:host | nil | A remote HornetQ host to connect to.
:port | nil, or 5445 if :host is set | A remote HornetQ port to connect to. Requires :host to be set.
:username | nil | The username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set.
:password | nil | The password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set.
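The keyword shorthands for :priority map onto JMS priority integers as listed in the table. The following plain-Clojure sketch spells out that mapping; priority->int is a hypothetical helper, and the range check is an assumption about how invalid values could be treated, not a description of what publish does.

```clojure
;; Keyword shorthands and their integer priorities, per the table above.
(def priority-levels
  {:low 0, :normal 4, :high 7, :critical 9})

;; Hypothetical helper: resolves a keyword or integer to a priority in 0-9.
(defn priority->int
  [priority]
  (let [level (get priority-levels priority priority)]
    (if (and (integer? level) (<= 0 level 9))
      level
      (throw (IllegalArgumentException.
              (str "invalid priority: " (pr-str priority)))))))

(priority->int :high) ; => 7
(priority->int 3)     ; => 3
```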

The :json and :edn encodings are useful when the message consumers aren't written in Clojure. For example, TorqueBox Ruby processors will automatically convert edn-encoded messages generated by a Clojure function into their analogous Ruby data structures, so as long as you limit the content of your messages to standard collections and types, they are transparently interoperable between Clojure and Ruby in either direction.

8.3.1.1. Some Examples

;; A simple string
(msg/publish "/queue/work" "simple string")
;; Notify everyone something interesting just happened
(msg/publish "topic/news" {:event "VISIT" :url "/sales-inquiry"})
;; Move this message to the front of the line
(msg/publish "/queue/work" some-message :priority :high :ttl 1000)
;; Make messages as complex as necessary
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]})
;; Make messages consumable by a Ruby app
(msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]} :encoding :json)
;; Publish to a remote broker
(msg/publish "queue.remote-work" "a message" :host "foo.example.com" :port 5445)
;; The received message's metadata => {:foo 42, :bar 1}
(msg/publish q (with-meta msg {:foo 42 :bar 0}) :properties {:bar 1})

8.3.1.2. A note about encodings

None of the built-in encodings can encode every java object, so you need to pay attention to the payloads you publish. For example, none of the encodings can handle an OutputStream.

8.3.2. Receiving

Any component that waits for messages to be delivered to it by the message broker is considered a consumer. Typically, a consumer is unaware of the producer or any other consumers.

If the published message payload contains metadata, the received message should have it, too, transferred in the form of JMS properties, subject to any overridden values passed in the :properties option (see above). If the payload cannot accept metadata, the message properties can be converted to a convenient Clojure map using immutant.messaging.core/get-properties.

Immutant features three functions for consuming messages: receive, message-seq, and listen.

Both receive and message-seq expect the destination name as the first parameter, and optionally, the following key/value pairs:

Option | Default | Description
:timeout | 10000 | An expiration in milliseconds, after which the timeout-val is returned; a value of 0 means wait forever, a value of -1 means don't wait at all
:timeout-val | nil | The value to return when a timeout occurs. Also returned when a timeout of -1 is specified, and no message is available
:selector | nil | A JMS expression used to filter messages according to the values of arbitrary :properties
:decode? | true | If true, the decoded message body is returned. Otherwise, the javax.jms.Message object is returned
:client-id | nil | Identifies a durable topic subscriber; ignored for queues
:host | nil | A remote HornetQ host to connect to.
:port | nil, or 5445 if :host is set | A remote HornetQ port to connect to. Requires :host to be set.
:username | nil | The username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set.
:password | nil | The password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set.
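A short sketch of how :timeout and :timeout-val combine. The queue name and the :nothing sentinel are arbitrary choices for illustration; a distinct sentinel is useful when nil could itself be a legitimate message.

```clojure
(require '[immutant.messaging :as msg])

;; Wait up to 2 seconds; return :nothing instead of nil if no message arrives.
(let [task (msg/receive "/queue/work" :timeout 2000 :timeout-val :nothing)]
  (if (= :nothing task)
    (println "no work available")
    (println "got task:" task)))

;; Poll without waiting at all: returns a message only if one is available now.
(msg/receive "/queue/work" :timeout -1 :timeout-val :nothing)
```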

By default, the dynamic variable, clojure.core/*read-eval* is set to false when decoding messages. To override this, you should set :decode? to false and bind *read-eval* to true before passing the encoded message to immutant.messaging.codecs/decode-with-metadata yourself.
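The override described above might look like the following. It is a sketch under two assumptions: that decode-with-metadata accepts the raw message as its only argument, and that the hypothetical queue "/queue/trusted" only ever carries messages from producers you control, since enabling *read-eval* lets message content execute code during decoding.

```clojure
(require '[immutant.messaging :as msg]
         '[immutant.messaging.codecs :as codecs])

;; :decode? false returns the undecoded javax.jms.Message
(when-let [raw (msg/receive "/queue/trusted" :decode? false)]
  ;; decode manually with *read-eval* enabled for this call only
  (binding [*read-eval* true]
    (codecs/decode-with-metadata raw)))
```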

For more details on message selectors, see javax.jms.Message.

The listen function takes two parameters: the destination name and a function accepting one parameter which will be applied to any received message. All of the above options for receive except :timeout are supported, plus listen also accepts the following:

Option | Default | Description
:concurrency | 1 | The maximum number of listening threads that can simultaneously call the function
:xa | true | Whether the handler demarcates an XA transaction
:retry-interval |  | The period in milliseconds between subsequent reconnection attempts.
:retry-interval-multiplier |  | A multiplier to apply to the time since the last retry to compute the time to the next retry.
:max-retry-interval | 2000 | The max retry interval that will be used.
:reconnect-attempts | 0 | Total number of reconnect attempts to make before giving up and shutting down. (-1: unlimited)

listen is asynchronous; if you need to synchronize on the completion of its initialization, you should deref the result.

Note: :concurrency determines the number of consumers connected to the destination and thus you'll rarely want a concurrency greater than 1 for topics since this will cause you to process duplicate messages.
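Putting the listen options together, a queue listener with several worker threads might be set up as follows. The handler and queue name are hypothetical; the deref is there only to wait for initialization, as described above.

```clojure
(require '[immutant.messaging :as msg])

;; Hypothetical handler; receives the decoded message body.
(defn handle-work [task]
  (println "processing" task))

;; Up to four threads may call handle-work at once. Raising :concurrency
;; suits a queue, where each message goes to a single consumer.
(let [listener (msg/listen "/queue/work" handle-work :concurrency 4)]
  ;; block until the listener has finished initializing
  (deref listener)
  (msg/publish "/queue/work" {:job 1}))
```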

8.3.2.1. Transactional by default

By default, message handlers are transactional, so the function invoked in response to a message effectively demarcates a transaction that will be automatically committed if no exceptions are raised, and otherwise rolled back.

Any messages published within the handler automatically become part of its transaction. So they won't be delivered until that transaction commits, i.e. the handler runs to completion successfully. Operations on Immutant caches and datasources work the same way when called from within a handler.

To override this behavior, set the :xa option to false when invoking listen. You should probably do this for any handlers you expect to take a long time to complete (>1 minute). By default, a reaper process will abort any transaction taking longer than 5 minutes.

For finer-grained control, another option is to wrap any operations outside the scope of your handler's transaction inside a call to immutant.xa.transaction/not-supported.
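As a sketch of that finer-grained control, the handler below publishes one message inside its transaction and another outside it. It assumes not-supported is a macro that wraps a body of forms; process-task and the queue names are hypothetical.

```clojure
(require '[immutant.messaging :as msg]
         '[immutant.xa.transaction :as tx])

;; Hypothetical stand-in for real work; expects the task to be a map.
(defn process-task [task]
  (assoc task :done true))

(msg/listen "/queue/work"
  (fn [task]
    ;; Part of the handler's transaction: delivered only if the handler
    ;; completes without throwing.
    (msg/publish "/queue/results" (process-task task))
    ;; Runs outside the handler's transaction, so it is neither delayed
    ;; until commit nor undone by a rollback.
    (tx/not-supported
      (msg/publish "/queue/audit" {:received task}))))
```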

See Distributed Transactions for more details.

8.3.2.2. Some Examples

;; Wait on a task
(let [task (msg/receive "/queue/work")]
  (perform task))

;; Case-sensitive work queues?
(msg/listen ".queue.lower" #(msg/publish "/queue/upper" (.toUpperCase %)))

;; Listen to a remote queue
(msg/listen "queue/remote" #(do-something %) :host "foo.example.com" :port 5445)

;; Contrived laziness
(let [messages (message-seq queue)]
  (doseq [i (range 4)] (publish queue i))
  (= (range 4) (take 4 messages)))

The complement of listen is immutant.messaging/unlisten, to which you pass the value returned by listen to cease the flow of messages to that handler. Note that unlisten will be called for you automatically when your application is undeployed.

Queues and topics behave differently when you map a handler to the same destination. For queues, the current handler, if any, is replaced, effectively making the listen call idempotent. Multiple listen calls for topics are idempotent, too, but only if the parameters are exactly the same. If you call listen for a certain topic with different handlers, they are additive. For example:

(listen "queue" #(println (inc %)))
(listen "queue" #(println (dec %)))
(publish "queue" 42)
=> 41

(listen "topic" #(println (inc %)))
(listen "topic" #(println (dec %)))
(publish "topic" 42)
=> 43
=> 41

Note that even if the contents within #() are identical, the actual anonymous functions are still different objects. If you want idempotent topic listeners, you should pass the same var to each. And even then, during development, you may inadvertently redefine the var and create multiple, redundant versions of the topic listener. Hijinks may ensue.

8.3.2.3. Accessing Listeners via JMX

Each message listener has an MBean exposed via JMX. Currently, you can only stop and start the listener from the MBean.

The MBean name is derived from URL-encoded concatenation of destination name and the :selector, if any. If the destination is a topic, the :client-id and the handler function name will be included as well.

The names are so gross-looking that we're loath to include any examples at this time.

8.3.3. Connections and Sessions

Each of the aforementioned functions requires a JMS Connection and a JMS Session. By default, new instances will be created each time you call receive, publish, or listen. For the latter, this is not of much concern, since the listener's connection will remain open for its lifetime, but if you're repeatedly calling receive or publish in the same thread, a JMS Connection is being wastefully opened and closed with each call. In that case, you should use the immutant.messaging/with-connection macro to establish a single connection used by any messaging function called within its body. Additionally, any options you pass to with-connection will be used as default values for the options relevant to the messaging calls in its body.

For example, each of the 102 calls in the following block will use the same JMS Connection. All of the published messages except the last one will use the :json encoding, which doesn't handle Clojure keywords properly.

(with-connection {:encoding :json}
  (dotimes [x 100]
    (publish "queue.question" (assoc payload :x x)))
  (publish "queue.question" :done, :encoding :clojure)
  (receive "queue.answer"))

Within the body of with-connection you can call the session function to access the JMS Session. For example:

(with-connection {}
  (let [msg (.createBytesMessage (immutant.messaging.core/session))]
    (.writeBytes msg (.getBytes "foo"))
    (publish somewhere msg)))

It's also possible to manage connections yourself and use them by setting the :connection option, but of course you're then responsible for starting and closing them when you're done. Here's an example:

(with-open [c (immutant.messaging.core/create-connection {})]
  (.start c)
  (with-connection {:connection c}
    (publish wherever whatever)))

8.3.4. A word about performance

Though HornetQ is capable of being very fast, Immutant's default settings are conservative, trading some performance for data integrity, e.g. deliver-once guarantees of durable messages with minimal risk of message loss. HornetQ offers many recommendations for tuning performance, but only a few of those settings are exposed through the Immutant messaging namespace, e.g. :durable, :persistent, :concurrency, and :xa.

Probably the biggest impact will come from re-using connections via with-connection described above, disabling :xa in your listeners when not needed, and increasing their :concurrency setting from its default of 1.

Other settings will need to go within the <hornetq-server> element of $IMMUTANT_HOME/jboss/standalone/configuration/standalone[-ha].xml

8.4. Request/Response

Immutant also provides an implementation of the request/response pattern for synchronous work distribution. This feature is provided by two cleverly named functions: request and respond. Since they leverage the existing messaging subsystem, the work is automatically distributed across multiple workers within the same JVM or across multiple nodes if in a cluster.

8.4.1. Request

The immutant.messaging/request function takes a queue, a message, and an optional list of options. It publishes the message to the queue, marking it as a synchronous message and returns a delay that will receive the response from the worker initiated by the respond function. It accepts the same options as publish.

8.4.2. Respond

The immutant.messaging/respond function takes a queue, a function, and an optional list of options. It sets up a listener (via the listen function) that applies the given function to any received message and publishes the result back to the queue for the delay returned by request to receive. It accepts the same options as listen.

8.4.3. Some Examples

A basic example:

(require '[immutant.messaging :as msg])

;; setup a responder
(msg/respond "/queue/work" (partial apply +))

;; send a request
(let [result (msg/request "/queue/work" [1 2 3])]
  (println (deref result 1000 nil))) ;; => 6

An example of using properties and selectors to segment work on the same queue:

(require '[immutant.messaging :as msg])

;; respond to 'add' messages
(msg/respond "/queue/work" (partial apply +) :selector "operation='add'")

;; respond to 'multiply' messages
(msg/respond "/queue/work" (partial apply *) :selector "operation='multiply'")

(deref
 (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "add"})
 1000 nil) ;; => 10

(deref
 (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "multiply"})
 1000 nil) ;; => 24

8.5. Pipelines

Immutant provides a tool called a pipeline. A pipeline is basically a composition of functions (steps), where each function is passed the result of the previous function, dereferenced if needed. It is built on top of the messaging subsystem, allowing each step to have multiple processing threads, and to be automatically load balanced across a cluster. The pipeline functions are available via the immutant.pipeline namespace.

Since messaging is used to pass the data between step functions, the data has to be in a format that can be encoded as Clojure via pr. See the above note about encodings.

8.5.1. Creating a pipeline

You create a pipeline with the immutant.pipeline/pipeline function. The pipeline function takes a unique (within the scope of the application) name, one or more single-arity functions, and optional keyword argument options, returning a function that acts as an entry point into the pipeline:

(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    another-function))

8.5.1.1. Pipeline options

pipeline can take the following options, passed as keyword arguments after the step functions:

Option | Default | Description
:concurrency | 1 | The number of threads to use for each step. Can be overridden on a per-step basis (see below).
:error-handler | nil | A function to call when a step function throws an exception. Receives the exception and the data passed to the step function. Can be overridden on a per-step basis (see below).
:result-ttl | 1 hour | The time-to-live for the final pipeline result, in ms. Set to 0 for "forever", -1 to disable returning the result via a delay
:step-deref-timeout | 10 seconds | The amount of time to wait when dereferencing the result of a step that returns a delay, in ms

(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    another-function
    :concurrency 2))

8.5.1.2. Per-step options

Each function can optionally be wrapped with metadata via the immutant.pipeline/step function, providing settings for how that particular function is handled within the pipeline:

Option | Default | Description
:name | the index of the fn | A name to use for the step
:concurrency | 1 | The number of threads to use, overriding the pipeline setting
:error-handler | nil | An error handler function, overriding the pipeline setting
:step-deref-timeout | 10 seconds | The amount of time to wait when dereferencing the result of a step that returns a delay, in ms. Overrides the pipeline setting
:fanout? | false | If true, the result of the step will be fanned out to the next step. See the fanout section for more details.

(require '[immutant.pipeline :as pl])

(pl/pipeline "foo"
  function-that-does-something
  (pl/step another-function :concurrency 10)
  :concurrency 2)

8.5.2. Using a pipeline

The function returned by pipeline acts as an entry function, placing its argument onto the pipeline when called, returning a delay around the end of the pipeline (by default):

(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    another-function))

(deref (foo-pipeline {:ham :biscuit}) 10000 :timeout!)

Pipelines store the result of an execution by default, allowing it to be retrieved by dereferencing the delay returned by the pipeline-fn call. To prevent results that may not be retrieved from being stored indefinitely, they have a default time-to-live of 1 hour. You can control the retention time by passing a :result-ttl option to pipeline. It is specified in milliseconds, with a value of 0 indicating that the result should be saved indefinitely, and -1 indicating that the results should be discarded immediately. If you set the :result-ttl to -1, any attempt to dereference the returned delay will raise an error.

If the result from a step is a reference, it will be dereferenced before being passed to the next step. This allows you to use a pipeline within a pipeline. The amount of time to wait for the deref is controlled by the :step-deref-timeout option, and defaults to 10 seconds. Setting it to 0 will cause it to wait forever, which will tie up a thread indefinitely.

(require '[immutant.pipeline :as pl])

(defonce pipeline-x
  (pl/pipeline :x
    function-that-does-something
    another-function))

(defonce pipeline-y
  (pl/pipeline :y
    yet-another-function
    pipeline-x
    and-another
    :step-deref-timeout 60000))

By default, the pipeline entry function places its argument onto the front of the pipeline. You can insert the data into the pipeline at any step by passing a :step keyword argument. The step name would be the name you provided as an option for that step using the step function, or the index of the step in the list of steps if you haven't provided a name:

(require '[immutant.pipeline :as pl])

(defonce foo-pipeline
  (pl/pipeline "foo"
    function-that-does-something
    (pl/step another-function :name :another)
    a-third-function))

;; insert at head
(foo-pipeline {:ham :biscuit})

;; skip the first step
(foo-pipeline {:ham :biscuit} :step :another)

;; insert at the last step 
(foo-pipeline {:ham :biscuit} :step 2)

8.5.3. Available bindings

The following vars have bound values inside a step or error-handler invocation:

Var | Value
*pipeline* | The pipeline entry function for the currently active pipeline.
*current-step* | The name of the currently executing step.
*next-step* | The name of the next step in the pipeline.
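For instance, a step can read these vars to log where it sits in the pipeline. This is a minimal sketch; the pipeline name "traced" and the step name :last are hypothetical.

```clojure
(require '[immutant.pipeline :as pl])

;; Logs the bound step names, then passes the data through unchanged.
(defn traced-step [data]
  (println "running step" pl/*current-step* "; next is" pl/*next-step*)
  data)

(defonce traced-pipeline
  (pl/pipeline "traced"
    traced-step
    (pl/step identity :name :last)))
```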

8.5.4. Error handling

When an exception occurs in a step function, an error-handler function will be invoked if provided for the pipeline or for the particular step. This function will be passed the exception and the original data passed to the step function, and have all of the above bindings available:

(require '[immutant.pipeline :as pl])
(import 'java.net.NoRouteToHostException)

;; a naive error handler that sleeps then retries on a network error,
;; logging and discarding otherwise
(defn error-handler [ex data]
  (if (instance? NoRouteToHostException ex)
    (do
      (Thread/sleep 1000)
      (pl/*pipeline* data :step pl/*current-step*))
    (println "ERROR:" ex)))

(pl/pipeline "foo"
  connects-to-foo
  connects-to-bar
  :error-handler error-handler)

If no error-handler function is provided, the error handling semantics provided by HornetQ are used, which causes the offending step to be retried up to ten times before giving up.

8.5.5. Halting the pipeline for a particular message

If, in a step function, you determine that the data requires no further processing, you can halt that particular pipeline execution by returning a special flag symbol - immutant.pipeline/halt:

(require '[immutant.pipeline :as pl])

;; halt the pipeline at the second step, causing another-function to
;; not be called
(pl/pipeline "foo"
  function-that-does-something
  #(if (:some-done-condition %)
     pl/halt
     %)
  another-function)

8.5.6. Fanning out results to the next step

A common pipeline pattern is to have a step that turns a single input into a sequence of outputs, where each element of the sequence should continue down the pipeline individually. This can be done manually by publishing each element to *next-step* and returning halt, or you can use the facilities provided by Immutant.

There are two provided methods for fanning out a result. The first is the immutant.pipeline/fanout function. It is used from within a step function as the last invoked s-exp:

(defn some-step-fn [data]
  (pl/fanout (seq-generating-fn data)))

The second method is to mark the step as a fanout step using the immutant.pipeline/step function:

(pl/pipeline "foo"
  (pl/step fanning-fn :fanout? true)
  some-other-fn)

Note that a pipeline that uses fanout cannot be correctly dereferenced for a final value, since the deref will only get the first value to finish the pipeline.
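For comparison, the manual approach mentioned at the start of this section could be sketched as below. It assumes that "publishing to *next-step*" means re-entering the pipeline through *pipeline* with a :step argument, modeled on the error-handler example; the step and pipeline names are hypothetical.

```clojure
(require '[clojure.string :as str]
         '[immutant.pipeline :as pl])

;; Hypothetical step: splits one document into lines and sends each line
;; down the rest of the pipeline on its own.
(defn split-lines-step [document]
  (doseq [line (str/split-lines document)]
    ;; re-enter the active pipeline at the step after this one
    (pl/*pipeline* line :step pl/*next-step*))
  ;; halt this execution; the individual lines continue independently
  pl/halt)

(defonce line-pipeline
  (pl/pipeline "lines"
    split-lines-step
    (pl/step str/upper-case :name :shout)))
```

The fanout function and the :fanout? step option are the less error-prone choice; the manual form mainly helps when elements need different handling before being forwarded.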

8.5.7. Stopping the pipeline

When your application is undeployed, Immutant will automatically shut down the pipeline. If you need to stop the pipeline at runtime, use the immutant.pipeline/stop function:

(require '[immutant.pipeline :as pl])

(let [pipeline (pl/pipeline "foo" ...)]
  ...
  (pl/stop pipeline))
Immutant 1.x.incremental.1213