[mailboxes]

In this installment of our series on getting started with Immutant 2, we'll take a detailed look at the API of our library for messaging, and show a few examples of usage.

If you're coming from Immutant 1.x, you may notice that the artifact has been renamed (org.immutant/immutant-messaging is now org.immutant/messaging), and the API has changed a bit. We'll point out the notable API changes as we go.

The API

The messaging API is backed by HornetQ, which is an implementation of JMS. JMS provides two primary destination types: queues and topics. Queues represent point-to-point destinations, and topics publish/subscribe.

To use a destination, we need to get a reference to one via the queue or topic functions, depending on the type required. This will create the destination if it does not already exist. This is a bit different than the 1.x API, which provided a single start function for this, and determined the type of destination based on conventions around the provided name. In 2.x, we've removed those naming conventions.

Once we have a reference to a destination, we can operate on it with the following functions:

  • publish - sends a message to the destination
  • receive - receives a single message from the destination
  • listen - registers a function to be called each time a message arrives at the destination

If the destination is a queue, we can do synchronous messaging (request-response):

  • respond - registers a function that receives each request, and the returned value will be sent back to the requester
  • request - sends a message to the responder

Finally, to deregister listeners, responders, and destinations, we provide a single stop function. This is another difference from 1.x - the unlisten and stop functions have been collapsed to stop.

Some Examples

The following code fragments were tested against 2.x.incremental.133. You should follow the instructions in the getting started post to set up a project using Immutant 2.x, and add [org.immutant/messaging "2.x.incremental.133"] and [cheshire "5.3.1"] to the project dependencies (we'll be encoding some messages as JSON in our examples below, so we'll go ahead and add cheshire while we're at it). Then, fire up a REPL, and require the immutant.messaging namespace to follow along:

(require '[immutant.messaging :refer :all])

First, let's create a queue:

(queue "my-queue")

That will create the queue in the HornetQ broker for us. We'll need a reference to that queue to operate on it. Let's go ahead and store that reference in a var:

(def q (queue "my-queue"))

We can call queue any number of times - if the queue already exists, we're just grabbing a reference to it.

Now, let's register a listener on our queue. Let's just print every message we get:

(def listener (listen q println))

We can publish to that queue, and see that the listener gets called:

(publish q {:hi :there})

You'll notice that we're publishing a map there - we can publish pretty much any data structure as a message. By default, that message will be encoded using edn. We also support other encodings, namely: :clojure, :fressian, :json, and :text. We can choose a different encoding by passing an :encoding option to publish:

(publish q {:hi :there} :encoding :json)

Out of the box, we provide full support for the :clojure, :edn, and :text encodings. If you want to use :fressian or :json, you'll need to add org.clojure/data.fressian or cheshire to your dependencies to enable them, respectively.

We passed our options to publish as keyword arguments, but they can also be passed as a map:

(publish q {:hi :there} {:encoding :json})

This holds true for any of the messaging functions that take options.

We're also passing the destination reference to publish instead of the destination name. That's a departure from 1.x, where you could just pass the destination name. Since we no longer have conventions about how queues and topics should be named, we can no longer determine the type of the destination from the name alone.

We can deregister the listener by either passing it to stop or calling .close on it:

(stop listener)
;; identical to
(.close listener)

Now let's take a look at synchronous messaging. Let's create a new queue for this (you'll want to use a dedicated queue for each responder) and register a responder that just increments the request:

(def sync-q (queue "sync"))

(def responder (respond sync-q inc))

Then, we make a request, which returns a Future that we can dereference:

@(request sync-q 1)

The responder is just a fancy listener, and can be deregistered the same way as a listener.

That's not all...

That was just a brief introduction to the messaging API. There are features we've yet to cover (durable topic subscriptions, connection/session sharing, transactional sessions, remote connections), but it's getting late, so we'll save those for another time.

Try it out!

As always, we'd love to incorporate your feedback. Find us via our community page and join the fun!

Thanks to John Lillis for the image, used under CC BY-NC-ND