WebSockets and SockJS with Immutant and Vert.x - Part 2

This is a followup to our post last week on WebSockets with Vert.x. If you haven't read it, you should do so now. In that post, we set up a simple echo service in Vert.x that bridged the Vert.x EventBus to the browser. But that echo service wasn't very useful - there was no way to process incoming messages outside of the daemon, and no way to send messages down to the browser client from other parts of the application. Today, we're going to look at bridging the EventBus over to Immutant messaging, allowing us to actually interact with the client from anywhere within our application.

Our application

We'll be using the same application we used in the last post, but will be working off of a branch.

To get started, clone the app and run it:1

cd /path/to/simple-immutant-vertx-demo
git checkout with-messaging
lein do immutant deploy, immutant run

Then browse to http://localhost:8080/. You should see a UI that lets you send messages and see those messages echoed back, but now they come back in uppercase:

[UI activity]

Let's see some code!

Most of the application remains the same as it did before. But instead of just copying messages from the request address to the response address, we've now wired our demo.bridge namespace to the Immutant messaging system. We now have functions that bridge EventBus addresses to Immutant messaging destinations, and vice-versa, and have modified the init-bridge function to map the appropriate addresses and destinations:

(ns demo.bridge
  (:require [vertx.embed :as vembed :refer [with-vertx]]
            [vertx.http :as http]
            [vertx.http.sockjs :as sockjs]
            [vertx.eventbus :as eb]
            [immutant.messaging :as msg]))

(defn dest->eventbus
  "Sets up a bridge to copy messages from an Immutant messaging dest to a Vertx address."
  [vertx dest address]
  (msg/listen dest #(with-vertx vertx
                      (eb/publish address %))))

(defn eventbus->dest
  "Sets up a bridge to copy messages from a Vertx address to an Immutant messaging dest."
  [vertx address dest]
  (with-vertx vertx
    (eb/on-message address (partial msg/publish dest))))

(defn- start-sockjs-bridge
  "Creates a Vert.x http server, a sockjs server within that http
  server, then installs an eventbus bridge in the sockjs server."
  [vertx host port path]
  (println (format "Starting SockJS bridge at http://%s:%s%s" host port path))
  (with-vertx vertx
    (let [server (http/server)]
      (-> server
          (sockjs/bridge {:prefix path} [{}] [{}]))
      (http/listen server port host))))

(defn init-bridge
  "Initializes the embedded vertx instance, bridges to Immutant destinations, and starts the sockjs bridge."
  [{:keys [request-dest response-dest]}]
  (let [vertx (vembed/vertx)]
    (eventbus->dest vertx "demo.request" request-dest)
    (dest->eventbus vertx response-dest "demo.response")
    {:vertx vertx
     :server (start-sockjs-bridge vertx "localhost" 8081 "/eventbus")}))

Now that demo.bridge no longer echos, but instead expects something on the other end of the request-dest, we need something listening on the other end to do the work. We've added this to the demo.init namespace, which is also where we define the request/response destination names. Our listener here just watches queue.request, uppercases each message, and publishes it to topic.response. Since we have bridged those same destinations in demo.bridge, we again have a completed circle from the client and back:

(ns demo.init
  (:require [demo.web :as web]
            [demo.daemon :as daemon]
            [immutant.messaging :as msg]))

(def config {:response-dest "topic.response"
             :request-dest "queue.request"
             :process-fn (memfn toUpperCase)})

