WebSockets and SockJS with Immutant and Vert.x - Part 2

This is a followup to our post last week on WebSockets with Vert.x. If you haven't read it, you should do so now. In that post, we set up a simple echo service in Vert.x that bridged the Vert.x EventBus to the browser. But that echo service wasn't very useful - there was no way to process incoming messages outside of the daemon, and no way to send messages down to the browser client from other parts of the application. Today, we're going to look at bridging the EventBus over to Immutant messaging, allowing us to actually interact with the client from anywhere within our application.

Our application

We'll be using the same application we used in the last post, but will be working off of a branch.

To get started, clone the app and run it:1

cd /path/to/simple-immutant-vertx-demo
git checkout with-messaging
lein do immutant deploy, immutant run

Then browse to http://localhost:8080/. You should see a UI that lets you send messages and see those messages echoed back, but now they come back in uppercase:

[UI activity]

Let's see some code!

Most of the application remains the same as it did before. But instead of just copying messages from the request address to the response address, we've now wired our demo.bridge namespace to the Immutant messaging system. We now have functions that bridge EventBus addresses to Immutant messaging destinations, and vice-versa, and have modified the init-bridge function to map the appropriate addresses and destinations:

(ns demo.bridge
  (:require [vertx.embed :as vembed :refer [with-vertx]]
            [vertx.http :as http]
            [vertx.http.sockjs :as sockjs]
            [vertx.eventbus :as eb]
            [immutant.messaging :as msg]))

(defn dest->eventbus
  "Sets up a bridge to copy messages from an Immutant messaging dest to a Vertx address."
  [vertx dest address]
  (msg/listen dest #(with-vertx vertx
                      (eb/publish address %))))

(defn eventbus->dest
  "Sets up a bridge to copy messages from a Vertx address to an Immutant messaging dest."
  [vertx address dest]
  (with-vertx vertx
    (eb/on-message address (partial msg/publish dest))))

(defn- start-sockjs-bridge
  "Creates a Vert.x http server, a sockjs server within that http
  server, then installs an eventbus bridge in the sockjs server."
  [vertx host port path]
  (println (format "Starting SockJS bridge at http://%s:%s%s" host port path))
  (with-vertx vertx
    (let [server (http/server)]
      (-> server
          (sockjs/sockjs-server)
          (sockjs/bridge {:prefix path} [{}] [{}]))
      (http/listen server port host))))

(defn init-bridge
  "Initializes the embedded vertx instance, bridges to Immutant destinations, and starts the sockjs bridge."
  [{:keys [request-dest response-dest]}]
  (let [vertx (vembed/vertx)]
    (eventbus->dest vertx "demo.request" request-dest)
    (dest->eventbus vertx response-dest "demo.response")
    {:vertx vertx
     :server (start-sockjs-bridge vertx "localhost" 8081 "/eventbus")}))

Now that demo.bridge no longer echos, but instead expects something on the other end of the request-dest, we need something listening on the other end to do the work. We've added this to the demo.init namespace, which is also where we define the request/response destination names. Our listener here just watches queue.request, uppercases each message, and publishes it to topic.response. Since we have bridged those same destinations in demo.bridge, we again have a completed circle from the client and back:

(ns demo.init
  (:require [demo.web :as web]
            [demo.daemon :as daemon]
            [immutant.messaging :as msg]))

(def config {:response-dest "topic.response"
             :request-dest "queue.request"
             :process-fn (memfn toUpperCase)})

