Chapter 7. Immutant Messaging
7.1. Introduction
The term "messaging" encompasses a large area of functionality. Messaging solutions are used to achieve loosely-coupled, asynchronous systems. The primary actors in a messaging-based system are messages, destinations, consumers, and producers. The broker mediates the relationships between the other actors.
7.1.1. HornetQ
Immutant encapsulates the JBoss HornetQ message broker. It is automatically available to you, with no additional configuration required to start the messaging service. HornetQ supports clustered messaging, which provides auto-discovery, load-balancing, and failover, among other things.
7.2. Destinations
A destination represents a rendezvous point where messages are exchanged. A message may be sent to a destination by one actor, and received from the destination by another.
There are two main types of destinations: queues (point-to-point) and topics (publish-subscribe). All destinations allow multiple actors to send messages. The type of destination determines how the message is routed. A queue delivers the message to a single recipient (possibly one of many candidates). And a topic delivers the message to all interested recipients. In both cases, the message producers have no direct knowledge of the message consumers.
7.2.1. Durable Topic Subscribers
Typically, messages published to a topic are only delivered to consumers connected to the topic at that time. But if a consumer identifies itself with a unique name, then the broker will accumulate messages for that client when it's disconnected and deliver them in order whenever the client reconnects.
You implicitly create a durable topic subscriber by passing the
:client-id option to the receive and listen functions,
described below. As long as you pass the same :client-id in
subsequent receive/listen calls, you'll receive every message
published to that topic, whether you were connected at the time or
not.
7.2.2. Deployment
Use the immutant.messaging/start function to define a messaging
destination. A simple naming convention designates the destination
as either a queue or a topic: if its name contains "queue",
it's a queue; if it contains "topic", it's a topic. If you
need to use queue or topic names that don't match the above
convention, you can wrap the name in a call to
immutant.messaging/as-queue or immutant.messaging/as-topic
respectively. If you do wrap a destination name, you'll need to
pass the wrapped version to any immutant.messaging function that
takes a destination name. It's common to separate sections of the
destination name with . or /, but is not required.
The start function accepts the following options:
| Option | Default | Description |
|---|---|---|
:durable | true | Whether messages persist across restarts |
:selector | nil | A JMS expression used to filter messages by their metadata/properties |
If a :selector is provided, then only messages with
metadata/properties matching that expression will be accepted for
delivery.
Some examples:
(require '[immutant.messaging :as msg]) ;; to start queues - these are all valid names (msg/start "/queue/work") (msg/start ".queue.play") (msg/start "queue.sing") (msg/start "dancequeued") ;; wrap an ambiguous queue name in as-queue (msg/start (msg/as-queue "no-q-word")) ;; to start topics - these are all valid names (msg/start "/topic/news") (msg/start "topic/infotainment") (msg/start ".topic") (msg/start "topicality") (msg/start "topicality") (msg/start "some.kinda.topic") ;; wrap an ambiguous topic name in as-topic (msg/start (msg/as-topic "no-t-word")) ;; only blue messages will be queued for delivery (msg/start "/queue/blue" :selector "color = 'blue'") ;; like this one... (msg/publish "/queue/blue" "success" :properties {:color "blue"})
Starting a destination is dynamic, and can occur anywhere in your
application code. You can invoke start anytime during the
lifecycle of your application.
While start has a complement, immutant.messaging/stop, you needn't
call it directly. It will be invoked when your application is
undeployed. And it's important to note that start is idempotent:
if an endpoint has already been started, likely by a cooperating
application, the call is effectively a no-op. Similarly, a call to
stop will silently fail if the endpoint is in use by any other
application.
7.2.3. Accessing Destinations via JMX
Messaging destinations have MBeans exposed via JMX using the HornetQ subsystem built into AS7. From there, you can see message counts, list & remove messages, and perform other operations. Two different MBeans are provided for each destination, each with slightly different operations. One is a JMS MBean, and the other is HornetQ specific.
For a given destination name (queue.example in this case), you
can access the MBeans with the following addresses:
# to access the JMS MBean org.hornetq:module=JMS,type=Queue,name="queue.example" # to access the HornetQ MBean org.hornetq:module=Core,type=Queue,address="jms.queue.queue.example",name="jms.queue.queue.example"
7.3. Messages
The unit of communication within a messaging system is a message. A message may either be simply a blob of octets, or it might have some higher-order, application-defined semantics. All messages include a set of headers, similar to email.
7.3.1. Publishing
Any component or client code that creates messages and gives them to the message broker for delivery is considered a producer. Generally speaking, the producer does not know the details of the destination or any of its consumers.
In Immutant, there is only one way to send a message, whether to a queue or a topic: via the immutant.messaging/publish function. It accepts two required parameters: the name of the destination and the message content, which can be just about anything.
If the message has any Clojure metadata attached, it will be passed
as the JMS Message's properties, the names of which are subject to
certain naming restrictions (they must be valid Java identifiers)
since they may be used as expressions in selectors (see below).
It's also possible to pass properties via the :properties option,
which will override any matching keys in the payload metadata, if
present.
The publish function accepts the following options:
| Option | Default | Description |
|---|---|---|
:encoding | :edn | One of :clojure, :edn, :json, or :text |
:priority | 4 | An integer (0-9) or one of :low, :normal, :high and :critical which correspond to 0, 4, 7 and 9, respectively |
:ttl | 0 | An integer greater than 0, indicating the number of milliseconds after which the message is discarded if not consumed. A 0 indicates that the message should be held indefinitely. |
:persistent | true | If true, undelivered messages survive restarts. |
:properties | {} | A hash of arbitrary metadata upon which JMS selector expressions may be constructed to filter received messages |
:correlation-id | nil | Used to set the JMSCorrelationID (see setJMSCorrelationID) |
:host | nil | A remote HornetQ host to connect to. |
:port | nil, or 5445 if :host is set | A remote HornetQ port to connect to. Requires :host to be set. |
:username | nil | The username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set. |
:password | nil | The password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set. |
The :json and :edn encodings are useful when the message
consumers aren't written in Clojure. For example, TorqueBox Ruby
processors will automatically convert edn-encoded messages
generated by a Clojure function into their analogous Ruby data
structures, so as long as you limit the content of your messages to
standard collections and types, they are transparently
interoperable between Clojure and Ruby in either direction.
7.3.1.1. Some Examples
;; A simple string (msg/publish "/queue/work" "simple string") ;; Notify everyone something interesting just happened (msg/publish "topic/news" {:event "VISIT" :url "/sales-inquiry"}) ;; Move this message to the front of the line (msg/publish "/queue/work" some-message :priority :high :ttl 1000) ;; Make messages as complex as necessary (msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]}) ;; Make messages consumable by a Ruby app (msg/publish "/queue/work" {:a "b" :c [1 2 3 {:foo 42}]} :encoding :json) ;; Publish to a remote broker (msg/publish "queue.remote-work" "a message" :host "foo.example.com" :port 5445) ;; The received message's metadata => {:foo 42, :bar 1} (msg/publish q (with-meta msg {:foo 42 :bar 0}) :properties {:bar 1})
7.3.2. Receiving
Any component that waits for messages to be delivered to it by the message broker is consider a consumer. Typically, a consumer is unaware of the producer or any other consumers.
If the published message payload contains metadata, the received
message should have it, too, transferred in the form of JMS
properties, subject to any overridden values passed in the
:properties option (see above). If the payload cannot accept
metadata, the message properties can be converted to a convenient
Clojure hash using immutant.messaging.core/get-properties.
Immutant features three functions for consuming messages.
- immutant.messaging/receive Blocks the caller until a message arrives and returns the decoded message
- immutant.messaging/message-seq Creates a lazy sequence of messages
- immutant.messaging/listen Register a handler function that will receive the decoded message when it arrives
Both receive and message-seq expect the destination name as the
first parameter, and optionally, the following key/value pairs:
| Option | Default | Description |
|---|---|---|
:timeout | 10000 | An expiration in milliseconds, after which nil is returned; a value of 0 means wait forever, a value of -1 means don't wait at all |
:selector | nil | A JMS expression used to filter messages according to the values of arbitrary :properties |
:decode? | true | If true, the decoded message body is returned. Otherwise, the javax.jms.Message object is returned |
:client-id | nil | Identifies a durable topic subscriber; ignored for queues |
:host | nil | A remote HornetQ host to connect to. |
:port | nil, or 5445 if :host is set | A remote HornetQ port to connect to. Requires :host to be set. |
:username | nil | The username to authenticate the connection with (if the broker has authentication enabled). Requires :password to be set. |
:password | nil | The password to authenticate the connection with (if the broker has authentication enabled). Requires :username to be set. |
By default, the dynamic variable, clojure.core/*read-eval* is set
to false when decoding messages. To override this, you should set
:decode? to false and bind *read-eval* to true before passing
the encoded message to
immutant.messaging.codecs/decode-with-metadata yourself.
For more details on message selectors, see javax.jms.Message.
The listen function takes two parameters: the destination name
and a function accepting one parameter which will be applied to any
received message. All of the above options except :timeout are
supported, plus listen also accepts the following:
| Option | Default | Description |
|---|---|---|
:concurrency | 1 | The maximum number of listening threads that can simultaneouly call the function |
:retry-interval | The period in milliseconds between subsequent reconnection attempts. | |
:retry-interval-multiplier | A multiplier to apply to the time since the last retry to compute the time to the next retry. | |
:max-retry-interval | 2000 | The max retry interval that will be used. |
:reconnect-attempts | 0 | Total number of reconnect attempts to make before giving up and shutting down. (-1: unlimited) |
listen is asynchronous - if you need to synchronize on its
completion, you should deref the result.
By default, message handlers are transactional, so the function invoked in response to a message effectively demarcates a transaction that will be automatically committed if no exceptions are raised in the handler, and otherwise rolled back.
Any messages published within the handler automatically become part of its transaction, by default. So they won't be delivered until that transaction commits. To override this behavior, wrap your handler inside the immutant.xa.transaction/not-supported macro.
See Distributed Transactions for more details.
7.3.2.1. Some Examples
;; Wait on a task (let [task (msg/receive "/queue/work")] (perform task)) ;; Case-sensitive work queues? (msg/listen ".queue.lower" #(msg/publish "/queue/upper" (.toUpperCase %))) ;; Listen to a remote queue (msg/listen "queue/remote" #(do-someting %) :host "foo.example.com" :port 5445) ;; Contrived laziness (let [messages (message-seq queue)] (doseq [i (range 4)] (publish queue i)) (= (range 4) (take 4 messages)))
The complement of listen is immutant.messaging/unlisten, to
which you pass the value returned by listen to cease the flow of
messages to that handler.
Queues and topics behave differently when you map a handler to the
same destination. For queues, the current handler, if any, is
replaced, effectively making the listen call idempotent.
Multiple listen calls for topics are idempotent, too, but only
if the parameters are exactly the same. If you call listen for a
certain topic with different handlers, they are additive. For
example:
(listen "queue" #(println (inc %))) (listen "queue" #(println (dec %))) (publish "queue" 42) => 41 (listen "topic" #(println (inc %))) (listen "topic" #(println (dec %))) (publish "topic" 42) => 43 => 41
Note that even if the contents within #() are identical, the actual
functions are still different. If you want idempotent topic
listeners, you should pass the same var to each. And even then,
during development, you may inadvertently redefine the var and
create multiple, redundant versions of the topic listener. Hijinks
may ensue.
7.3.2.2. Accessing Listeners via JMX
Each message listener has a MBean exposed via JMX. Currently, you can only stop and start the listener from the MBean.
The MBean name is derived from URL-encoded concatenation of
destination name and the :selector, if any. If the destination is
a topic, the :client-id and the handler function name will be
included as well.
The names are so gross-looking that I'm loathe to include any examples at this time.
7.4. Request/Response
Immutant also provides an implementation of the request/response pattern
for synchronous work distribution. This feature is provided by two
cleverly named functions: request and respond. Since they leverage
the existing messaging subsystem, the work is automatically distributed
across multiple workers within the same JVM or across multiple nodes if
in a cluster.
7.4.1. Request
The immutant.messaging/request function takes a queue, a message, and an
optional list of options. It publishes the message to the queue, marking
it as a synchronous message and returns a delay that will receive the
response from the worker initiated by the respond function. It accepts
the same options as publish.
7.4.2. Respond
The immutant.messaging/respond method takes a queue, a function, and an
optional list of options. It sets up a listener (via the listen
function) that applies the given function to any received message and publishes
the result back to the queue for the delay returned by request to receive.
It accepts the same options as listen.
7.4.3. Some Examples
A basic example:
(require '[immutant.messaging :as msg]) ;; setup a responder (msg/respond "/queue/work" (partial apply +)) ;; send a request (let [result (msg/request "/queue/work" [1 2 3])] (println (deref result 1000 nil)) ;; => 6
An example of using properties and selectors to segment work on the same queue:
(require '[immutant.messaging :as msg]) ;; respond to 'add' messages (msg/respond "/queue/work" (partial apply +) :selector "operation='add'") ;; respond to 'multiply' messages (msg/respond "/queue/work" (partial apply *) :selector "operation='multiply'") (deref (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "add"}) 1000 nil) ;; => 9 (deref (msg/request "/queue/work" [1 2 3 4] :properties {"operation" "multiply"}) 1000 nil) ;; => 24
7.5. Pipelines
Immutant provides a tool called a pipeline. A pipeline is basically a composition of functions (steps), where each function is passed the result of the previous function, dereferenced if needed. It is built on top of the messaging subsystem, allowing each step to have multiple processing threads, and to be automatically load balanced across a cluster. The pipeline functions are available via the immutant.pipeline namespace.
Since messaging is used to pass the data between step functions, the data has to be in a format that can be encoded as clojure via pr. See the above note about encodings.
Pipeline support is currently in an alpha state, and may be subject to massive changes.
7.5.1. Creating a pipeline
You create a pipeline with the immutant.pipeline/pipeline
function. The pipeline function takes a unique (within the scope
of the application) name, one or more single-arity functions, and
optional keyword argument options, returning a function that acts
as an entry point into the pipeline:
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something another-function))
7.5.1.1. Pipeline options
pipeline can take the following options, passed as keyword
arguments after the step functions:
| Option | Default | Description |
|---|---|---|
:concurrency | 1 | The number of threads to use for each step. Can be overridden on a per-step basis (see below). |
:error-handler | nil | A function to call when a step function throws an exception. Receives the exception and the data passed to the step function. Can be overriden on a per-step basis (see below). |
:result-ttl | 1 hour | The time-to-live for the final pipeline result, in ms. Set to 0 for "forever", -1 to disable returning the result via a delay |
:step-deref-timeout | 10 seconds | The amount of time to wait when dereferencing the result of a step that returns a delay, in ms |
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something another-function :concurrency 2))
7.5.1.2. Per-step options
Each function can optionally be wrapped with metadata via the immutant.pipeline/step function, providing settings for how that particular function is handled within the pipeline:
| Option | Default | Description |
|---|---|---|
:name | the index of the fn | A name to use for the step |
:concurrency | 1 | The number of threads to use, overriding the pipeline setting |
:error-handler | nil | An error handler function that can override the pipeline setting |
:step-deref-timeout | 10 seconds | The amount of time to wait when dereferencing the result of a step that returns a delay, in ms. Overrides the pipeline setting |
(require '[immutant.pipeline :as pl]) (pl/pipeline "foo" function-that-does-something (pl/step another-function :concurrency 10) :concurrency 2)
7.5.2. Using a pipeline
The function returned by pipeline acts as an entry function, placing its argument onto the pipeline when called, returning a delay around the end of the pipeline (by default):
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something another-function)) (deref (foo-pipeline {:ham :biscuit}) 10000 :timeout!)
Pipelines store the result of an execution by default, allowing it
to be retrieved by dereferencing the delay returned by the
pipeline-fn call. To prevent results that may not be retrieved
from being stored indefinitely, they have a default time-to-live of
1 hour. You can control the retention time by passing a
:result-ttl option to pipeline. It is specified in
milliseconds, with a value of 0 indicating that the result should
be saved indefinitely, and -1 indicating that the results should
be discarded immediately. If you set the :result-ttl to -1, any
attempt to dereference the returned delay will raise an error.
If the result from a step is reference, it will be dereferenced
before being passed to the next step. This allows you to use a
pipeline within a pipeline. The amount of time to wait for the
deref is controlled by the :step-deref-timeout option, and
defaults to 10 seconds. Setting it to 0 will cause it to wait
forever, which will tie up a thread
(require '[immutant.pipeline :as pl]) (defonce pipeline-x (pl/pipeline :x function-that-does-something another-function)) (defonce pipeline-y (pl/pipeline :y yet-another-function pipeline-x and-another :step-deref-timeout 60000))
By default, the pipeline entry function places its argument onto
the front of the pipeline. You can insert the data into the
pipeline at any step by passing a :step keyword argument. The
step name would be the name you provided as an option for that step
using the step function, or the index of the step in the list of
steps if you haven't provided a name:
(require '[immutant.pipeline :as pl]) (defonce foo-pipeline (pl/pipeline "foo" function-that-does-something (pl/step another-function :name :another) a-third-function)) ;; insert at head (foo-pipeline {:ham :biscuit}) ;; skip the first step (foo-pipeline {:ham :biscuit} :step :another) ;; insert at the last step (foo-pipeline {:ham :biscuit} :step 2)
7.5.3. Available bindings
The following vars have bound values inside a step or error-handler invocation:
| Var | Value |
|---|---|
*pipeline* | The pipeline entry function for the currently active pipeline. |
*current-step* | The name of the currently executing step. |
*next-step* | The name of the next step in the pipeline. |
7.5.4. Error handling
When an exception occurs in a step function, an error-handler function will be invoked if provided for the pipeline or for the particular step. This function will be passed the exception and the original data passed to the step function, and have all of the above bindings available:
;; a naive error handler that sleeps then retries on a network error, ;; logging and discarding otherwise (defn error-handler [ex data] (if (instance? NoRouteToHostException ex) (do (Thread/sleep 1000) (pl/*pipeline* data :step pl/*current-step*)) (println "ERROR:" ex))) (require '[immutant.pipeline :as pl]) (pl/pipeline "foo" connects-to-foo connects-to-bar :error-handler error-handler)
If no error-handler function is provided, the error handling semantics provided my HornetQ are used, which causes the offending step to be retried up to ten times before giving up.
7.5.5. Halting the pipeline for a particular message
If, in a step function, you determine that the data requires no further processing, you can halt that particular pipeline execution by returning a special flag symbol - immutant.pipeline/halt:
(require '[immutant.pipeline :as pl]) ;; halt the pipeline at the second step, causing another-function to ;; not be called (pl/pipeline "foo" function-that-does-something #(if (:some-done-condition %) pl/halt %) another-function)
7.5.6. Stopping the pipeline
When your application is undeployed, Immutant will automatically shut down the pipeline. If you need to stop the pipeline at runtime, use the immutant.pipeline/stop function:
(require '[immutant.pipeline :as pl]) (let [pipeline (pl/pipeline "foo" ...)] ... (pl/stop pipeline))