(defn init []
  (let [{:keys [request-dest response-dest process-fn]} config]
    (msg/start request-dest)
    (msg/start response-dest)
    (msg/listen request-dest
                #(msg/publish response-dest (process-fn %))))
  (daemon/start config))

Touch the UI from anywhere

Now that we've bridged the EventBus to the Immutant messaging system, we can interact with our client UI from anywhere within our application. Just to beat the horse a bit more, let's do it from the repl. Connect to the nREPL endpoint in the application running on port 53092 using your favorite client, then try sending messages directly to the response topic, or to the request queue to have them uppercased first:

user> (require '[immutant.messaging :as msg])
user> (msg/publish "topic.response" "ahoyhoy")
#<HornetQTextMessage HornetQMessage[ID:8af51642-2478-11e3-9deb-25745b71356d]:PERSISTENT>
user> (msg/publish "queue.request" "ahoyhoy")
#<HornetQTextMessage HornetQMessage[ID:90e4b5b8-2478-11e3-9deb-25745b71356d]:PERSISTENT>

You can also send structured messages:

user> (msg/publish "topic.response" {:x :y})
#<HornetQTextMessage HornetQMessage[ID:e09bf794-2478-11e3-9deb-25745b71356d]:PERSISTENT>

And see them all displayed in the client UI:

[repl UI activity]

Fare thee well

We've extended our prior example to make it actually useful, and maintained a separation of concerns within our application - code outside of the demo.bridge namespace has no knowledge of Vert.x, nor of the UI communication mechanism. We think this provides a compelling way to provide dynamic updates to the browser, but if you don't, or have any other questions, comments, or feedback, please get in touch.

  1. ^ This assumes you have a recent Immutant installed.
  2. ^ The demo application specifies 5309 as the :nrepl-port in its project.clj. If you have :immutant {:nrepl-port some-port} set in your ~/.lein/profiles.clj, that will override 5309 and you'll need to connect to whatever port the endpoint is bound to.

Clustering Immutants on OpenShift

Lately I've been spending a lot of time on OpenShift, building and testing a cartridge for Immutant that will properly form a cluster across multiple OpenShift gears. In this post, I'll go through the steps of deploying a simple application that demonstrates all of the Immutant clustering features running on the three small gears you get for free on OpenShift.

Here are the features we'll be demonstrating:

  • Load-balanced message distribution with automatic peer discovery
  • Replicated caching
  • Highly-Available, long-running daemons
  • HA scheduled jobs
  • Web load balancing and session replication

If you haven't already, go set up an OpenShift account and update your rhc gem to the latest version. I used 1.12.4 for this article. Below you'll see references to $namespace -- this corresponds to your OpenShift domain name, set by running rhc setup.

Note: If this is the first time you've used OpenShift, you'll need to visit the console and accept the usage agreement before running the rhc command.

Create a scaled OpenShift app

The Immutant cartridge is available here: https://github.com/immutant/openshift-immutant-cart. As described in its README, we create our app using the following command:

rhc app-create -s demo https://raw.github.com/immutant/openshift-immutant-cart/master/metadata/manifest.yml

We're calling our app demo and we're passing the -s option to make our app scalable. Notice that we're passing a raw URL to the cartridge's manifest.yml.

Small gears are pretty slow, but when app-create finally completes, you'll have a bare-bones, standard Leiningen application beneath the demo/ directory. At this point, you might tail your app's logs or ssh into your gear:

rhc tail demo
rhc ssh demo

The critical log file for Immutant on OpenShift is immutant/logs/server.log. Monitor this file until you eventually see the line, Deployed "your-clojure-application.clj". Then point a browser at http://demo-$namespace.rhcloud.com to see a simple welcome page.

Now we'll put some meat on our bare-bones app!

Push Me, Pull You

Typically, you will add the remote git repository for your real application to the local OpenShift repository you just created. We're going to use https://github.com/immutant/cluster-demo as our "real" application.

git remote add upstream -m master git@github.com:immutant/cluster-demo.git

Deployment of your app to OpenShift amounts to pulling from your real repository and pushing to OpenShift's.

git pull -s recursive -X theirs upstream master
git push

While waiting for that to complete, run rhc tail demo in another shell to monitor your log. This time, the Deployed "your-clojure-application.clj" message is going to scroll off the screen as the cluster-demo app starts logging its output. Eventually, the app should settle into a steady state looking something like this:

The cluster-demo app

If you can ignore the inconsistent thread identifiers in the above output, you'll notice there are exactly four types of messages: send, recv, web, and job. Noting the timestamps in the left column, a send is logged every 5 seconds, as is its corresponding recv, a web logged every 2 seconds, and a job every 20 seconds.

The cluster-demo app is comprised of the following:

  • A message queue named /queue/msg
  • A distributed cache named counters
  • A listener for the queue that prints the received message and the current contents of the cache
  • An HA daemon named counter that queues a cached value and increments it every 5 seconds
  • An HA scheduled job named ajob that increments another counter in the cache every 20 seconds
  • A web request handler mounted at / that logs its :path-info and returns the current values of the two cached counters
  • Another request handler mounted at /count that increments a counter in the user's web session.

All the code (~60 lines) is contained in a single file.

Programming is hard, let's build a cluster!

Now we're ready to form a cluster by adding a gear to our app:

rhc scale-cartridge immutant -a demo 2

Again, this will take a few minutes, and it may return an error even though the operation actually succeeded. You can run the following to see the definitive state of your gears:

rhc show-app --gears

This also gives you the SSH URLs for your two gears. Fire up two shells and ssh into each of your gears using those SSH URLs. Then tail the log on each:

tail -f immutant/logs/server.log

When the dust settles, you'll eventually see the gears discover each other, and you should see both gears logging recv messages, one getting the even numbers and one getting the odd. This is your automatic load-balanced message distribution.

Note also that the counters cache logged in the recv message is correct on both gears, even though it's only being updated by one. This is our cache replication at work.

Let's break stuff!

And see how robust our cluster is.

High Availability Daemons and Jobs

Of course, the send and job log entries should still only appear on our original gear, because those are our HA singletons. If that gear crashes, our daemon and job should migrate to the other gear. While logged into the gear running your singletons, run this:

immutant/bin/control stop

And watch the other gear's log to verify the daemon and job pick up right where they left off, fetching their counters from the replicated cache. That gear should be consuming all the queued messages, too. Now start the original gear back up:

immutant/bin/control start

Eventually, it'll start receiving half the messages again.


You may be wondering about those web entries showing up in both logs. They are "health check" requests from the HAProxy web load balancer, automatically installed on your primary gear. You can always check the state of your cluster from HAProxy's perspective by visiting http://demo-$namespace.rhcloud.com/haproxy-status. If you see that page without intending to, it means something about your app is broken, so check immutant/logs/server.log for errors and make sure your app responds to a request for the root context, i.e. "/".

Let's try some web stuff. Use curl to hit your app while observing the logs on both gears:

curl http://demo-$namespace.rhcloud.com/xxxxxxxxxxxxxxxxxxxx
curl http://demo-$namespace.rhcloud.com/yyyyyyyyyyyyyyyyyyyy
curl http://demo-$namespace.rhcloud.com/zzzzzzzzzzzzzzzzzzzz

Use an obnoxious path to distinguish your request from the health checks. Repeat the command a few times to observe the gears taking turns responding to your request. Now try it in a browser, and you'll see the same gear handling your request every time you reload. This is because HAProxy is setting cookies in the response to enable session affinity, which your browser is probably sending back. And curl didn't.

Speaking of session affinity, let's break that while we're at it, by invoking our other web handler, the one that increments a counter in the user's web session: http://demo-$namespace.rhcloud.com/count

You should see the counter increment each time you reload your browser. (You'll need to give curl a cookie store to see it respond with anything other than "1 times")

Pay attention to which gear is responding to the /count request. Now stop that gear like you did before. When you reload your browser, you should see the other gear return the expected value. This is the automatic session replication provided by immutant.web.session/servlet-store.

Don't forget to restart that gear.

The Hat Trick

Hey, OpenShift is giving us 3 free gears, we may as well use 'em all, right?

rhc scale-cartridge immutant -a demo 3

When the third one finally comes up, there are a couple of things you may notice:

  • The health checks will disappear from the primary gear as HAProxy takes it out of the rotation when 2 or more other gears are available, ostensibly to mitigate the observer effect of the health checks.
  • Each cache key will only show up in the recv log messages on 2 of the 3 gears. This is because Immutant caches default to Infinispan's :distributed replication mode in a cluster. This enables Infinispan clusters to achieve "linear scalability" as entries are copied to a fixed number of cluster nodes (default 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

Now what?

Well, that was a lot to cover. I doubt many apps will use all these features, but I think it's nice to have a free playground on which to try them out, even with the resources as constrained as they are on a small gear.

Regardless, I'm pretty happy that Immutant is finally feature-complete on OpenShift now. :-)

Of course, I had a lot of help getting things to this point. Many folks on the OpenShift and JBoss teams were generous with their expertise, but the "three B's" deserve special mention: Ben, Bela, and Bill.


Overlay Screencast

I put together a quick screencast showing how to overlay the latest incremental releases of both Immutant and TorqueBox into a single app server capable of deploying both Clojure and Ruby apps.

Further, I show how those Clojure and Ruby apps can seamlessly exchange messages and share caches using the respective messaging and caching libraries from both Immutant and TorqueBox.


Introducing Immutant Pipelines


Happy new year! We'd like to celebrate by announcing a new Immutant feature: pipelines. Pipelines are compositions of functions, where the functions are executed in individual threads (or thread pools), potentially on different nodes in an Immutant cluster. They are built on top of our messaging subsystem, and were inspired by lamina's pipelines.


We'll walk through a simple (and simplistic) example to demonstrate using a pipeline.


The first thing we have to do is create a pipeline. We do that with a call to the pipeline function, giving it a name and some single-arity functions that form the steps of the pipeline:

(require '[immutant.pipeline :as pl])

(defonce reverse-pl
  (pl/pipeline :reverse-a-string
    (partial apply str)))

This looks similar to a 'thread last' (->>), or a comp in reverse. But for the functions we're using in this sample pipeline, let's pretend that each of them are more computation and time intensive than they actually are, and could benefit from being scaled across threads or across a cluster.

Putting data onto a pipeline

So, moving right along. We now have a pipeline, but how do we put data on it? The call to pipeline returns a function (we'll call it a pipeline-fn) that places data onto the head of the pipeline. Let's use it:

(let [result (reverse-pl "ham")]
  (deref result 1000 nil)) ;; => "mah"

What's with the deref? The pipeline execution is asynchronous - the pipeline-fn places the data onto the head of the pipeline, and immediately returns a delay. We dereference the delay to synchronize on the end of the pipeline, giving us the result. We're careful to use the deref that takes a timeout - it's possible for errors to occur during the pipeline execution, so we may never get a response (we'll talk more about error handling in a bit).

Raising the concurrency

By default, each step gets assigned one thread (per cluster node) to handle its work. If our pipeline processes data at a rate that is slower than the rate of insertion at the head of the pipeline, we can increase the number of threads for each step with the :concurrency option (options are passed as keyword arguments after the list of functions). Let's alter our original pipeline definition to do that:

(defonce reverse-pl
  (pl/pipeline :reverse-a-string
    (partial apply str)
    :concurrency 5)) ;; 5 threads per step

But what if we have one step that is slower than the rest? Let's assume that reverse is the slowpoke here, and raise the :concurrency even higher for that step:

(defonce reverse-pl
  (pl/pipeline :reverse-a-string
    (pl/step reverse :concurrency 10) ;; 10 threads for this guy
    (partial apply str)
    :concurrency 5)) ;; 5 threads for each of the other steps

Here we've used the step function to attach options to a particular step. Options attached to steps will override the corresponding pipeline option where it makes sense.

Handling errors

Since pipelines are built on top of Immutant's message listeners, the default error handling is what the messaging system provides: if an exception occurs during the execution of a step, the delivery of the data to that step is rolled back, and will be retried up to ten times. If you need errors to be handled differently, you can provide an error handler function that must take two arguments: the exception, and the original argument passed to the step that threw the exception:

(pl/pipeline :do-something-on-the-network
    :error-handler (fn [ex v] 
                     (when (instance? ex SomeNetworkException)
                       (println "ERROR, skipping" pl/*current-step* ex)
                       (pl/*pipeline* v :step pl/*next-step*)))) ;; jump to the next step

Above we have a simple error handler that demonstrates putting a value back onto the pipeline, but skips the current step. We do that using a few vars that are bound during a pipeline execution:

If the error handler doesn't put the data back on to the pipeline, that particular pipeline execution is halted.

You can also specify an :error-handler for a particular step, which will override the pipeline error handler.

Let's see the above example again, but with a step-specific error handler that rethrows to trigger the default retry semantics:

(pl/pipeline :do-something-on-the-network
    (pl/step retrieve-a-url 
      :error-handler (fn [ex v] 
                       (if (instance? ex SomeNetworkException)
                         (println "ERROR retrieving url" v ", exiting:" ex) ;; exit the pipeline
                         (throw x)))) ;; retry
    :error-handler (fn [ex v] 
                     (when (instance? ex SomeNetworkException)
                       (println "ERROR, skipping" pl/*current-step* ex)
                       (pl/*pipeline* v :step pl/*next-step*))))

Pipelines within pipelines

Pipeline-fn's are just functions that happen to return a delay. To facilitate using pipelines within pipelines, any step result that can be dereferenced is, automatically:

(defonce rev-emb-pl 
  (pl/pipeline :reverse-and-embiggen
    (memfn .toUpperCase) 
    #(str \¡ % \!)))

(deref (rev-emb-pl "tiucsib") 1000 nil) ;; => "¡BISCUIT!"

Since it's possible for the result of a step to never arrive, you can control how long this automatic deref waits:

(defonce rev-emb-pl 
  (pl/pipeline :reverse-and-embiggen
    (memfn .toUpperCase) 
    #(str \¡ % \!)
    :step-deref-timeout 60000)) ;; in ms, default is 10 seconds

Like :concurrency and :error-handler, :step-deref-timeout can be overridden on a per-step basis.


Pipelines are currently available in the latest Immutant incremental builds, and will be part of 0.8.0, which should be released today.

We haven't covered everything about pipelines here, see the documentation for more details.

Pipeline support is an alpha feature at the moment, so its API is in flux - please give it a try and let us know how we can improve it.

Image credit: World Bank Photo Collection

Introducing Distributed XA Transaction Support

We're as happy as a bear in a koi pond to announce support for Distributed (XA) Transactions in Immutant.

Messaging and Caching resources in Immutant are now automatically transactional and XA capable. And we make it easy for you to create DataSources for your XA compliant SQL databases so that you can then define transactions incorporating all three types of resources in your Clojure applications.

Some Background

X/Open XA is a standard specification for allowing multiple, independent resources to participate in a single, distributed transaction using a two-phase commit (2PC) protocol.

Say your application stores data in more than one place, perhaps an Oracle database and a Postgres database. When a function in your application writes data to those two databases, XA can ensure that it doesn't leave your data in an inconsistent state when the Oracle database fails. ;-)

To accomplish this, the commit and rollback methods are invoked not on any single resource like a JDBC or JMS connection but on a TransactionManager instead. Its job is to coordinate the commit or rollback of each resource involved in a particular transaction.

Defining an XA Transaction

Let's start with an example:

  (ns xa.example
    (:require [immutant.xa :as xa]
              [immutant.cache :as cache]
              [immutant.messaging :as msg]
              [clojure.java.jdbc :as sql]))
  (defn do-three-things []
     (sql/with-connection {:datasource my-ds}
       (sql/insert-records :things {:name "foo"}))
     (cache/put my-cache :a 1)
     (msg/publish "/queue/test" "success!")))

The do-three-things function will insert a record into a SQL database, write an entry to a cache and publish a message to a queue, all within a single transaction. When it completes, either all of those things will have happened or none will, depending on whether an exception is tossed from the body passed to xa/transaction.

By the way, don't let that {:datasource my-ds} spec throw you just yet. We're going to discuss that in a minute.

So the xa/transaction macro starts a transaction, executes its body, and then commits the transaction unless an exception is caught, in which case the transaction is rolled back.

Transaction Scope

I lied a little. The xa/transaction macro is really just an alias for the required macro in the immutant.xa.transaction namespace, which is one of six macros matching the transaction attributes for JEE Container-Managed Persistence: required, requires-new, not-supported, supports, mandatory, and never. According to that spec, required is the default, so we alias it in the main immutant.xa namespace.

These macros allow you to control the scope of your transactions when your functions call each other. For example,

  (ns xa.example ...)
  (defn foo []

Here, we have one function, foo, defining a transaction that calls another function, do-three-things that, as you recall from above, seemingly defines another transaction. Or does it? In fact, the required macro won't start a new transaction if there's already one associated with the current thread. It'll simply include its body in the transaction started by the caller. If we really wanted a new transaction, we'd call requires-new inside do-three-things.

Here's another example:

  (ns xa.example
    (:require [immutant.xa.transaction :as tx]))
  (tx/required           ; begin tx #1
   (tx/not-supported     ; suspend tx #1
   (tx/requires-new      ; begin tx #2
    (three))             ; commit tx #2
   (throw (Exception.))) ; rollback tx #1

Here we have a function, one, running within a transaction that is suspended prior to calling the function two, that runs completely outside of any transaction, after which a second transaction is started before calling the function, three.

We then toss an exception (we could've also called tx/set-rollback-only) that causes everything we did in one to rollback. The exception does not affect what we did in two or three, however.

Incidentally, any exception tossed in two or three would also rollback the actions of one since all the macros re-throw whatever they catch.

Creating an XA DataSource

So now we'll discuss that {:datasource my-ds} spec from the first example.

To include your database in a distributed transaction, you need to create an XA DataSource for it. Do this using the immutant.xa/datasource function. It will expect the appropriate JDBC driver for your database to be available in the classpath, so you'll need to add one of the following to your Leiningen project.clj:

  (defproject foo "1.0.0-SNAPSHOT"
    :dependencies [[com.h2database/h2 "1.3.160"]              ; H2
                   [org.clojars.gukjoon/ojdbc "1.4"]          ; Oracle
                   [org.clojars.kjw/mysql-connector "5.1.11"] ; MySQL
                   [postgresql "9.0-801.jdbc4"]               ; Postgres
                   [net.sourceforge.jtds/jtds "1.2.4"]        ; MS SQL Server
                   [java.jdbc "0.2.2"]])

The comments on the right indicate the database types we currently support, and the versions above have been successfully tested on Immutant.

With the driver available, all that's left is to create the DataSource. Here are some examples from our integration tests:

  (defonce my-ds (xa/datasource "h2" {:adapter "h2" :database "mem:foo"}))
  (defonce my-ds (xa/datasource "oracle" 
                                {:adapter "oracle"
                                 :host "oracle.cpct4icp7nye.us-east-1.rds.amazonaws.com"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))
  (defonce my-ds (xa/datasource "mysql" 
                                {:adapter "mysql"
                                 :host "mysql.cpct4icp7nye.us-east-1.rds.amazonaws.com"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))
  (defonce my-ds (xa/datasource "postgres" 
                                {:adapter "postgresql"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))
  (defonce my-ds (xa/datasource "mssql" 
                                {:adapter "mssql"
                                 :host "mssql.cpct4icp7nye.us-east-1.rds.amazonaws.com"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))

To use one of those in a clojure.java.jdbc connection spec, you should associate it with the :datasource key, like so:

  (jdbc/with-connection {:datasource my-ds}
    (jdbc/create-table :things [:name "varchar(50)"]))

This should of course work with any Clojure SQL library built on clojure.java.jdbc, e.g. Korma, ClojureQL, Lobos, etc.

See the manual for more details.


XA is not for every application. It's useful when you have multiple JDBC backends or you need to synchronize your JDBC and JMS (HornetQ messaging) calls. Transactional data-grids (Infinispan caching) are also often handy, so we feel good about making all of these resources automatically transactional in Immutant, not to mention providing clean Clojure interfaces for them.

Distributed transactions are available now in our latest incremental builds and will of course be included in our upcoming 0.2.0 release of Immutant expected in the next week or so. As always, feel free to find us in the normal channels if you have any questions.