WebSockets and SockJS with Immutant and Vert.x - Part 2

This is a followup to our post last week on WebSockets with Vert.x. If you haven't read it, you should do so now. In that post, we set up a simple echo service in Vert.x that bridged the Vert.x EventBus to the browser. But that echo service wasn't very useful - there was no way to process incoming messages outside of the daemon, and no way to send messages down to the browser client from other parts of the application. Today, we're going to look at bridging the EventBus over to Immutant messaging, allowing us to actually interact with the client from anywhere within our application.

Our application

We'll be using the same application we used in the last post, but will be working off of a branch.

To get started, clone the app and run it:1

cd /path/to/simple-immutant-vertx-demo
git checkout with-messaging
lein do immutant deploy, immutant run

Then browse to http://localhost:8080/. You should see a UI that lets you send messages and see those messages echoed back, but now they come back in uppercase:

[UI activity]

Let's see some code!

Most of the application remains the same as it did before. But instead of just copying messages from the request address to the response address, we've now wired our demo.bridge namespace to the Immutant messaging system. We now have functions that bridge EventBus addresses to Immutant messaging destinations, and vice-versa, and have modified the init-bridge function to map the appropriate addresses and destinations:

(ns demo.bridge
  (:require [vertx.embed :as vembed :refer [with-vertx]]
            [vertx.http :as http]
            [vertx.http.sockjs :as sockjs]
            [vertx.eventbus :as eb]
            [immutant.messaging :as msg]))

(defn dest->eventbus
  "Sets up a bridge to copy messages from an Immutant messaging dest to a Vertx address."
  [vertx dest address]
  (msg/listen dest #(with-vertx vertx
                      (eb/publish address %))))

(defn eventbus->dest
  "Sets up a bridge to copy messages from a Vertx address to an Immutant messaging dest."
  [vertx address dest]
  (with-vertx vertx
    (eb/on-message address (partial msg/publish dest))))

(defn- start-sockjs-bridge
  "Creates a Vert.x http server, a sockjs server within that http
  server, then installs an eventbus bridge in the sockjs server."
  [vertx host port path]
  (println (format "Starting SockJS bridge at http://%s:%s%s" host port path))
  (with-vertx vertx
    (let [server (http/server)]
      (-> server
          (sockjs/sockjs-server)
          (sockjs/bridge {:prefix path} [{}] [{}]))
      (http/listen server port host))))

(defn init-bridge
  "Initializes the embedded vertx instance, bridges to Immutant destinations, and starts the sockjs bridge."
  [{:keys [request-dest response-dest]}]
  (let [vertx (vembed/vertx)]
    (eventbus->dest vertx "demo.request" request-dest)
    (dest->eventbus vertx response-dest "demo.response")
    {:vertx vertx
     :server (start-sockjs-bridge vertx "localhost" 8081 "/eventbus")}))

Now that demo.bridge no longer echos, but instead expects something on the other end of the request-dest, we need something listening on the other end to do the work. We've added this to the demo.init namespace, which is also where we define the request/response destination names. Our listener here just watches queue.request, uppercases each message, and publishes it to topic.response. Since we have bridged those same destinations in demo.bridge, we again have a completed circle from the client and back:

(ns demo.init
  (:require [demo.web :as web]
            [demo.daemon :as daemon]
            [immutant.messaging :as msg]))

(def config {:response-dest "topic.response"
             :request-dest "queue.request"
             :process-fn (memfn toUpperCase)})

