Messaging with The Deuce


In this installment of our series on getting started with Immutant 2, we'll take a detailed look at the API of our library for messaging, and show a few examples of usage.

If you're coming from Immutant 1.x, you may notice that the artifact has been renamed (org.immutant/immutant-messaging is now org.immutant/messaging), and the API has changed a bit. We'll point out the notable API changes as we go.


The messaging API is backed by HornetQ, which is an implementation of JMS. JMS provides two primary destination types: queues and topics. Queues represent point-to-point destinations, while topics provide publish/subscribe semantics.

To use a destination, we need to get a reference to one via the queue or topic functions, depending on the type required. This will create the destination if it does not already exist. This is a bit different than the 1.x API, which provided a single start function for this, and determined the type of destination based on conventions around the provided name. In 2.x, we've removed those naming conventions.

Once we have a reference to a destination, we can operate on it with the following functions:

  • publish - sends a message to the destination
  • receive - receives a single message from the destination
  • listen - registers a function to be called each time a message arrives at the destination

If the destination is a queue, we can do synchronous messaging (request-response):

  • respond - registers a function that receives each request, and the returned value will be sent back to the requester
  • request - sends a message to the responder

Finally, to deregister listeners, responders, and destinations, we provide a single stop function. This is another difference from 1.x - the unlisten and stop functions have been collapsed to stop.

Some Examples

The following code fragments were tested against 2.x.incremental.133. You should follow the instructions in the getting started post to set up a project using Immutant 2.x, and add [org.immutant/messaging "2.x.incremental.133"] and [cheshire "5.3.1"] to the project dependencies (we'll be encoding some messages as JSON in our examples below, so we'll go ahead and add cheshire while we're at it). Then, fire up a REPL, and require the immutant.messaging namespace to follow along:

(require '[immutant.messaging :refer :all])

First, let's create a queue:

(queue "my-queue")

That will create the queue in the HornetQ broker for us. We'll need a reference to that queue to operate on it. Let's go ahead and store that reference in a var:

(def q (queue "my-queue"))

We can call queue any number of times - if the queue already exists, we're just grabbing a reference to it.

Now, let's register a listener on our queue. Let's just print every message we get:

(def listener (listen q println))

We can publish to that queue, and see that the listener gets called:

(publish q {:hi :there})

You'll notice that we're publishing a map there - we can publish pretty much any data structure as a message. By default, that message will be encoded using edn. We also support other encodings, namely: :clojure, :fressian, :json, and :text. We can choose a different encoding by passing an :encoding option to publish:

(publish q {:hi :there} :encoding :json)

Out of the box, we provide full support for the :clojure, :edn, and :text encodings. If you want to use :fressian or :json, you'll need to add org.clojure/data.fressian or cheshire to your dependencies to enable them, respectively.
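Conceptually, the :edn encoding amounts to printing the message with pr-str and reading it back with clojure.edn/read-string. Here's a rough sketch of that round trip (illustrative only - Immutant's actual codec machinery handles content types and more):

```clojure
(require '[clojure.edn :as edn])

;; Sketch only: the edn round trip boils down to print-then-read.
(defn encode [message]
  (pr-str message))

(defn decode [encoded]
  (edn/read-string encoded))

(decode (encode {:hi :there})) ;; => {:hi :there}
```

Any data structure with an edn representation survives this round trip intact, which is why it's the default.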

We passed our options to publish as keyword arguments, but they can also be passed as a map:

(publish q {:hi :there} {:encoding :json})

This holds true for any of the messaging functions that take options.

We're also passing the destination reference to publish instead of the destination name. That's a departure from 1.x, where you could just pass the destination name. Since we no longer have conventions about how queues and topics should be named, we can no longer determine the type of the destination from the name alone.

We can deregister the listener by either passing it to stop or calling .close on it:

(stop listener)
;; identical to
(.close listener)

Now let's take a look at synchronous messaging. Let's create a new queue for this (you'll want to use a dedicated queue for each responder) and register a responder that just increments the request:

(def sync-q (queue "sync"))

(def responder (respond sync-q inc))

Then, we make a request, which returns a Future that we can dereference:

@(request sync-q 1)

The responder is just a fancy listener, and can be deregistered the same way as a listener.

That's not all...

That was just a brief introduction to the messaging API. There are features we've yet to cover (durable topic subscriptions, connection/session sharing, transactional sessions, remote connections), but it's getting late, so we'll save those for another time.

Try it out!

As always, we'd love to incorporate your feedback. Find us via our community page and join the fun!

Thanks to John Lillis for the image, used under CC BY-NC-ND

Schedule Some Time with The Deuce


In this installment of our series on getting started with Immutant 2, we'll take a detailed look at the API of our library for scheduling jobs, and show a few examples of usage.

If you're coming from Immutant 1.x, you'll notice that the namespace and artifact have been renamed (what used to be and org.immutant/immutant-jobs is now immutant.scheduling and org.immutant/scheduling), and the API has changed a bit. It's still based on Quartz 2.2, though.


At first glance, the API for immutant.scheduling appears bigger than it really is, but there are only two essential functions:

  • schedule - for scheduling your jobs
  • stop - for canceling them

The remainder of the namespace is syntactic sugar: functions that can be composed to create the specification for when your job should run.

Your "job" will take the form of a plain ol' Clojure function taking no arguments. The schedule function takes your job and a specification map as arguments. The map determines when your function gets called. It may contain any of the following keys:

  • :in - a period after which your function will be called
  • :at - an instant in time after which your function will be called
  • :every - the period between calls
  • :until - stops the calls at a specific time
  • :limit - limits the calls to a specific count
  • :cron - calls your function according to a Quartz-style cron spec

For each key there is a corresponding "sugar function". We'll see those in the examples below.

Units for periods (:in and :every) are milliseconds, but can also be represented as a keyword or a vector of multiplier/keyword pairs, e.g. [1 :week, 4 :days, 2 :hours, 30 :minutes, 59 :seconds]. Both singular and plural keywords are valid.
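To make the arithmetic concrete, here's a hypothetical period->ms helper (not part of immutant.scheduling) that normalizes any of those period representations to milliseconds:

```clojure
;; Milliseconds per unit; both singular and plural keywords are valid.
(def units
  (let [base {:second 1000
              :minute 60000
              :hour   3600000
              :day    86400000
              :week   604800000}]
    (into base (map (fn [[k v]] [(keyword (str (name k) "s")) v]) base))))

;; Hypothetical helper -- not part of the public API -- that turns a
;; number, keyword, or multiplier/keyword vector into milliseconds.
(defn period->ms [p]
  (cond
    (number? p)  p
    (keyword? p) (units p)
    (vector? p)  (reduce + (map (fn [[n u]] (* n (units u)))
                                (partition 2 p)))))

(period->ms [1 :week, 4 :days, 2 :hours, 30 :minutes, 59 :seconds])
;; => 959459000
```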

Time values (:at and :until) can be a java.util.Date, a long representing milliseconds-since-epoch, or a String in HH:mm format. The latter will be interpreted as the next occurrence of HH:mm:00 in the currently active timezone.
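As an illustration of that "next occurrence" rule, here's a rough java.time equivalent (assuming the default timezone; this is not the library's actual parsing code):

```clojure
(import '[java.time LocalDate LocalDateTime LocalTime])

;; Sketch of the "next occurrence of HH:mm:00" rule, using the
;; default timezone; not Immutant's actual implementation.
(defn next-occurrence [hh-mm]
  (let [t     (LocalTime/parse hh-mm)  ; "HH:mm" -> LocalTime
        now   (LocalDateTime/now)
        today (LocalDateTime/of (LocalDate/now) t)]
    (if (.isAfter today now)
      today                            ; still ahead of us today
      (.plusDays today 1))))           ; already passed -> tomorrow

(next-occurrence "09:30")
```

So "09:30" scheduled at 8am fires later the same morning, while "09:30" scheduled at noon fires the next day.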

Two additional options may be passed in the spec map:

  • :id - a unique identifier for the scheduled job
  • :singleton - a boolean denoting the job's behavior in a cluster [true]

In Immutant 1.x, a name for the job was required. In Immutant 2, the :id is optional, and if not provided, a UUID will be generated. If schedule is called with an :id for a job that has already been scheduled, the prior job will be replaced.

The return value from schedule is a map of the options with any missing defaults filled in, including a generated id if necessary. This result can be passed to stop to cancel the job.
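That defaulting behavior might be sketched like this (a toy, hypothetical schedule*, not the real function):

```clojure
(import 'java.util.UUID)

;; Toy sketch of schedule's return-value contract: the spec comes
;; back with defaults filled in, including a generated :id when
;; the caller didn't supply one.
(defn schedule* [spec]
  (merge {:id (str (UUID/randomUUID))} spec))

(:id (schedule* {}))           ;; => a generated UUID string
(:id (schedule* {:id :purge})) ;; => :purge
```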

Some Examples

The following code fragments were tested against 2.x.incremental.119. You should read through the getting started post and require the immutant.scheduling namespace at a REPL to follow along:

(require '[immutant.scheduling :refer :all])

We'll need a job to schedule. Here's one!

(defn job []
  (prn 'fire!))

Let's schedule it:

(schedule job)

That was pretty useless. Without a spec, the job will be immediately called asynchronously on one of the Quartz scheduler's threads. Instead, let's have it run in 5 minutes:

(schedule job (in 5 :minutes))

And maybe run again every second after that:

(schedule job
  (-> (in 5 :minutes)
    (every :second)))

But no more than 60 times:

(schedule job
  (-> (in 5 :minutes)
    (every :second)
    (limit 60)))

We could also anticipate getting stupid bored about halfway through, and schedule another job to cancel the first one:

(let [it (schedule job
           (-> (in 5 :minutes)
             (every :second)
             (limit 60)))]
  (schedule #(stop it) (in 5 :minutes, 30 :seconds)))

Of course, you can bring your own job ids if you like:

(schedule job (-> (id :purge) (every 30 :minutes)))
(schedule job (-> (id :purge) (every :hour)))  ; reschedule
(stop (id :purge))

If a job is successfully canceled, stop returns true.

It's Just Maps

Ultimately, the spec passed to schedule is just a map, and the sugar functions are just assoc'ing keys corresponding to their names. The map can be passed either explicitly or via keyword arguments, so all of the following are equivalent:

(schedule job (-> (in 5 :minutes) (every :day)))
(schedule job {:in [5 :minutes], :every :day})
(schedule job :in [5 :minutes], :every :day)
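In fact, simplified stand-ins for the sugar are only a few lines. These toy versions (not the real implementations) behave the same way for the cases above:

```clojure
;; Toy stand-ins for the real sugar functions, which essentially
;; just assoc a key named after themselves onto a spec map.
(defn- spec-val [args]
  (if (= 1 (count args)) (first args) (vec args)))

(defn sugar [k]
  (fn [& args]
    (if (map? (first args))
      (assoc (first args) k (spec-val (rest args)))
      {k (spec-val args)})))

(def in    (sugar :in))
(def every (sugar :every))

(-> (in 5 :minutes) (every :day))
;; => {:in [5 :minutes], :every :day}
```

Note that defining these at a REPL would shadow the real immutant.scheduling functions; they're here purely to show that the sugar is just map-building.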

Supports Joda clj-time

If you're using the clj-time library in your project, you can load the immutant.scheduling.joda namespace. This will extend org.joda.time.DateTime instances to the AsTime protocol, enabling them to be used as arguments to at and until, e.g.

(require '[clj-time.core :refer [today-at plus hours]])

(let [t (today-at 9 00)]
  (schedule job
    (-> (at t)
      (every 2 :hours)
      (until (plus t (hours 8))))))

It also provides the function schedule-seq. Inspired by chime-at, it takes not a specification map but a sequence of times, as might be returned from clj-time.periodic/periodic-seq, subject to the application of any of Clojure's core sequence-manipulating functions.

When defining complex recurring schedules, this presents an interesting alternative to traditional cron specs. For example, consider a job that must run at 10am every weekday. Here's how we'd schedule that with a Quartz-style cron spec:

(schedule job (cron "0 0 10 ? * MON-FRI"))

And here's the same schedule using a lazy sequence:

(require '[immutant.scheduling.joda :refer [schedule-seq]]
         '[clj-time.core            :refer [today-at days]]
         '[clj-time.periodic        :refer [periodic-seq]]
         '[clj-time.predicates      :refer [weekday?]])

(schedule-seq job
  (->> (periodic-seq (today-at 10 0) (days 1))
    (filter weekday?)))

So each has trade-offs, of course. The cron spec is more concise, but also arguably more error-prone, e.g. what is that ? for!?

One very cool thing about the sequence is that I can test it without actually scheduling it. On the other hand, my cron spec test is going to take more than a week to run! ;)

Try it out!

As always, we'd love to incorporate your feedback. Find us via our community page and join the fun!

Thanks to Phil Hart for the image, used under CC BY-NC-SA

Untangling the Web in The Deuce

Our org.immutant/web library has changed quite a bit in Immutant 2, both its API and its foundation: the Undertow web server. Among other things, this gives us much better performance (~35% more throughput than v1.1.1) and built-in support for websockets.

We've given a lot of thought to the API, specifically argument names, types, order, and return values. We're reasonably happy with what we have at this point, but still very much open to suggestions for improvements.

The Web API

The API for immutant.web is small, just two functions and a convenient macro:

  • run - runs your handler in a specific environment, responding to web requests matching a given host, port, and path. The handler may be either a Ring function, Servlet instance, or Undertow HttpHandler.
  • run-dmc - runs your handler in Development Mode (the 'C' is silent)
  • stop - stops your handler[s]

The following code fragments were tested against 2.x.incremental.96. You should read through the getting started post and require the immutant.web namespace at a REPL to follow along:

(require '[immutant.web :refer :all])

Common Usage

First, we'll need a Ring handler. Yours is probably fancier, but this one will do:

(defn app [request]
  {:status 200
   :body "Hello world!"})

To make the app available at http://localhost:8080/, do this:

(run app)

Which, if we make the default values explicit, is equivalent to this:

(run app {:host "localhost" :port 8080 :path "/"})

Or, since run takes options as either an explicit map or keyword arguments (kwargs), this:

(run app :host "localhost" :port 8080 :path "/")

The options passed to run determine the URL used to invoke your handler: http://{host}:{port}{path}
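Given the options map run accepts, that template is easy to express directly. This illustrative helper (not part of the API) reconstructs the URL, with the same defaults shown above:

```clojure
;; Illustrative only: build the invocation URL from run's options.
(defn handler-url
  [{:keys [host port path]
    :or   {host "localhost" port 8080 path "/"}}]
  (str "http://" host ":" port path))

(handler-url {})                          ;; => "http://localhost:8080/"
(handler-url {:port 4242 :path "/hello"}) ;; => "http://localhost:4242/hello"
```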

To replace your app handler with another, just call run again with the same options, and it'll replace the old handler with the new:

(run (fn [_] {:status 200 :body "hi!"}))

To stop the handler, do this:

(stop)


Which is equivalent to this:

(stop {:host "localhost" :port 8080 :path "/"})

Or, if you prefer kwargs, this:

(stop :host "localhost" :port 8080 :path "/")

Alternatively, you can save run's return value and pass it to stop to stop your handler.

(def server (run app {:port 4242 :path "/hello"}))
(stop server)

That's pretty much all there is to it.

You don't even really need to stop your handlers if you're content to just let the JVM exit, but it can be handy at a REPL.

Advanced Usage

The run function returns a map that includes the options passed to it, so you can thread run calls together, useful when your application runs multiple handlers. For example,

(def everything (-> (run hello)
                  (assoc :path "/howdy")
                  (->> (run howdy))
                  (merge {:path "/" :port 8081})
                  (->> (run ola))))

The above actually creates two Undertow web server instances: one serving requests for the hello and howdy handlers on port 8080, and one serving ola responses on port 8081.

You can stop all three apps (and shutdown the two web servers) like so:

(stop everything)

Alternatively, you could stop only the ola app like so:

(stop {:path "/" :port 8081})

You could even omit :path since "/" is the default. And because ola was the only app running on the web server listening on port 8081, that server will be shut down automatically.

Handler Types

Though the handlers you run will typically be Ring functions, you can also pass any valid implementation of javax.servlet.Servlet or io.undertow.server.HttpHandler. For an example of the former, here's a very simple Pedestal service running on Immutant:

(ns testing.hello.service
  (:require [io.pedestal.service.http :as http]
            [io.pedestal.service.http.route.definition :refer [defroutes]]
            [ring.util.response :refer [response]]
            [immutant.web :refer [run]]))

(defn home-page [request] (response "Hello World!"))
(defroutes routes [[["/" {:get home-page}]]])
(def service {::http/routes routes})

(defn start [options]
  (run (::http/servlet (http/create-servlet service)) options))

Development Mode

The run-dmc macro resulted from a desire to provide a no-fuss way to enjoy all the benefits of REPL-based development. Before calling run, run-dmc will first ensure that your Ring handler is var-quoted and wrapped in the reload and stacktrace middleware from the ring-devel library (which must be included among your [:profiles :dev :dependencies] in project.clj). It'll then open your app in a browser.

Both run and run-dmc accept the same options. You can even mix them within a single threaded call.

The Websocket API

Also included in the org.immutant/web library is the immutant.websocket namespace, which includes a Channel protocol and the create-handler function. It accepts a map of callback functions, invoked asynchronously during the lifecycle of a websocket. The valid websocket event keywords and their corresponding callback signatures are as follows:

  :on-message (fn [channel message])
  :on-open    (fn [channel])
  :on-close   (fn [channel {:keys [code reason]}])
  :on-error   (fn [channel throwable])
  :fallback   (fn [request] (response ...))

To create your websocket endpoint, pass the result from create-handler to immutant.web/run. Here's an example that asynchronously returns the upper-cased equivalent of whatever message it receives:

(ns whatever
  (:require [immutant.web :as web]
            [immutant.websocket :as ws]
            [clojure.string :refer [upper-case]]))

(defn create-websocket []
  (web/run
    (ws/create-handler {:on-message (fn [c m] (ws/send! c (upper-case m)))})
    {:path "/websocket"}))

Another function, immutant.websocket/create-servlet, can be used to create a JSR 356 Endpoint. The channel passed to the callbacks is an instance of javax.websocket.Session, extended to the immutant.websocket.Channel protocol.

Try it out!

We'd love to hear some feedback on this stuff. Find us on our community page and join the fun!

Getting Started With Immutant 2

We're hard at work on Immutant 2.0, and have a ways to go before we reach feature parity with 1.x. But if you are interested in playing with what we have now, then this article is for you!

Getting Immutant

Our CI server publishes an incremental release for each successful build. In order to use an incremental build, you'll need to add a repository and dependencies to your project.clj. Currently, only our scheduling and web artifacts contain implementations, so let's add those:

(defproject some-project "1.2.3"
  :dependencies [...
                 [org.immutant/scheduling "2.x.incremental.BUILD_NUM"]
                 [org.immutant/web        "2.x.incremental.BUILD_NUM"]]
  :repositories [["Immutant 2.x incremental builds"
                  "..."]])
replacing BUILD_NUM with the build number for the version you want to use. You can get the build number from our builds page - the latest build number is 62 as of this article.

Edit: build 62 has been garbage collected - be sure to grab the latest build from the builds page.

Note: We're bringing in the artifacts piecemeal above, but we also provide an aggregate artifact that brings in all of the Immutant dependencies in one shot if you'd rather use that: org.immutant/immutant.

That's it! If you are used to Immutant 1.x, you'll notice that there is no download or install step - Immutant 2 is usable as a set of libraries that you consume just like any other Clojure library.

What's in those artifacts?

Along with the artifacts, each CI build publishes the API docs for all of the Immutant namespaces. You can see the docs for a specific build (build #62 in this case), or the latest build.

If you start playing with any of that API, be aware that it is currently in a pre-alpha state, and may change dramatically at any time.

What's next?

We plan on publishing articles in the near future focusing on using the web and scheduling namespaces, followed by articles on websockets, messaging, and caching as soon as we get those implementations off of the ground. We'll also cover using those same namespaces within a WildFly container, taking advantage of the clustering features the container provides.

Once we have most of our namespaces in semi-decent shape, we'll release our first alpha, probably in mid to late June.

As always, if you have any questions, comments, or other feedback, feel free to get in touch.

Clustering Immutants on OpenShift

Lately I've been spending a lot of time on OpenShift, building and testing a cartridge for Immutant that will properly form a cluster across multiple OpenShift gears. In this post, I'll go through the steps of deploying a simple application that demonstrates all of the Immutant clustering features running on the three small gears you get for free on OpenShift.

Here are the features we'll be demonstrating:

  • Load-balanced message distribution with automatic peer discovery
  • Replicated caching
  • Highly-Available, long-running daemons
  • HA scheduled jobs
  • Web load balancing and session replication

If you haven't already, go set up an OpenShift account and update your rhc gem to the latest version. I used 1.12.4 for this article. Below you'll see references to $namespace -- this corresponds to your OpenShift domain name, set by running rhc setup.

Note: If this is the first time you've used OpenShift, you'll need to visit the console and accept the usage agreement before running the rhc command.

Create a scaled OpenShift app

The Immutant cartridge is available here. As described in its README, we create our app using the following command:

rhc app-create -s demo

We're calling our app demo and we're passing the -s option to make our app scalable. Notice that we're passing a raw URL to the cartridge's manifest.yml.

Small gears are pretty slow, but when app-create finally completes, you'll have a bare-bones, standard Leiningen application beneath the demo/ directory. At this point, you might tail your app's logs or ssh into your gear:

rhc tail demo
rhc ssh demo

The critical log file for Immutant on OpenShift is immutant/logs/server.log. Monitor this file until you eventually see the line, Deployed "your-clojure-application.clj". Then point a browser at http://demo-$ to see a simple welcome page.

Now we'll put some meat on our bare-bones app!

Push Me, Pull You

Typically, you will add the remote git repository for your real application to the local OpenShift repository you just created. We're going to use the cluster-demo app as our "real" application.

git remote add upstream -m master

Deployment of your app to OpenShift amounts to pulling from your real repository and pushing to OpenShift's.

git pull -s recursive -X theirs upstream master
git push

While waiting for that to complete, run rhc tail demo in another shell to monitor your log. This time, the Deployed "your-clojure-application.clj" message is going to scroll off the screen as the cluster-demo app starts logging its output. Eventually, the app should settle into a steady state looking something like this:

The cluster-demo app

If you can ignore the inconsistent thread identifiers in the above output, you'll notice there are exactly four types of messages: send, recv, web, and job. Noting the timestamps in the left column, a send is logged every 5 seconds, as is its corresponding recv, a web every 2 seconds, and a job every 20 seconds.

The cluster-demo app is comprised of the following:

  • A message queue named /queue/msg
  • A distributed cache named counters
  • A listener for the queue that prints the received message and the current contents of the cache
  • An HA daemon named counter that queues a cached value and increments it every 5 seconds
  • An HA scheduled job named ajob that increments another counter in the cache every 20 seconds
  • A web request handler mounted at / that logs its :path-info and returns the current values of the two cached counters
  • Another request handler mounted at /count that increments a counter in the user's web session.

All the code (~60 lines) is contained in a single file.

Programming is hard, let's build a cluster!

Now we're ready to form a cluster by adding a gear to our app:

rhc scale-cartridge immutant -a demo 2

Again, this will take a few minutes, and it may return an error even though the operation actually succeeded. You can run the following to see the definitive state of your gears:

rhc show-app --gears

This also gives you the SSH URLs for your two gears. Fire up two shells and ssh into each of your gears using those SSH URLs. Then tail the log on each:

tail -f immutant/logs/server.log

When the dust settles, you'll eventually see the gears discover each other, and you should see both gears logging recv messages, one getting the even numbers and one getting the odd. This is your automatic load-balanced message distribution.

Note also that the counters cache logged in the recv message is correct on both gears, even though it's only being updated by one. This is our cache replication at work.

Let's break stuff!

And see how robust our cluster is.

High Availability Daemons and Jobs

Of course, the send and job log entries should still only appear on our original gear, because those are our HA singletons. If that gear crashes, our daemon and job should migrate to the other gear. While logged into the gear running your singletons, run this:

immutant/bin/control stop

And watch the other gear's log to verify the daemon and job pick up right where they left off, fetching their counters from the replicated cache. That gear should be consuming all the queued messages, too. Now start the original gear back up:

immutant/bin/control start

Eventually, it'll start receiving half the messages again.


You may be wondering about those web entries showing up in both logs. They are "health check" requests from the HAProxy web load balancer, automatically installed on your primary gear. You can always check the state of your cluster from HAProxy's perspective by visiting http://demo-$ If you see that page without intending to, it means something about your app is broken, so check immutant/logs/server.log for errors and make sure your app responds to a request for the root context, i.e. "/".

Let's try some web stuff. Use curl to hit your app while observing the logs on both gears:

curl http://demo-$
curl http://demo-$
curl http://demo-$

Use an obnoxious path to distinguish your request from the health checks. Repeat the command a few times to observe the gears taking turns responding to your request. Now try it in a browser, and you'll see the same gear handling your request every time you reload. This is because HAProxy is setting cookies in the response to enable session affinity, which your browser sends back with each request, but curl does not.

Speaking of session affinity, let's break that while we're at it, by invoking our other web handler, the one that increments a counter in the user's web session: http://demo-$

You should see the counter increment each time you reload your browser. (You'll need to give curl a cookie store to see it respond with anything other than "1 times")

Pay attention to which gear is responding to the /count request. Now stop that gear like you did before. When you reload your browser, you should see the other gear return the expected value. This is the automatic session replication provided by immutant.web.session/servlet-store.

Don't forget to restart that gear.

The Hat Trick

Hey, OpenShift is giving us 3 free gears, we may as well use 'em all, right?

rhc scale-cartridge immutant -a demo 3

When the third one finally comes up, there are a couple of things you may notice:

  • The health checks will disappear from the primary gear as HAProxy takes it out of the rotation when 2 or more other gears are available, ostensibly to mitigate the observer effect of the health checks.
  • Each cache key will only show up in the recv log messages on 2 of the 3 gears. This is because Immutant caches default to Infinispan's :distributed replication mode in a cluster. This enables Infinispan clusters to achieve "linear scalability" as entries are copied to a fixed number of cluster nodes (default 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.
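Here's a toy sketch of that idea, nothing like Infinispan's actual consistent hashing algorithm, that deterministically picks a fixed number of owners for each key:

```clojure
;; Toy illustration of :distributed mode: each key lives on a fixed
;; number of owners chosen deterministically from the cluster members.
;; Infinispan's real consistent hashing is far more sophisticated.
(defn owners [cluster num-owners k]
  (let [n     (count cluster)
        start (mod (hash k) n)]
    (take num-owners (drop start (cycle cluster)))))

(owners ["gear1" "gear2" "gear3"] 2 :job-counter)
;; => two of the three gears, always the same two for this key
```

The point is that every member agrees on who owns a key without any central coordination, so reads and writes can be routed directly, and adding a node only moves a fraction of the entries.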

Now what?

Well, that was a lot to cover. I doubt many apps will use all these features, but I think it's nice to have a free playground on which to try them out, even with the resources as constrained as they are on a small gear.

Regardless, I'm pretty happy that Immutant is finally feature-complete on OpenShift now. :-)

Of course, I had a lot of help getting things to this point. Many folks on the OpenShift and JBoss teams were generous with their expertise, but the "three B's" deserve special mention: Ben, Bela, and Bill.