Chapter 8. Immutant Messaging
8.1. Introduction
The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. The broker mediates the relationships between the other actors.
8.1.1. HornetQ
Immutant encapsulates the JBoss HornetQ message broker. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, which provides auto-discovery, load-balancing, and failover, among other things.
8.2. Destinations
A destination represents a rendezvous point where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.
There are two main types of destinations: queues (point-to-point) and topics (publish-subscribe). All destinations allow multiple actors to send messages. The type of destination determines how the message is routed. A queue delivers the message to a single recipient (possibly one of many candidates). And a topic delivers the message to all interested recipients. In both cases, the message producers have no direct knowledge of the message consumers.
8.2.1. Durable Topic Subscribers
Typically, messages published to a topic are only delivered to consumers connected to the topic at that time. But if a consumer identifies itself with a unique name, then the broker will accumulate messages for that client when it's disconnected and deliver them in order whenever the client reconnects.
You implicitly create a durable topic subscriber by passing the
:client-id
option to the receive
and listen
functions,
described below. As long as you pass the same :client-id
in
subsequent receive/listen
calls, you'll receive every message
published to that topic, whether you were connected at the time or
not.
8.2.2. Deployment
Use the immutant.messaging/start function to define a messaging
destination. A simple naming convention designates the destination
as either a queue or a topic: if its name contains "queue",
it's a queue; if it contains "topic", it's a topic. If you
need to use queue or topic names that don't match the above
convention, you can wrap the name in a call to
immutant.messaging/as-queue or immutant.messaging/as-topic
respectively. If you do wrap a destination name, you'll need to
pass the wrapped version to any immutant.messaging
function that
takes a destination name. It's common to separate sections of the
destination name with .
or /
, but is not required.
In addition to the destination name, the start
function accepts
the following options when starting a queue:
Option | Default | Description |
---|---|---|
:durable | true | Whether messages persist across restarts |
:selector | nil | A JMS expression used to filter messages by their metadata/properties |
If a :selector
is provided, then only messages with
metadata/properties matching that expression will be accepted for
delivery.
Some examples:
(require '[immutant.messaging :as msg]) ;; to start queues - these are all valid names (msg/start "/queue/work") (msg/start ".queue.play") (msg/start "queue.sing") (msg/start "dancequeued") ;; wrap an ambiguous queue name in as-queue (msg/start (msg/as-queue "no-q-word")) ;; to start topics - these are all valid names (msg/start "/topic/news") (msg/start "topic/infotainment") (msg/start ".topic") (msg/start "topicality") (msg/start "some.kinda.topic") ;; wrap an ambiguous topic name in as-topic (msg/start (msg/as-topic "no-t-word")) ;; only blue messages will be queued for delivery (msg/start "/queue/blue" :selector "color = 'blue'") ;; like this one... (msg/publish "/queue/blue" "success" :properties {:color "blue"})
Starting a destination is dynamic, and can occur anywhere in your
application code. You can invoke start
anytime during the
lifecycle of your application.
While start
has a complement, immutant.messaging/stop, you
needn't call it directly. It will be invoked when your application
is undeployed. And it's important to note that start
is
idempotent: if an destination has already been started, likely by a
cooperating application, the call is effectively a
no-op. Similarly, a call to stop
will silently fail if the
destination is in use by any other application. Note that in cases
where a destination is started by another application, you will
still need to start
it locally for it to be visible to other
messaging functions within your application.
8.2.3. Setting Advanced HornetQ Options
HornetQ provides some advanced settings that can be applied to a
particular destination or destinations with names matching a
string. Typically, these options are set via XML, but HornetQ does
provide an API for setting them at runtime. Immutant wraps that API
with immutant.messaging.hornetq/set-address-options, and,
as a convenience, allows any of these settings to be passed to
immutant.messaging/start
as well.
The available settings are:
Option | Default | Description |
---|---|---|
:address-full-message-policy | :page | Specifies what should happen when an address reaches :max-size-bytes in undelivered messages. One of: :block , :drop , :fail , or :page . See below for more details |
:dead-letter-address | jms.queue.DLQ | If set, any messages that fail to deliver to their original destination will be delivered here. More info |
:expiry-address | jms.queue.ExpiryQueue | If set, any messages with a :ttl that expires before delivery will be delivered here. More info |
:expiry-delay | -1 | If > -1, this value (in ms) is used as the default :ttl for messages that don't have a :ttl > 0 set. |
:last-value-queue | false | If true, only the most recent message for a last-value property will be retained. More info |
:max-delivery-attempts | 10 | The number of times delivery will be attempted for a message before giving up. If :dead-letter-address is set, the message will be delivered there, or removed otherwise. More info |
:max-size-bytes | 20MB | The maximum size (in bytes) of retained messages on an address before :address-full-message-policy is applied. More info |
:page-cache-max-size | 5 | HornetQ will keep up to this many page files in memory to optimize IO. More info |
:page-size-bytes | 10MB | The size (in bytes) of the page files created when paging. More info |
:redelivery-delay | 0 | Specifies the delay (in ms) between redelivery attempts. More info |
:redelivery-multiplier | 1.0 | Controls the backoff for redeliveries. The delay between redelivery attempts is calculated as :redelivery-delay * (:redelivery-multiplier ^ attempt-count) |
:redistribution-delay | 1000 | Specifies the delay (in ms) to wait before redistributing messages from a node in a cluster to other nodes when the queue no longer has consumers on the current node. More info |
:send-to-dla-on-no-route | false | If true, any message that can't be routed to its destination will be sent to :dead-letter-address. |
8.2.3.1. Options for :address-full-message-policy
The :address-full-message-policy
takes one of the following four
policy specifiers:
:block
- publish calls will block until the current size drops below:max-size-bytes
:drop
- new messages are silently dropped:fail
- new messages are dropped and an exception is thrown on publish:page
- new messages will be paged to disk
See the HornetQ documentation for more information about these policies.
Some Examples, including HornetQ wildcard matching:
;; passing options to start (msg/start "queue.foo" :dead-letter-address "queue.dead-letters" :max-delivery-attempts 20) ;; setting options for an existing queue (hornetq/set-address-options "queue.foo" {:dead-letter-address "queue.dead-letters" :max-delivery-attempts 20}) ;; setting options for all queues of a certain prefix ;; will match: queue.notifications.ham, queue.notifications.biscuits (hornetq/set-address-options "queue.notifications.*" {:dead-letter-address "queue.dead-letters" :max-delivery-attempts 20}) ;; match *all* destinations (hornetq/set-address-options "#" {:dead-letter-address "queue.dead-letters" :max-delivery-attempts 20})
8.2.4. Accessing Destinations Controllers
Each messaging destination has associated controllers that can be used to see message counts, list & remove messages, and perform other operations. Two different controllers are provided by HornetQ for queues, and one for topics, each with slightly different controls. All are available via JMX or from immutant.messaging.hornetq/destination-controller.
For a given destination name (queue.example
and topic.example
in this case), you can access the MBeans via JMX with the following
addresses:
# to access the JMS Queue MBean org.hornetq:module=JMS,type=Queue,name="queue.example" # to access the JMS Topic MBean org.hornetq:module=JMS,type=Topic,name="topic.example" # to access the HornetQ 'core' Queue MBean org.hornetq:module=Core,type=Queue,address="jms.queue.queue.example",name="jms.queue.queue.example"
or via code with:
(require '[immutant.messaging.hornetq :as hq]) ;; for the JMS queue controller (hq/destination-controller "queue.example") ;; for the JMS topic controller (hq/destination-controller "queue.topic") ;; for the HornetQ 'core' queue controller (hq/destination-controller "queue.example" :core)
The returned controller depends on the type of the given
destination and, for queues, the requested control-type (which
defaults to :jms
):
destination | control-type | controller |
---|---|---|
Queue | :jms | org.hornetq.api.jms.management.JMSQueueControl |
Queue | :core | org.hornetq.core.management.impl.QueueControl |
Topic | ignored | org.hornetq.api.jms.management.TopicControl |
8.3. Messages
The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.
8.3.1. Publishing
Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination or any of its consumers.
In Immutant, there is only one way to send a message, whether to a queue or a topic: via the immutant.messaging/publish function. It accepts two required parameters: the name of the destination and the message content, which can be just about anything.
If the message has any Clojure metadata attached, it will be passed
as the JMS Message's properties, the names of which are subject to
certain naming restrictions (they must be valid Java identifiers)
since they may be used as expressions in selectors (see below).
It's also possible to pass properties via the :properties
option,
which will override any matching keys in the payload metadata, if
present.
The publish
function accepts the following options:
Option | Default | Description |
---|---|---|
:encoding | :edn | One of :clojure , :edn , :fressian , :json , or :text |
:priority | 4 | An integer (0-9) or one of :low , :normal , :high and :critical which correspond to 0, 4, 7 and 9, respectively |
:ttl | 0 | An integer greater than 0, indicating the number of milliseconds after which the message is discarded if not consumed. A 0 indicates that the message should be held indefinitely. |
:persistent | true | If true, undelivered messages survive restarts (if the destination is durable). |
:properties | {} | A map of arbitrary metadata upon which JMS selector expressions may be constructed to filter received messages |
:correlation-id | nil | Used to set the JMSCorrelationID (see setJMSCorrelationID) |
:host | nil | A remote HornetQ host to connect to. |
:port | nil , or 5445 if :host is set | A remote HornetQ port to connect to. Requires :host to be set. |
:username | nil | The username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set. |
:password | nil | The password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set. |
The :json
and :edn
encodings are useful when the message
consumers aren't written in Clojure. For example, TorqueBox Ruby
processors will automatically convert edn-encoded messages
generated by a Clojure function into their analogous Ruby data
structures, so as long as you limit the content of your messages to
standard collections and types, they are transparently
interoperable between Clojure and Ruby in either direction.
8.3.1.1. Some Examples
;; A simple string (msg/publish "/queue/work" "simple string") ;; Notify everyone something interesting just happened (msg/publish "topic/news" {:event "VISIT" :url "/sales-inquiry"}) ;; Move this message to the front of the line (msg/publish "/queue/work" some-message :priority :high :ttl 1000) ;; Make messages as complex as necessary (msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]}) ;; Make messages consumable by a Ruby app (msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]} :encoding :json) ;; Publish to a remote broker (msg/publish "queue.remote-work" "a message" :host "foo.example.com" :port 5445) ;; The received message's metadata => {:foo 42, :bar 1} (msg/publish q (with-meta msg {:foo 42 :bar 0}) :properties {:bar 1})
8.3.2. Receiving
Any component that waits for messages to be delivered to it by the message broker is consider a consumer. Typically, a consumer is unaware of the producer or any other consumers.
If the published message payload contains metadata, the received
message should have it, too, transferred in the form of JMS
properties, subject to any overridden values passed in the
:properties
option (see above). If the payload cannot accept
metadata, the message properties can be converted to a convenient
Clojure map using immutant.messaging.core/get-properties.
Immutant features three functions for consuming messages.
- immutant.messaging/receive Blocks the caller until a message arrives and returns the decoded message
- immutant.messaging/message-seq Lazily invokes
receive
to create a lazy sequence of messages - immutant.messaging/listen Registers a handler function that will receive the decoded message when it arrives
Both receive
and message-seq
expect the destination name as the
first parameter, and optionally, the following key/value pairs:
Option | Default | Description |
---|---|---|
:timeout | 10000 | An expiration in milliseconds, after which the timeout-val is returned; a value of 0 means wait forever, a value of -1 means don't wait at all |
:timeout-val | nil | The value to return when a timeout occurs. Also returned when a timeout of -1 is specified, and no message is available |
:selector | nil | A JMS expression used to filter messages according to the values of arbitrary :properties |
:decode? | true | If true, the decoded message body is returned. Otherwise, the javax.jms.Message object is returned |
:client-id | nil | Identifies a durable topic subscriber; ignored for queues |
:host | nil | A remote HornetQ host to connect to. |
:port | nil , or 5445 if :host is set | A remote HornetQ port to connect to. Requires :host to be set. |
:username | nil | The username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set. |
:password | nil | The password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set. |
By default, the dynamic variable, clojure.core/*read-eval* is set
to false when decoding messages. To override this, you should set
:decode?
to false and bind *read-eval*
to true before passing
the encoded message to
immutant.messaging.codecs/decode-with-metadata yourself.
For more details on message selectors, see javax.jms.Message.
The listen
function takes two parameters: the destination name
and a function accepting one parameter which will be applied to any
received message. All of the above options for receive
except
:timeout
are supported, plus listen
also accepts the following:
Option | Default | Description |
---|---|---|
:concurrency | 1 | The maximum number of listening threads that can simultaneouly call the function |
:xa | true | Whether the handler demarcates an XA transaction |
:retry-interval | The period in milliseconds between subsequent reconnection attempts. | |
:retry-interval-multiplier | A multiplier to apply to the time since the last retry to compute the time to the next retry. | |
:max-retry-interval | 2000 | The max retry interval that will be used. |
:reconnect-attempts | 0 | Total number of reconnect attempts to make before giving up and shutting down. (-1: unlimited) |
listen
is asynchronous; if you need to synchronize on the
completion of its initialization, you should deref the result.
Note: :concurrency
determines the number of consumers connected
to the destination and thus you'll rarely want a concurrency
greater than 1
for topics since this will cause you to process
duplicate messages.
8.3.2.1. Transactional by default
By default, message handlers are transactional, so the function invoked in response to a message effectively demarcates a transaction that will be automatically committed if no exceptions are raised, and otherwise rolled back.
Any messages published within the handler automatically become part of its transaction. So they won't be delivered until that transaction commits, i.e. the handler runs to completion successfully. Operations on Immutant caches and datasources work the same way when called from within a handler.
To override this behavior, set the :xa
option to false when
invoking listen
. You should probably do this for any handlers
you expect to take a long time to complete (>1 minute). By
default, a reaper process will abort any transaction taking longer
than 5 minutes.
For finer-grained control, another option is to wrap any operations outside the scope of your handler's transaction inside a call to immutant.xa.transaction/not-supported.
See Distributed Transactions for more details.
8.3.2.2. Some Examples
;; Wait on a task (let [task (msg/receive "/queue/work")] (perform task)) ;; Case-sensitive work queues? (msg/listen ".queue.lower" #(msg/publish "/queue/upper" (.toUpperCase %))) ;; Listen to a remote queue (msg/listen "queue/remote" #(do-someting %) :host "foo.example.com" :port 5445) ;; Contrived laziness (let [messages (message-seq queue)] (doseq [i (range 4)] (publish queue i)) (= (range 4) (take 4 messages)))
The complement of listen
is immutant.messaging/unlisten, to
which you pass the value returned by listen
to cease the flow of
messages to that handler. Note that unlisten
will be called for
you automatically when your application is undeployed.
Queues and topics behave differently when you map a handler to the
same destination. For queues, the current handler, if any, is
replaced, effectively making the listen
call idempotent.
Multiple listen
calls for topics are idempotent, too, but only
if the parameters are exactly the same. If you call listen
for a
certain topic with different handlers, they are additive. For
example:
(listen "queue" #(println (inc %))) (listen "queue" #(println (dec %))) (publish "queue" 42) => 41 (listen "topic" #(println (inc %))) (listen "topic" #(println (dec %))) (publish "topic" 42) => 43 => 41
Note that even if the contents within #()
are identical, the
actual anonymous functions are still different objects. If you
want idempotent topic listeners, you should pass the same var to
each. And even then, during development, you may inadvertently
redefine the var and create multiple, redundant versions of the
topic listener. Hijinks may ensue.
8.3.2.3. Accessing Listeners via JMX
Each message listener has a MBean exposed via JMX. Currently, you can only stop and start the listener from the MBean.
The MBean name is derived from URL-encoded concatenation of
destination name and the :selector
, if any. If the destination is
a topic, the :client-id
and the handler function name will be
included as well.
The names are so gross-looking that we're loathe to include any examples at this time.
8.3.3. Connections and Sessions
Each of the aforementioned functions requires a JMS Connection and
a JMS Session. By default, new instances will be created each time
you call receive
, publish
, or listen
. For the latter, this is
not of much concern, since the listener's connection will remain
open for its lifetime, but if you're repeatedly calling receive
or publish
in the same thread, a JMS Connection is being
wastefully opened and closed with each call. In that case, you
should use the immutant.messaging/with-connection macro to
establish a single connection used by any messaging function called
within its body. Additionally, any options you pass to
with-connection
will be used as default values for the options
relevant to the messaging calls in its body.
For example, each of the 102 calls in the following block will use
the same JMS Connection. And all of the published messages except
the last one will use the :json
encoding, which doesn't handle
clojure keywords properly.
(with-connection {:encoding :json} (dotimes [x 100] (publish "queue.question" (assoc payload :x x))) (publish "queue.question" :done, :encoding :clojure) (receive "queue.answer"))
Within the body of with-connection
you can call the session
function to access the JMS Session. For example:
(with-connection {} (let [msg (.createBytesMessage (immutant.messaging.core/session))] (.writeBytes msg (.getBytes "foo")) (publish somewhere msg)))
It's also possible to manage connections yourself and use them by
setting the :connection
option, but of course you're then
responsible for starting and closing them when you're done. Here's
an example:
(with-open [c (immutant.messaging.core/create-connection {})] (.start c) (with-connection {:connection c} (publish wherever whatever)))
8.3.4. A word about performance
Though HornetQ is capable of being very fast, Immutant's default
settings are conservative, trading some performance for data
integrity, e.g. deliver-once guarantees of durable messages with
minimal risk of message loss. HornetQ offers many recommendations for tuning performance, but only a few of those settings are
exposed through the Immutant messaging namespace, e.g. :durable
,
:persistent
, :concurrency
, and :xa
.
Probably the biggest impact will come from re-using connections
via with-connection
described above, disabling :xa
in your
listeners when not needed, and increasing their :concurrency
setting from its default of 1.
Other settings will need to go within the <hornetq-server>
element of
$IMMUTANT_HOME/jboss/standalone/configuration/standalone[-ha].xml
8.4. Request/Response
Immutant also provides an implementation of the request/response pattern
for synchronous work distribution. This feature is provided by two
cleverly named functions: request
and respond
. Since they leverage
the existing messaging subsystem, the work is automatically distributed
across multiple workers within the same JVM or across multiple nodes if
in a cluster.
8.4.1. Request
The immutant.messaging/request function takes a queue, a message, and an
optional list of options. It publishes the message to the queue, marking
it as a synchronous message and returns a delay that will receive the
response from the worker initiated by the respond
function. It accepts
the same options as publish
.
8.4.2. Respond
The immutant.messaging/respond method takes a queue, a function, and an
optional list of options. It sets up a listener (via the listen
function) that applies the given function to any received message and publishes
the result back to the queue for the delay returned by request
to receive.
It accepts the same options as listen
.
8.4.3. Some Examples
A basic example:
(require '[immutant.messaging :as msg]) ;; setup a responder (msg/respond "/queue/work" (partial apply +)) ;; send a request (let [result (msg/request "/queue/work" [1 2 3])] (println (deref result 1000 nil)) ;; => 6
An example of using properties and selectors to segment work on the same queue:
(require '[immutant.messaging :as msg]) ;; respond to 'add' messages (msg/respond "/queue/work" (partial apply +) :selector "operation='add'") ;; respond to 'multiply' messages (msg/respond "/queue/work" (partial apply *) :selector "operation='multiply'") (deref (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "add"}) 1000 nil) ;; => 9 (deref (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "multiply"}) 1000 nil) ;; => 24
8.5. Pipelines
Immutant provides a tool called a pipeline. A pipeline is basically a composition of functions (steps), where each function is passed the result of the previous function, dereferenced if needed. It is built on top of the messaging subsystem, allowing each step to have multiple processing threads, and to be automatically load balanced across a cluster. The pipeline functions are available via the immutant.pipeline namespace.
Since messaging is used to pass the data between step functions, the data has to be in a format that can be encoded as clojure via pr. See the above note about encodings.
8.5.1. Creating a pipeline
You create a pipeline with the immutant.pipeline/pipeline
function. The pipeline
function takes a unique (within the scope
of the application) name, one or more single-arity functions, and
optional keyword argument options, returning a function that acts
as an entry point into the pipeline:
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something another-function))
8.5.1.1. Pipeline options
pipeline
can take the following options, passed as keyword
arguments after the step functions:
Option | Default | Description |
---|---|---|
:concurrency | 1 | The number of threads to use for each step. Can be overridden on a per-step basis (see below). |
:error-handler | nil | A function to call when a step function throws an exception. Receives the exception and the data passed to the step function. Can be overriden on a per-step basis (see below). |
:result-ttl | 1 hour | The time-to-live for the final pipeline result, in ms. Set to 0 for "forever", -1 to disable returning the result via a delay |
:step-deref-timeout | 10 seconds | The amount of time to wait when dereferencing the result of a step that returns a delay, in ms |
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something another-function :concurrency 2))
8.5.1.2. Per-step options
Each function can optionally be wrapped with metadata via the immutant.pipeline/step function, providing settings for how that particular function is handled within the pipeline:
Option | Default | Description |
---|---|---|
:name | the index of the fn | A name to use for the step |
:concurrency | 1 | The number of threads to use, overriding the pipeline setting |
:error-handler | nil | An error handler function, overriding the pipeline setting |
:step-deref-timeout | 10 seconds | The amount of time to wait when dereferencing the result of a step that returns a delay, in ms. Overrides the pipeline setting |
:fanout? | false | If true, the result of the step will be fanned out to the next step. See the fanout section for more details. |
(require '[immutant.pipeline :as pl]) (pl/pipeline "foo" function-that-does-something (pl/step another-function :concurrency 10) :concurrency 2)
8.5.2. Using a pipeline
The function returned by pipeline acts as an entry function, placing its argument onto the pipeline when called, returning a delay around the end of the pipeline (by default):
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something another-function)) (deref (foo-pipeline {:ham :biscuit}) 10000 :timeout!)
Pipelines store the result of an execution by default, allowing it
to be retrieved by dereferencing the delay returned by the
pipeline-fn call. To prevent results that may not be retrieved
from being stored indefinitely, they have a default time-to-live of
1 hour. You can control the retention time by passing a
:result-ttl
option to pipeline
. It is specified in
milliseconds, with a value of 0
indicating that the result should
be saved indefinitely, and -1
indicating that the results should
be discarded immediately. If you set the :result-ttl
to -1
, any
attempt to dereference the returned delay will raise an error.
If the result from a step is reference, it will be dereferenced
before being passed to the next step. This allows you to use a
pipeline within a pipeline. The amount of time to wait for the
deref
is controlled by the :step-deref-timeout
option, and
defaults to 10 seconds. Setting it to 0
will cause it to wait
forever, which will tie up a thread indefinitely.
(require '[immutant.pipeline :as pl]) (defonce pipeline-x (pl/pipeline :x function-that-does-something another-function)) (defonce pipeline-y (pl/pipeline :y yet-another-function pipeline-x and-another :step-deref-timeout 60000))
By default, the pipeline entry function places its argument onto
the front of the pipeline. You can insert the data into the
pipeline at any step by passing a :step
keyword argument. The
step name would be the name you provided as an option for that step
using the step function, or the index of the step in the list of
steps if you haven't provided a name:
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something (pl/step another-function :name :another) a-third-function)) ;; insert at head (foo-pipeline {:ham :biscuit}) ;; skip the first step (foo-pipeline {:ham :biscuit} :step :another) ;; insert at the last step (foo-pipeline {:ham :biscuit} :step 2)
8.5.3. Available bindings
The following vars have bound values inside a step or error-handler invocation:
Var | Value |
---|---|
*pipeline* | The pipeline entry function for the currently active pipeline. |
*current-step* | The name of the currently executing step. |
*next-step* | The name of the next step in the pipeline. |
8.5.4. Error handling
When an exception occurs in a step function, an error-handler function will be invoked if provided for the pipeline or for the particular step. This function will be passed the exception and the original data passed to the step function, and have all of the above bindings available:
;; a naive error handler that sleeps then retries on a network error, ;; logging and discarding otherwise (defn error-handler [ex data] (if (instance? NoRouteToHostException ex) (do (Thread/sleep 1000) (pl/*pipeline* data :step pl/*current-step*)) (println "ERROR:" ex))) (require '[immutant.pipeline :as pl]) (pl/pipeline "foo" connects-to-foo connects-to-bar :error-handler error-handler)
If no error-handler function is provided, the error handling semantics provided by HornetQ are used, which causes the offending step to be retried up to ten times before giving up.
8.5.5. Halting the pipeline for a particular message
If, in a step function, you determine that the data requires no further processing, you can halt that particular pipeline execution by returning a special flag symbol - immutant.pipeline/halt:
(require '[immutant.pipeline :as pl]) ;; halt the pipeline at the second step, causing another-function to ;; not be called (pl/pipeline "foo" function-that-does-something #(if (:some-done-condition %) pl/halt %) another-function)
8.5.6. Fanning out results to the next step
A common pipeline pattern is to have a step that turns a single
input into a sequence of outputs, where each element of the
sequence should continue down the pipeline individually. This can
be done manually by publishing each element to *next-step* and returning =halt
, or you can use the facilities provided by
Immutant.
There are two provided methods for fanning out a result. The first is the immutant.pipeline/fanout. It is used from within a step function as the last invoked s-exp:
(defn some-step-fn [data] (pl/fanout (seq-generating-fn data)))
The second method is to mark the step as a fanout step using the immutant.pipeline/step function:
(pl/pipeline "foo" (pl/step fanning-fn :fanout? true) some-other-fn)
Note that a pipeline that uses fanout cannot be correctly derefenced for a final value, since the deref will only get the first value to finish the pipeline.
8.5.7. Stopping the pipeline
When your application is undeployed, Immutant will automatically shut down the pipeline. If you need to stop the pipeline at runtime, use the immutant.pipeline/stop function:
(require '[immutant.pipeline :as pl]) (let [pipeline (pl/pipeline "foo" ...)] ... (pl/stop pipeline))