(defn init []
  (let [{:keys [request-dest response-dest process-fn]} config]
    (msg/start request-dest)
    (msg/start response-dest)
    (msg/listen request-dest
                #(msg/publish response-dest (process-fn %))))
  (web/start)
  (daemon/start config))

Touch the UI from anywhere

Now that we've bridged the EventBus to the Immutant messaging system, we can interact with our client UI from anywhere within our application. Just to beat the horse a bit more, let's do it from the repl. Connect to the nREPL endpoint in the application running on port 53092 using your favorite client, then try sending messages directly to the response topic, or to the request queue to have them uppercased first:

user> (require '[immutant.messaging :as msg])
nil
user> (msg/publish "topic.response" "ahoyhoy")
#<HornetQTextMessage HornetQMessage[ID:8af51642-2478-11e3-9deb-25745b71356d]:PERSISTENT>
user> (msg/publish "queue.request" "ahoyhoy")
#<HornetQTextMessage HornetQMessage[ID:90e4b5b8-2478-11e3-9deb-25745b71356d]:PERSISTENT>
user> 

You can also send structured messages:

user> (msg/publish "topic.response" {:x :y})
#<HornetQTextMessage HornetQMessage[ID:e09bf794-2478-11e3-9deb-25745b71356d]:PERSISTENT>

And see them all displayed in the client UI:

[repl UI activity]

Fare thee well

We've extended our prior example to make it actually useful, and maintained a separation of concerns within our application - code outside of the demo.bridge namespace has no knowledge of Vert.x, nor of the UI communication mechanism. We think this provides a compelling way to provide dynamic updates to the browser, but if you don't, or have any other questions, comments, or feedback, please get in touch.


  1. ^ This assumes you have a recent Immutant installed.
  2. ^ The demo application specifies 5309 as the :nrepl-port in its project.clj. If you have :immutant {:nrepl-port some-port} set in your ~/.lein/profiles.clj, that will override 5309 and you'll need to connect to whatever port the endpoint is bound to.

WebSockets and SockJS with Immutant and Vert.x

Currently, Immutant doesn't provide any native WebSockets support. However, it is possible to use another WebSockets-capable server from within an Immutant daemon. There are quite a few well-known options in the Clojure ecosystem that we could use here, like Aleph, http-kit, or raw Jetty or Netty. Instead, we're going to use a relative newcomer to the Clojure space: Vert.x.

What is Vert.x?

Vert.x is an asynchronous polyglot application platform built on top of Netty that has been around for a while, but just recently gained Clojure support. It provides (among other things) its own message passing system (the EventBus), and provides a SockJS implementation that allows browser clients to participate as peers in the EventBus over WebSockets, falling back to other protocols as the browser and network topology dictate. SockJS and an EventBus that is bridged to the client abstracts away some of the complexity of managing dynamic updates to the browser, and is the primary reason we're using Vert.x over some of the alternatives mentioned above.

Vert.x includes a javascript EventBus client for use in client-side code that allows the browser to participate in the EventBus as a peer. The Vert.x Clojure language module includes a ClojureScript wrapper around that javascript client, which we'll use in a bit.

Our application

To demonstrate using the Vert.x EventBus bridge from Immutant, we're going to look at a simple application that embeds1 Vert.x into an Immutant daemon to provide an echo service over the EventBus.

To get started, clone the app and run it:2

cd /path/to/simple-immutant-vertx-demo
lein do immutant deploy, immutant run

Then browse to http://localhost:8080/. You should see a UI that lets you send messages and see those messages echoed back. If you're using a browser with a Network console, you should be able to see the the SockJS WebSockets traffic, like so:

[websocket activity]

Let's see some code!

First, let's take a look at the ClojureScript client. It's fairly standard Enfocus transformations, with EventBus calls mixed in:

(ns demo.client
  (:require [enfocus.core :as ef]
            [enfocus.events :as events]
            [vertx.client.eventbus :as eb]))

(defn open-eventbus
  "Opens a connection to the remote EventBus endpoint."
  [& on-open]
  (let [eb (eb/eventbus "http://localhost:8081/eventbus")]
    (eb/on-open eb #(.log js/console "eventbus opened"))
    (mapv #(eb/on-open eb (fn [] (% eb))) on-open)))

(defn append-content
  "Append the given content to the element specified by id"
  [id content]
  (ef/at id (ef/append (ef/html [:div content]))))

(defn send-message
  "Sends a message to the request address."
  [eb message]
  (eb/publish eb "demo.request" message))

(defn attach-listeners
  "Attaches listeners to both the the request and response addresses,
   displaying the received messages in the appropriate divs."
  [eb]
  (eb/on-message eb "demo.request" (partial append-content "#sent"))
  (eb/on-message eb "demo.response" (partial append-content "#rcvd")))

(defn attach-send-click
  "Attaches handler to send a message when the send button is clicked."
  [eb]
  (ef/at "#send-message"
         (events/listen :click
                        #(send-message eb (ef/from "#message"
                                                   (ef/get-prop :value))))))
(defn init []
  (open-eventbus
   attach-listeners
   attach-send-click))

(set! (.-onload js/window) init)

On the server side, we start up the SockJS EventBus bridge as an Immutant daemon in the demo.daemon namespace, and is standard Immutant daemon management code. The functions that actually do the work of setting up the bridge are in the demo.bridge namespace:3

(ns demo.bridge
  (:require [vertx.embed :as vembed :refer [with-vertx]]
            [vertx.http :as http]
            [vertx.http.sockjs :as sockjs]
            [vertx.eventbus :as eb]))

(defn- start-sockjs-bridge
  "Creates a Vert.x http server, a sockjs server within that http
  server, then installs an eventbus bridge in the sockjs server."
  [vertx host port path]
  (println (format "Starting SockJS bridge at http://%s:%s%s" host port path))
  (with-vertx vertx
    (let [server (http/server)]
      (-> server
          (sockjs/sockjs-server)
          (sockjs/bridge {:prefix path} [{}] [{}]))
      (http/listen server port host))))

(defn init-bridge
  "Initializes the embedded vertx instance, sets up our echo handler,
   and starts the sockjs bridge."
  []
  (let [vertx (vembed/vertx)]
    (with-vertx vertx
      (eb/on-message "demo.request"
                     (partial eb/publish "demo.response")))
    {:vertx vertx
     :server (start-sockjs-bridge vertx "localhost" 8081 "/eventbus")}))

A nice, neat little package?

So, what have we done here? We've added dynamic updates over WebSockets (with fallback) to an Immutant application, without having to handle the minutiae of bi-directional communication over WebSockets and any fallback protocols. And since Vert.x allows the browser client to be an equal peer in the EventBus, we were able to use a similar API on the server and client.

However, it's not all roses - there is a drawback to this approach: since Immutant doesn't support WebSockets natively, we can't share the http port and upgrade connections to WebSockets on request. This means that any WebSockets solution we run as a daemon has to bind to its own port.

This has been an exploration of one way to add simple dynamic interaction to an Immutant application, but is certainly not the only way. If you try this approach with another WebSockets server, let us know how it goes.

Watch for a post in the future that presents a more complex application that bridges the Vert.x EventBus to Immutant messaging destinations.

If you have any questions, comments, or feedback, please get in touch.


  1. ^ Vert.x provides its own application container, but we're using it embedded, which is an advanced usage.
  2. ^ This assumes you have a recent Immutant installed.
  3. ^ For this example, we're not securing the EventBus bridge at all. Doing so is probably a good idea.

Clustering Immutants on OpenShift

Lately I've been spending a lot of time on OpenShift, building and testing a cartridge for Immutant that will properly form a cluster across multiple OpenShift gears. In this post, I'll go through the steps of deploying a simple application that demonstrates all of the Immutant clustering features running on the three small gears you get for free on OpenShift.

Here are the features we'll be demonstrating:

  • Load-balanced message distribution with automatic peer discovery
  • Replicated caching
  • Highly-Available, long-running daemons
  • HA scheduled jobs
  • Web load balancing and session replication

If you haven't already, go set up an OpenShift account and update your rhc gem to the latest version. I used 1.12.4 for this article. Below you'll see references to $namespace -- this corresponds to your OpenShift domain name, set by running rhc setup.

Note: If this is the first time you've used OpenShift, you'll need to visit the console and accept the usage agreement before running the rhc command.

Create a scaled OpenShift app

The Immutant cartridge is available here: https://github.com/immutant/openshift-immutant-cart. As described in its README, we create our app using the following command:

rhc app-create -s demo https://raw.github.com/immutant/openshift-immutant-cart/master/metadata/manifest.yml

We're calling our app demo and we're passing the -s option to make our app scalable. Notice that we're passing a raw URL to the cartridge's manifest.yml.

Small gears are pretty slow, but when app-create finally completes, you'll have a bare-bones, standard Leiningen application beneath the demo/ directory. At this point, you might tail your app's logs or ssh into your gear:

rhc tail demo
rhc ssh demo

The critical log file for Immutant on OpenShift is immutant/logs/server.log. Monitor this file until you eventually see the line, Deployed "your-clojure-application.clj". Then point a browser at http://demo-$namespace.rhcloud.com to see a simple welcome page.

Now we'll put some meat on our bare-bones app!

Push Me, Pull You

Typically, you will add the remote git repository for your real application to the local OpenShift repository you just created. We're going to use https://github.com/immutant/cluster-demo as our "real" application.

git remote add upstream -m master git@github.com:immutant/cluster-demo.git

Deployment of your app to OpenShift amounts to pulling from your real repository and pushing to OpenShift's.

git pull -s recursive -X theirs upstream master
git push

While waiting for that to complete, run rhc tail demo in another shell to monitor your log. This time, the Deployed "your-clojure-application.clj" message is going to scroll off the screen as the cluster-demo app starts logging its output. Eventually, the app should settle into a steady state looking something like this:

The cluster-demo app

If you can ignore the inconsistent thread identifiers in the above output, you'll notice there are exactly four types of messages: send, recv, web, and job. Noting the timestamps in the left column, a send is logged every 5 seconds, as is its corresponding recv, a web logged every 2 seconds, and a job every 20 seconds.

The cluster-demo app is comprised of the following:

  • A message queue named /queue/msg
  • A distributed cache named counters
  • A listener for the queue that prints the received message and the current contents of the cache
  • An HA daemon named counter that queues a cached value and increments it every 5 seconds
  • An HA scheduled job named ajob that increments another counter in the cache every 20 seconds
  • A web request handler mounted at / that logs its :path-info and returns the current values of the two cached counters
  • Another request handler mounted at /count that increments a counter in the user's web session.

All the code (~60 lines) is contained in a single file.

Programming is hard, let's build a cluster!

Now we're ready to form a cluster by adding a gear to our app:

rhc scale-cartridge immutant -a demo 2

Again, this will take a few minutes, and it may return an error even though the operation actually succeeded. You can run the following to see the definitive state of your gears:

rhc show-app --gears

This also gives you the SSH URLs for your two gears. Fire up two shells and ssh into each of your gears using those SSH URLs. Then tail the log on each:

tail -f immutant/logs/server.log

When the dust settles, you'll eventually see the gears discover each other, and you should see both gears logging recv messages, one getting the even numbers and one getting the odd. This is your automatic load-balanced message distribution.

Note also that the counters cache logged in the recv message is correct on both gears, even though it's only being updated by one. This is our cache replication at work.

Let's break stuff!

And see how robust our cluster is.

High Availability Daemons and Jobs

Of course, the send and job log entries should still only appear on our original gear, because those are our HA singletons. If that gear crashes, our daemon and job should migrate to the other gear. While logged into the gear running your singletons, run this:

immutant/bin/control stop

And watch the other gear's log to verify the daemon and job pick up right where they left off, fetching their counters from the replicated cache. That gear should be consuming all the queued messages, too. Now start the original gear back up:

immutant/bin/control start

Eventually, it'll start receiving half the messages again.

Web

You may be wondering about those web entries showing up in both logs. They are "health check" requests from the HAProxy web load balancer, automatically installed on your primary gear. You can always check the state of your cluster from HAProxy's perspective by visiting http://demo-$namespace.rhcloud.com/haproxy-status. If you see that page without intending to, it means something about your app is broken, so check immutant/logs/server.log for errors and make sure your app responds to a request for the root context, i.e. "/".

Let's try some web stuff. Use curl to hit your app while observing the logs on both gears:

curl http://demo-$namespace.rhcloud.com/xxxxxxxxxxxxxxxxxxxx
curl http://demo-$namespace.rhcloud.com/yyyyyyyyyyyyyyyyyyyy
curl http://demo-$namespace.rhcloud.com/zzzzzzzzzzzzzzzzzzzz

Use an obnoxious path to distinguish your request from the health checks. Repeat the command a few times to observe the gears taking turns responding to your request. Now try it in a browser, and you'll see the same gear handling your request every time you reload. This is because HAProxy is setting cookies in the response to enable session affinity, which your browser is probably sending back. And curl didn't.

Speaking of session affinity, let's break that while we're at it, by invoking our other web handler, the one that increments a counter in the user's web session: http://demo-$namespace.rhcloud.com/count

You should see the counter increment each time you reload your browser. (You'll need to give curl a cookie store to see it respond with anything other than "1 times")

Pay attention to which gear is responding to the /count request. Now stop that gear like you did before. When you reload your browser, you should see the other gear return the expected value. This is the automatic session replication provided by immutant.web.session/servlet-store.

Don't forget to restart that gear.

The Hat Trick

Hey, OpenShift is giving us 3 free gears, we may as well use 'em all, right?

rhc scale-cartridge immutant -a demo 3

When the third one finally comes up, there are a couple of things you may notice:

  • The health checks will disappear from the primary gear as HAProxy takes it out of the rotation when 2 or more other gears are available, ostensibly to mitigate the observer effect of the health checks.
  • Each cache key will only show up in the recv log messages on 2 of the 3 gears. This is because Immutant caches default to Infinispan's :distributed replication mode in a cluster. This enables Infinispan clusters to achieve "linear scalability" as entries are copied to a fixed number of cluster nodes (default 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

Now what?

Well, that was a lot to cover. I doubt many apps will use all these features, but I think it's nice to have a free playground on which to try them out, even with the resources as constrained as they are on a small gear.

Regardless, I'm pretty happy that Immutant is finally feature-complete on OpenShift now. :-)

Of course, I had a lot of help getting things to this point. Many folks on the OpenShift and JBoss teams were generous with their expertise, but the "three B's" deserve special mention: Ben, Bela, and Bill.

Thanks!

Getting Started: Simulated Clustering

For this installment of our getting started series we'll experiment a bit with clustering, one of the primary benefits provided by the JBoss AS7 application server, upon which Immutant is built. AS7 features a brand new way of configuring and managing clusters called Domain Mode, but unfortunately its documentation is still evolving. If you insist, try this or possibly this.

We'll save Domain Mode with respect to Immutant for a future post. It's not required for clustering, but it is an option for easier cluster management. In this post, we'll reveal a trick to simulate a cluster on your development box so that you can experiment with Immutant clustering features, which we should probably enumerate now:

  • Automatic load balancing and failover of message consumers
  • HTTP session replication
  • Fine-grained, dynamic web-app configuration and control via mod_cluster
  • Efficiently-replicated distributed caching via Infinispan
  • Singleton scheduled jobs
  • Automatic failover of singleton daemons

Running an Immutant

As you know, installing Immutant is simple:

$ lein plugin install lein-immutant 0.4.1
$ lein immutant install

And running an Immutant is, too:

$ lein immutant run

By passing the --clustered option, you configure the Immutant as a node that will discover other nodes (via multicast, by default) to form a cluster:

$ lein immutant run --clustered

From the first line of its output, you can see what that command is really running:

$ $JBOSS_HOME/bin/standalone.sh --server-config=standalone-ha.xml

Any options passed to lein immutant run are forwarded to standalone.sh, so run the following to see what those are:

$ lein immutant run --help

Simulating a Cluster

TL;DR

To run two immutant instances on a single machine, fire up two shells and...

In one shell, run:

$ lein immutant run --clustered -Djboss.node.name=one -Djboss.server.data.dir=/tmp/one

In another shell, run:

$ lein immutant run --clustered -Djboss.node.name=two -Djboss.server.data.dir=/tmp/two -Djboss.socket.binding.port-offset=100

Boom, you're a cluster!

Details

Each cluster node requires a unique name, which is usually derived from the hostname, but since our Immutants are on the same host, we set the jboss.node.name property uniquely.

Each Immutant will attempt to persist its runtime state to the same files. Hijinks will ensue. We prevent said hijinks by setting the jboss.server.data.dir property uniquely.

JBoss listens for various types of connections on a few ports. One obvious solution to the potential conflicts is to bind each Immutant to a different interface, which we could specify using the -b option.

But rather than go through a platform-specific example of creating an IP alias, I'm going to take advantage of another JBoss feature: the jboss.socket.binding.port-offset property will cause each default port number to be incremented by a specified amount.

So for the second Immutant, I set the offset to 100, resulting in its HTTP service, for example, listening on 8180 instead of the default 8080, on which the first Immutant is listening.

Deploy an Application

With any luck at all, you have two Immutants running locally, both hungry for an app to deploy, so let's create one.

We've been over how to deploy an application before, but this time we're gonna keep it real simple: create a new directory and add two files.

First, you'll need a project.clj

(defproject example "1.0.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]])

Next, the Immutant application bootstrap file, immutant.clj, into which we'll put all our code for this example.

(ns example.init
  (:require [immutant.messaging :as messaging]
            [immutant.daemons :as daemon])

;; Create a message queue
(messaging/start "/queue/msg")
;; Define a consumer for our queue
(def listener (messaging/listen "/queue/msg" #(println "received:" %)))

;; Controls the state of our daemon
(def done (atom false))

;; Our daemon's start function
(defn start []
  (reset! done false)
  (loop [i 0]
    (Thread/sleep 1000)
    (when-not @done
      (println "sending:" i)
      (messaging/publish "/queue/msg" i)
      (recur (inc i)))))

;; Our daemon's stop function
(defn stop []
  (reset! done true))

;; Register the daemon
(daemon/start "counter" start stop :singleton true)

We've defined a message queue, a message listener, and a daemon service that, once started, publishes messages to the queue every second.

Daemons require a name (for referencing as a JMX MBean), a start function to be invoked asynchronously, and a stop function that will be automatically invoked when your app is undeployed, allowing you to cleanly teardown any resources used by your service. Optionally, you can declare the service to be a singleton. This means it will only be started on one node in your cluster, and should that node crash, it will be automatically started on another node, essentially giving you a robust, highly-available service.

In the same directory that contains your files, run this:

$ lein immutant deploy

Because both Immutants are monitoring the same deployment directory, this should trigger both to deploy the app.

Now watch the output of the shells in which your Immutants are running. You should see the daemon start up on only one of them, but both should be receiving messages. This is the automatic load balancing of message consumers.

Now kill the Immutant running the daemon. Watch the other one to see that the daemon will start there within seconds. There's your automatic failover. Restart the killed Immutant to see him start to receive messages again. It's fun, right? :)

Whew!

So that's probably enough to show for now. Give it a try, and let us know if it worked for you the very first time. If it doesn't, please reach out to us in the usual ways and we'll be happy to get you going. Above all, have fun!