Clustering Immutants on OpenShift

Lately I've been spending a lot of time on OpenShift, building and testing a cartridge for Immutant that will properly form a cluster across multiple OpenShift gears. In this post, I'll go through the steps of deploying a simple application that demonstrates all of the Immutant clustering features running on the three small gears you get for free on OpenShift.

Here are the features we'll be demonstrating:

  • Load-balanced message distribution with automatic peer discovery
  • Replicated caching
  • Highly-Available, long-running daemons
  • HA scheduled jobs
  • Web load balancing and session replication

If you haven't already, go set up an OpenShift account and update your rhc gem to the latest version. I used 1.12.4 for this article. Below you'll see references to $namespace -- this corresponds to your OpenShift domain name, set by running rhc setup.

Note: If this is the first time you've used OpenShift, you'll need to visit the console and accept the usage agreement before running the rhc command.

Create a scaled OpenShift app

The Immutant cartridge is available here: https://github.com/immutant/openshift-immutant-cart. As described in its README, we create our app using the following command:

rhc app-create -s demo https://raw.github.com/immutant/openshift-immutant-cart/master/metadata/manifest.yml

We're calling our app demo and we're passing the -s option to make our app scalable. Notice that we're passing a raw URL to the cartridge's manifest.yml.

Small gears are pretty slow, but when app-create finally completes, you'll have a bare-bones, standard Leiningen application beneath the demo/ directory. At this point, you might tail your app's logs or ssh into your gear:

rhc tail demo
rhc ssh demo

The critical log file for Immutant on OpenShift is immutant/logs/server.log. Monitor this file until you eventually see the line, Deployed "your-clojure-application.clj". Then point a browser at http://demo-$namespace.rhcloud.com to see a simple welcome page.

Now we'll put some meat on our bare-bones app!

Push Me, Pull You

Typically, you will add the remote git repository for your real application to the local OpenShift repository you just created. We're going to use https://github.com/immutant/cluster-demo as our "real" application.

git remote add upstream -m master git@github.com:immutant/cluster-demo.git

Deployment of your app to OpenShift amounts to pulling from your real repository and pushing to OpenShift's.

git pull -s recursive -X theirs upstream master
git push

While waiting for that to complete, run rhc tail demo in another shell to monitor your log. This time, the Deployed "your-clojure-application.clj" message is going to scroll off the screen as the cluster-demo app starts logging its output. Eventually, the app should settle into a steady state looking something like this:

[The cluster-demo app log output]

If you can ignore the inconsistent thread identifiers in the above output, you'll notice there are exactly four types of messages: send, recv, web, and job. Noting the timestamps in the left column, a send is logged every 5 seconds, as is its corresponding recv; a web is logged every 2 seconds; and a job every 20 seconds.

The cluster-demo app comprises the following:

  • A message queue named /queue/msg
  • A distributed cache named counters
  • A listener for the queue that prints the received message and the current contents of the cache
  • An HA daemon named counter that queues a cached value and increments it every 5 seconds
  • An HA scheduled job named ajob that increments another counter in the cache every 20 seconds
  • A web request handler mounted at / that logs its :path-info and returns the current values of the two cached counters
  • Another request handler mounted at /count that increments a counter in the user's web session.

All the code (~60 lines) is contained in a single file.
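
To give a flavor of how little ceremony that takes, here's a hypothetical fragment (not the actual cluster-demo source) wiring up just the first three items, using the immutant.messaging and immutant.cache APIs covered in the tutorials later in this series:

(require '[immutant.messaging :as msg]
         '[immutant.cache :as cache])

;; the message queue
(msg/start "/queue/msg")

;; the cache (the real app configures a clustered mode; see the caching tutorial below)
(def counters (cache/cache "counters"))

;; the listener: print each received message and the current cache contents
(msg/listen "/queue/msg"
            (fn [m] (println "recv" m counters)))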

Programming is hard, let's build a cluster!

Now we're ready to form a cluster by adding a gear to our app:

rhc scale-cartridge immutant -a demo 2

Again, this will take a few minutes, and it may return an error even though the operation actually succeeded. You can run the following to see the definitive state of your gears:

rhc show-app --gears

This also gives you the SSH URLs for your two gears. Fire up two shells and ssh into each of your gears using those SSH URLs. Then tail the log on each:

tail -f immutant/logs/server.log

When the dust settles, you'll eventually see the gears discover each other, and you should see both gears logging recv messages, one getting the even numbers and one getting the odd. This is your automatic load-balanced message distribution.

Note also that the counters cache logged in the recv message is correct on both gears, even though it's only being updated by one. This is our cache replication at work.

Let's break stuff!

And see how robust our cluster is.

High Availability Daemons and Jobs

Of course, the send and job log entries should still only appear on our original gear, because those are our HA singletons. If that gear crashes, our daemon and job should migrate to the other gear. While logged into the gear running your singletons, run this:

immutant/bin/control stop

And watch the other gear's log to verify the daemon and job pick up right where they left off, fetching their counters from the replicated cache. That gear should be consuming all the queued messages, too. Now start the original gear back up:

immutant/bin/control start

Eventually, it'll start receiving half the messages again.

Web

You may be wondering about those web entries showing up in both logs. They are "health check" requests from the HAProxy web load balancer, automatically installed on your primary gear. You can always check the state of your cluster from HAProxy's perspective by visiting http://demo-$namespace.rhcloud.com/haproxy-status. If you see that page without intending to, it means something about your app is broken, so check immutant/logs/server.log for errors and make sure your app responds to a request for the root context, i.e. "/".

Let's try some web stuff. Use curl to hit your app while observing the logs on both gears:

curl http://demo-$namespace.rhcloud.com/xxxxxxxxxxxxxxxxxxxx
curl http://demo-$namespace.rhcloud.com/yyyyyyyyyyyyyyyyyyyy
curl http://demo-$namespace.rhcloud.com/zzzzzzzzzzzzzzzzzzzz

Use an obnoxious path to distinguish your request from the health checks. Repeat the command a few times to observe the gears taking turns responding to your request. Now try it in a browser, and you'll see the same gear handling your request every time you reload. This is because HAProxy sets cookies in the response to enable session affinity; your browser sends them back with each request, but curl didn't.

Speaking of session affinity, let's break that while we're at it, by invoking our other web handler, the one that increments a counter in the user's web session: http://demo-$namespace.rhcloud.com/count

You should see the counter increment each time you reload your browser. (You'll need to give curl a cookie store to see it respond with anything other than "1 times".)
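
For example, you could give curl a cookie jar (the file name is arbitrary) and hit the handler a few times:

curl -c cookies.txt -b cookies.txt http://demo-$namespace.rhcloud.com/count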

Pay attention to which gear is responding to the /count request. Now stop that gear like you did before. When you reload your browser, you should see the other gear return the expected value. This is the automatic session replication provided by immutant.web.session/servlet-store.

Don't forget to restart that gear.

The Hat Trick

Hey, OpenShift is giving us 3 free gears, we may as well use 'em all, right?

rhc scale-cartridge immutant -a demo 3

When the third one finally comes up, there are a couple of things you may notice:

  • The health checks will disappear from the primary gear as HAProxy takes it out of the rotation when 2 or more other gears are available, ostensibly to mitigate the observer effect of the health checks.
  • Each cache key will only show up in the recv log messages on 2 of the 3 gears. This is because Immutant caches default to Infinispan's :distributed replication mode in a cluster. This enables Infinispan clusters to achieve "linear scalability" as entries are copied to a fixed number of cluster nodes (default 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

Now what?

Well, that was a lot to cover. I doubt many apps will use all these features, but I think it's nice to have a free playground on which to try them out, even with the resources as constrained as they are on a small gear.

Regardless, I'm pretty happy that Immutant is finally feature-complete on OpenShift now. :-)

Of course, I had a lot of help getting things to this point. Many folks on the OpenShift and JBoss teams were generous with their expertise, but the "three B's" deserve special mention: Ben, Bela, and Bill.

Thanks!

Introducing Immutant Pipelines

[pipeline]

Happy new year! We'd like to celebrate by announcing a new Immutant feature: pipelines. Pipelines are compositions of functions, where the functions are executed in individual threads (or thread pools), potentially on different nodes in an Immutant cluster. They are built on top of our messaging subsystem, and were inspired by lamina's pipelines.

Usage

We'll walk through a simple (and simplistic) example to demonstrate using a pipeline.

Creation

The first thing we have to do is create a pipeline. We do that with a call to the pipeline function, giving it a name and some single-arity functions that form the steps of the pipeline:

(require '[immutant.pipeline :as pl])

(defonce reverse-pl
  (pl/pipeline :reverse-a-string
    seq
    reverse
    (partial apply str)))

This looks similar to a 'thread last' (->>), or a comp in reverse. But for the functions we're using in this sample pipeline, let's pretend that each of them is more computationally and time intensive than it actually is, and could benefit from being scaled across threads or across a cluster.
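
For comparison, here's the same transformation written as an ordinary thread-last, all on a single thread:

(->> "ham" seq reverse (apply str)) ;; => "mah"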

Putting data onto a pipeline

So, moving right along. We now have a pipeline, but how do we put data on it? The call to pipeline returns a function (we'll call it a pipeline-fn) that places data onto the head of the pipeline. Let's use it:

(let [result (reverse-pl "ham")]
  (deref result 1000 nil)) ;; => "mah"

What's with the deref? The pipeline execution is asynchronous - the pipeline-fn places the data onto the head of the pipeline, and immediately returns a delay. We dereference the delay to synchronize on the end of the pipeline, giving us the result. We're careful to use the deref that takes a timeout - it's possible for errors to occur during the pipeline execution, so we may never get a response (we'll talk more about error handling in a bit).
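
Since the timeout case just yields the default value, a caller that cares about the difference might use a sentinel (purely illustrative):

(let [result (reverse-pl "ham")
      value (deref result 1000 ::timed-out)]
  (if (= value ::timed-out)
    (println "no result within 1 second")
    value))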

Raising the concurrency

By default, each step gets assigned one thread (per cluster node) to handle its work. If our pipeline processes data at a rate that is slower than the rate of insertion at the head of the pipeline, we can increase the number of threads for each step with the :concurrency option (options are passed as keyword arguments after the list of functions). Let's alter our original pipeline definition to do that:

(defonce reverse-pl
  (pl/pipeline :reverse-a-string
    seq
    reverse
    (partial apply str)
    :concurrency 5)) ;; 5 threads per step

But what if we have one step that is slower than the rest? Let's assume that reverse is the slowpoke here, and raise the :concurrency even higher for that step:

(defonce reverse-pl
  (pl/pipeline :reverse-a-string
    seq
    (pl/step reverse :concurrency 10) ;; 10 threads for this guy
    (partial apply str)
    :concurrency 5)) ;; 5 threads for each of the other steps

Here we've used the step function to attach options to a particular step. Options attached to steps will override the corresponding pipeline option where it makes sense.

Handling errors

Since pipelines are built on top of Immutant's message listeners, the default error handling is what the messaging system provides: if an exception occurs during the execution of a step, the delivery of the data to that step is rolled back, and will be retried up to ten times. If you need errors to be handled differently, you can provide an error handler function that must take two arguments: the exception, and the original argument passed to the step that threw the exception:

(pl/pipeline :do-something-on-the-network
    retrieve-a-url
    process-url-contents
    more-data-processing
    :error-handler (fn [ex v]
                     (when (instance? SomeNetworkException ex)
                       (println "ERROR, skipping" pl/*current-step* ex)
                       (pl/*pipeline* v :step pl/*next-step*)))) ;; jump to the next step

Above we have a simple error handler that demonstrates putting a value back onto the pipeline while skipping the step that failed. We do that using a few vars that are bound during a pipeline execution:

  • *pipeline* -- the currently executing pipeline, as a pipeline-fn
  • *current-step* -- the name of the step that was executing when the error occurred
  • *next-step* -- the name of the step that follows it

If the error handler doesn't put the data back onto the pipeline, that particular pipeline execution is halted.

You can also specify an :error-handler for a particular step, which will override the pipeline error handler.

Let's see the above example again, but with a step-specific error handler that rethrows to trigger the default retry semantics:

(pl/pipeline :do-something-on-the-network
    (pl/step retrieve-a-url 
      :error-handler (fn [ex v]
                       (if (instance? SomeNetworkException ex)
                         (println "ERROR retrieving url" v ", exiting:" ex) ;; exit the pipeline
                         (throw ex)))) ;; rethrow to retry
    process-url-contents
    more-data-processing
    :error-handler (fn [ex v]
                     (when (instance? SomeNetworkException ex)
                       (println "ERROR, skipping" pl/*current-step* ex)
                       (pl/*pipeline* v :step pl/*next-step*))))

Pipelines within pipelines

Pipeline-fns are just functions that happen to return a delay. To facilitate using pipelines within pipelines, any step result that can be dereferenced is, automatically:

(defonce rev-emb-pl 
  (pl/pipeline :reverse-and-embiggen
    reverse-pl 
    (memfn .toUpperCase) 
    #(str \¡ % \!)))

(deref (rev-emb-pl "tiucsib") 1000 nil) ;; => "¡BISCUIT!"

Since it's possible for the result of a step to never arrive, you can control how long this automatic deref waits:

(defonce rev-emb-pl 
  (pl/pipeline :reverse-and-embiggen
    reverse-pl 
    (memfn .toUpperCase) 
    #(str \¡ % \!)
    :step-deref-timeout 60000)) ;; in ms, default is 10 seconds

Like :concurrency and :error-handler, :step-deref-timeout can be overridden on a per-step basis.
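
Here's a sketch of what such a per-step override might look like, using the step function from earlier (an anonymous fn stands in for memfn, and the timeout values are arbitrary):

(defonce rev-emb-pl
  (pl/pipeline :reverse-and-embiggen
    (pl/step reverse-pl :step-deref-timeout 120000) ;; 2 minutes, just for this step
    #(.toUpperCase %)
    #(str \¡ % \!)
    :step-deref-timeout 60000)) ;; 1 minute for the remaining steps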

Availability

Pipelines are currently available in the latest Immutant incremental builds, and will be part of 0.8.0, which should be released today.

We haven't covered everything about pipelines here; see the documentation for more details.

Pipeline support is an alpha feature at the moment, so its API is in flux - please give it a try and let us know how we can improve it.

Image credit: World Bank Photo Collection

Getting Started: Caching

This is the next tutorial in our getting started series: an exploration of Immutant's caching features. JBoss AS7 -- and therefore, Immutant -- comes with the Infinispan data grid baked right in, obviating the need to manage a separate caching service like Memcached for your applications.

Infinispan is a state-of-the-art, high-speed, low-latency, distributed data grid. It is capable of efficiently replicating key-value stores -- essentially souped-up ConcurrentMap implementations -- across a cluster. But it can also serve as a capable in-memory data cache: providing features such as write-through/write-behind persistence, multiple eviction policies, and transactions.

Clustering Modes

Infinispan caches adhere to a particular mode of operation. In a non-clustered, standalone Immutant, :local is the only supported mode. But when clustered, you have other options.

  • :local -- This is what you get in non-clustered mode, roughly equivalent to a hash map with write-through/write-behind persistence, JTA/XA support, MVCC (non-blocking, thread-safe reads even during concurrent writes), and JMX manageability.
  • :invalidated -- This is the default clustered mode. It doesn't actually share any data at all, so it's very "bandwidth friendly". Whenever data is changed in a cache, other caches in the cluster are notified that their copies are now stale and should be evicted from memory.
  • :replicated -- In this mode, entries added to any cache instance will be copied to all other cache instances in the cluster, and can then be retrieved locally from any instance. Though simple, it's impractical for clusters of any significant size (>10), and its capacity is equal to the amount of RAM in its smallest peer.
  • :distributed -- This mode is what enables Infinispan clusters to achieve "linear scalability". Cache entries are copied to a fixed number of cluster nodes (default is 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

immutant.cache/InfinispanCache

The first thing you must understand about Immutant's InfinispanCache is that it's mutable. This is sensible in a clustered environment, because the local process benefits from fast reads of data that may have been put there by a remote process. We effectively shift the responsibility of "sane data management", i.e. MVCC, from Clojure to Infinispan.

The second thing to know is that every Immutant cache has a cluster-scoped name and a mode. When you call immutant.cache/cache, the name is required, and it may refer to an existing cache that is already populated. The mode argument (one of :local, :invalidated, :replicated, or :distributed) is optional, defaulting to :invalidated if clustered and :local otherwise.

Because the cache implements many core Clojure interfaces, functions that typically return immutable copies will actually affect the cache contents:

UPDATE 3/22/2012: Due to feedback from our Clojure/West talk we no longer alter the Immutant caches through the core Clojure functions as shown below. See the latest docs for current info and examples.
  user> (def cache (immutant.cache/cache "test"))
  #'user/cache
  user> cache
  {}
  user> (assoc cache :a 1)
  {:a 1}
  user> (merge cache {:b 2, :c 3})
  {:c 3, :a 1, :b 2}
  user> (dissoc cache :c)
  {:a 1, :b 2}
  user> cache
  {:a 1, :b 2}
  user> (empty cache)
  {}
  user> cache
  {}

Further, the InfinispanCache supports a variety of put methods, some that expose the ConcurrentMap features of atomically storing entries based on their presence or absence. These all take lifespan options for time-to-live and max-idle expiration policies.

Memoization

Memoization is an optimization technique associating a cache of calculated values with a potentially expensive function, incurring the expense only once, with subsequent calls retrieving the result from the cache. The keys of the cache are the arguments passed to the function.
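
Clojure's built-in memoize shows the idea in its simplest, single-process form:

(defn slow-add [x y]
  (Thread/sleep 2000) ;; stand-in for an expensive computation
  (+ x y))

(def fast-add (memoize slow-add))

(fast-add 1 2) ;; => 3, after about 2 seconds
(fast-add 1 2) ;; => 3, immediately -- cached under the key (1 2)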

Standards for caching and memoization in Clojure are emerging in the form of core.cache and core.memoize, respectively. Because the InfinispanCache implements clojure.core.cache/CacheProtocol it can act as an underlying implementation for clojure.core.memoize/PluggableMemoization. Immutant includes a higher-order memo function for doing exactly that:

(immutant.cache/memo a-slow-function "a name" :distributed)

An Example

Let's amend the example from our clustering tutorial to demonstrate a replicated cache. We'll create a simple web app with a single request handler to which we'll pass an integer. The handler will pass that number to a very slow increment function: it'll sleep for that number of seconds before returning its increment. For us, this sleepy function represents a particularly time-consuming operation that will benefit from memoization.

Of course we'll need a project.clj

(defproject example "1.0.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]])

Next, the Immutant application bootstrap file, immutant.clj, into which we'll put all our code for this example.

(ns example.init
  (:use [ring.util.response]
        [ring.middleware.params])
  (:require [immutant.cache :as cache]
            [immutant.web :as web]))

;; Our slow function
(defn slow-inc [t]
  (Thread/sleep (* t 1000))
  (inc t))

;; Our memoized version of the slow function
(def memoized-inc (cache/memo slow-inc "sleepy" :replicated))

;; Our Ring handler
(defn handler [{params :params}]
  (let [t (Integer. (get params "t" 1))]
    (response (str "value=" (memoized-inc t) "\n"))))

;; Start up our web app
(web/start "/" (wrap-params handler))

Make sure you have a recent version of Immutant:

$ lein immutant install

And cd to the directory containing the above two files and deploy your app:

$ lein immutant deploy

Now bring up your simulated cluster. In one shell, run:

$ lein immutant run --clustered -Djboss.node.name=one -Djboss.server.data.dir=/tmp/one

In another shell, run:

$ lein immutant run --clustered -Djboss.node.name=two -Djboss.server.data.dir=/tmp/two -Djboss.socket.binding.port-offset=100

You should have one server listening on port 8080 and another on 8180. So in yet another shell, run this:

$ curl "http://localhost:8080/example/?t=5"

With any luck, that should return 6 after about 5 seconds. Now run it again and it should return 6 immediately. Now for the moment of truth: change the port to 8180. That should return 6 immediately, too! Each unique value of t should only sleep t seconds the first time called on any peer in the cluster.

Here's another trick. Fire off a request with t=20 or so, and wait a few seconds, but before it completes hit the same server again with the same t value. You'll notice that the second request will not have to sleep for the full 20 seconds; it returns immediately after the first completes.

Caveats

Though tested and toyed with, this is seriously Alpha code, and the API is still coagulating, especially with respect to the options for the cache and memo functions, which should probably include :ttl and :idle themselves, for example. Other options may be introduced as more of Infinispan's features are exposed, e.g. transactions and persistence.

By the way, we've recently gotten a decent start on our user manual and api docs, so take a gander there for more details on caching or any of the other Immutant components. And remember to pester us in the usual ways if you have any questions.

Happy hacking!

Getting Started: Simulated Clustering

For this installment of our getting started series we'll experiment a bit with clustering, one of the primary benefits provided by the JBoss AS7 application server, upon which Immutant is built. AS7 features a brand new way of configuring and managing clusters called Domain Mode, but unfortunately its documentation is still evolving. If you insist, try this or possibly this.

We'll save Domain Mode with respect to Immutant for a future post. It's not required for clustering, but it is an option for easier cluster management. In this post, we'll reveal a trick to simulate a cluster on your development box so that you can experiment with Immutant clustering features, which we should probably enumerate now:

  • Automatic load balancing and failover of message consumers
  • HTTP session replication
  • Fine-grained, dynamic web-app configuration and control via mod_cluster
  • Efficiently-replicated distributed caching via Infinispan
  • Singleton scheduled jobs
  • Automatic failover of singleton daemons

Running an Immutant

As you know, installing Immutant is simple:

$ lein plugin install lein-immutant 0.4.1
$ lein immutant install

And running an Immutant is, too:

$ lein immutant run

By passing the --clustered option, you configure the Immutant as a node that will discover other nodes (via multicast, by default) to form a cluster:

$ lein immutant run --clustered

From the first line of its output, you can see what that command is really running:

$ $JBOSS_HOME/bin/standalone.sh --server-config=standalone-ha.xml

Any options passed to lein immutant run are forwarded to standalone.sh, so run the following to see what those are:

$ lein immutant run --help

Simulating a Cluster

TL;DR

To run two immutant instances on a single machine, fire up two shells and...

In one shell, run:

$ lein immutant run --clustered -Djboss.node.name=one -Djboss.server.data.dir=/tmp/one

In another shell, run:

$ lein immutant run --clustered -Djboss.node.name=two -Djboss.server.data.dir=/tmp/two -Djboss.socket.binding.port-offset=100

Boom, you're a cluster!

Details

Each cluster node requires a unique name, which is usually derived from the hostname, but since our Immutants are on the same host, we set the jboss.node.name property uniquely.

Each Immutant will attempt to persist its runtime state to the same files. Hijinks will ensue. We prevent said hijinks by setting the jboss.server.data.dir property uniquely.

JBoss listens for various types of connections on a few ports. One obvious solution to the potential conflicts is to bind each Immutant to a different interface, which we could specify using the -b option.

But rather than go through a platform-specific example of creating an IP alias, I'm going to take advantage of another JBoss feature: the jboss.socket.binding.port-offset property will cause each default port number to be incremented by a specified amount.

So for the second Immutant, I set the offset to 100, resulting in its HTTP service, for example, listening on 8180 instead of the default 8080, on which the first Immutant is listening.

Deploy an Application

With any luck at all, you have two Immutants running locally, both hungry for an app to deploy, so let's create one.

We've been over how to deploy an application before, but this time we're gonna keep it real simple: create a new directory and add two files.

First, you'll need a project.clj

(defproject example "1.0.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]])

Next, the Immutant application bootstrap file, immutant.clj, into which we'll put all our code for this example.

(ns example.init
  (:require [immutant.messaging :as messaging]
            [immutant.daemons :as daemon]))

;; Create a message queue
(messaging/start "/queue/msg")
;; Define a consumer for our queue
(def listener (messaging/listen "/queue/msg" #(println "received:" %)))

;; Controls the state of our daemon
(def done (atom false))

;; Our daemon's start function
(defn start []
  (reset! done false)
  (loop [i 0]
    (Thread/sleep 1000)
    (when-not @done
      (println "sending:" i)
      (messaging/publish "/queue/msg" i)
      (recur (inc i)))))

;; Our daemon's stop function
(defn stop []
  (reset! done true))

;; Register the daemon
(daemon/start "counter" start stop :singleton true)

We've defined a message queue, a message listener, and a daemon service that, once started, publishes messages to the queue every second.

Daemons require a name (for referencing as a JMX MBean), a start function to be invoked asynchronously, and a stop function that will be automatically invoked when your app is undeployed, allowing you to cleanly tear down any resources used by your service. Optionally, you can declare the service to be a singleton. This means it will only be started on one node in your cluster, and should that node crash, it will be automatically started on another node, essentially giving you a robust, highly-available service.

In the same directory that contains your files, run this:

$ lein immutant deploy

Because both Immutants are monitoring the same deployment directory, this should trigger both to deploy the app.

Now watch the output of the shells in which your Immutants are running. You should see the daemon start up on only one of them, but both should be receiving messages. This is the automatic load balancing of message consumers.

Now kill the Immutant running the daemon. Watch the other one to see that the daemon will start there within seconds. There's your automatic failover. Restart the killed Immutant to see him start to receive messages again. It's fun, right? :)

Whew!

So that's probably enough to show for now. Give it a try, and let us know if it worked for you the very first time. If it doesn't, please reach out to us in the usual ways and we'll be happy to get you going. Above all, have fun!