(defn init []
  (let [{:keys [request-dest response-dest process-fn]} config]
    (msg/start request-dest)
    (msg/start response-dest)
    (msg/listen request-dest
                #(msg/publish response-dest (process-fn %))))
  (web/start)
  (daemon/start config))

Touch the UI from anywhere

Now that we've bridged the EventBus to the Immutant messaging system, we can interact with our client UI from anywhere within our application. Just to beat the horse a bit more, let's do it from the repl. Connect to the nREPL endpoint in the application running on port 53092 using your favorite client, then try sending messages directly to the response topic, or to the request queue to have them uppercased first:

user> (require '[immutant.messaging :as msg])
nil
user> (msg/publish "topic.response" "ahoyhoy")
#<HornetQTextMessage HornetQMessage[ID:8af51642-2478-11e3-9deb-25745b71356d]:PERSISTENT>
user> (msg/publish "queue.request" "ahoyhoy")
#<HornetQTextMessage HornetQMessage[ID:90e4b5b8-2478-11e3-9deb-25745b71356d]:PERSISTENT>
user> 

You can also send structured messages:

user> (msg/publish "topic.response" {:x :y})
#<HornetQTextMessage HornetQMessage[ID:e09bf794-2478-11e3-9deb-25745b71356d]:PERSISTENT>

And see them all displayed in the client UI:

[repl UI activity]

Fare thee well

We've extended our prior example to make it actually useful, and maintained a separation of concerns within our application - code outside of the demo.bridge namespace has no knowledge of Vert.x, nor of the UI communication mechanism. We think this provides a compelling way to provide dynamic updates to the browser, but if you don't, or have any other questions, comments, or feedback, please get in touch.


  1. ^ This assumes you have a recent Immutant installed.
  2. ^ The demo application specifies 5309 as the :nrepl-port in its project.clj. If you have :immutant {:nrepl-port some-port} set in your ~/.lein/profiles.clj, that will override 5309 and you'll need to connect to whatever port the endpoint is bound to.

WebSockets and SockJS with Immutant and Vert.x

Currently, Immutant doesn't provide any native WebSockets support. However, it is possible to use another WebSockets-capable server from within an Immutant daemon. There are quite a few well-known options in the Clojure ecosystem that we could use here, like Aleph, http-kit, or raw Jetty or Netty. Instead, we're going to use a relative newcomer to the Clojure space: Vert.x.

What is Vert.x?

Vert.x is an asynchronous polyglot application platform built on top of Netty that has been around for a while, but just recently gained Clojure support. It provides (among other things) its own message passing system (the EventBus), and provides a SockJS implementation that allows browser clients to participate as peers in the EventBus over WebSockets, falling back to other protocols as the browser and network topology dictate. SockJS and an EventBus that is bridged to the client abstracts away some of the complexity of managing dynamic updates to the browser, and is the primary reason we're using Vert.x over some of the alternatives mentioned above.

Vert.x includes a javascript EventBus client for use in client-side code that allows the browser to participate in the EventBus as a peer. The Vert.x Clojure language module includes a ClojureScript wrapper around that javascript client, which we'll use in a bit.

Our application

To demonstrate using the Vert.x EventBus bridge from Immutant, we're going to look at a simple application that embeds1 Vert.x into an Immutant daemon to provide an echo service over the EventBus.

To get started, clone the app and run it:2

cd /path/to/simple-immutant-vertx-demo
lein do immutant deploy, immutant run

Then browse to http://localhost:8080/. You should see a UI that lets you send messages and see those messages echoed back. If you're using a browser with a Network console, you should be able to see the the SockJS WebSockets traffic, like so:

[websocket activity]

Let's see some code!

First, let's take a look at the ClojureScript client. It's fairly standard Enfocus transformations, with EventBus calls mixed in:

(ns demo.client
  (:require [enfocus.core :as ef]
            [enfocus.events :as events]
            [vertx.client.eventbus :as eb]))

(defn open-eventbus
  "Opens a connection to the remote EventBus endpoint."
  [& on-open]
  (let [eb (eb/eventbus "http://localhost:8081/eventbus")]
    (eb/on-open eb #(.log js/console "eventbus opened"))
    (mapv #(eb/on-open eb (fn [] (% eb))) on-open)))

(defn append-content
  "Append the given content to the element specified by id"
  [id content]
  (ef/at id (ef/append (ef/html [:div content]))))

(defn send-message
  "Sends a message to the request address."
  [eb message]
  (eb/publish eb "demo.request" message))

(defn attach-listeners
  "Attaches listeners to both the the request and response addresses,
   displaying the received messages in the appropriate divs."
  [eb]
  (eb/on-message eb "demo.request" (partial append-content "#sent"))
  (eb/on-message eb "demo.response" (partial append-content "#rcvd")))

(defn attach-send-click
  "Attaches handler to send a message when the send button is clicked."
  [eb]
  (ef/at "#send-message"
         (events/listen :click
                        #(send-message eb (ef/from "#message"
                                                   (ef/get-prop :value))))))
(defn init []
  (open-eventbus
   attach-listeners
   attach-send-click))

(set! (.-onload js/window) init)

On the server side, we start up the SockJS EventBus bridge as an Immutant daemon in the demo.daemon namespace, and is standard Immutant daemon management code. The functions that actually do the work of setting up the bridge are in the demo.bridge namespace:3

(ns demo.bridge
  (:require [vertx.embed :as vembed :refer [with-vertx]]
            [vertx.http :as http]
            [vertx.http.sockjs :as sockjs]
            [vertx.eventbus :as eb]))

(defn- start-sockjs-bridge
  "Creates a Vert.x http server, a sockjs server within that http
  server, then installs an eventbus bridge in the sockjs server."
  [vertx host port path]
  (println (format "Starting SockJS bridge at http://%s:%s%s" host port path))
  (with-vertx vertx
    (let [server (http/server)]
      (-> server
          (sockjs/sockjs-server)
          (sockjs/bridge {:prefix path} [{}] [{}]))
      (http/listen server port host))))

(defn init-bridge
  "Initializes the embedded vertx instance, sets up our echo handler,
   and starts the sockjs bridge."
  []
  (let [vertx (vembed/vertx)]
    (with-vertx vertx
      (eb/on-message "demo.request"
                     (partial eb/publish "demo.response")))
    {:vertx vertx
     :server (start-sockjs-bridge vertx "localhost" 8081 "/eventbus")}))

A nice, neat little package?

So, what have we done here? We've added dynamic updates over WebSockets (with fallback) to an Immutant application, without having to handle the minutiae of bi-directional communication over WebSockets and any fallback protocols. And since Vert.x allows the browser client to be an equal peer in the EventBus, we were able to use a similar API on the server and client.

However, it's not all roses - there is a drawback to this approach: since Immutant doesn't support WebSockets natively, we can't share the http port and upgrade connections to WebSockets on request. This means that any WebSockets solution we run as a daemon has to bind to its own port.

This has been an exploration of one way to add simple dynamic interaction to an Immutant application, but is certainly not the only way. If you try this approach with another WebSockets server, let us know how it goes.

Watch for a post in the future that presents a more complex application that bridges the Vert.x EventBus to Immutant messaging destinations.

If you have any questions, comments, or feedback, please get in touch.


  1. ^ Vert.x provides its own application container, but we're using it embedded, which is an advanced usage.
  2. ^ This assumes you have a recent Immutant installed.
  3. ^ For this example, we're not securing the EventBus bridge at all. Doing so is probably a good idea.