Clustering Immutants on OpenShift

Lately I've been spending a lot of time on OpenShift, building and testing a cartridge for Immutant that will properly form a cluster across multiple OpenShift gears. In this post, I'll go through the steps of deploying a simple application that demonstrates all of the Immutant clustering features running on the three small gears you get for free on OpenShift.

Here are the features we'll be demonstrating:

  • Load-balanced message distribution with automatic peer discovery
  • Replicated caching
  • Highly-Available, long-running daemons
  • HA scheduled jobs
  • Web load balancing and session replication

If you haven't already, go set up an OpenShift account and update your rhc gem to the latest version. I used 1.12.4 for this article. Below you'll see references to $namespace -- this corresponds to your OpenShift domain name, set by running rhc setup.

Note: If this is the first time you've used OpenShift, you'll need to visit the console and accept the usage agreement before running the rhc command.

Create a scaled OpenShift app

The Immutant cartridge is available here: https://github.com/immutant/openshift-immutant-cart. As described in its README, we create our app using the following command:

rhc app-create -s demo https://raw.github.com/immutant/openshift-immutant-cart/master/metadata/manifest.yml

We're calling our app demo and we're passing the -s option to make our app scalable. Notice that we're passing a raw URL to the cartridge's manifest.yml.

Small gears are pretty slow, but when app-create finally completes, you'll have a bare-bones, standard Leiningen application beneath the demo/ directory. At this point, you might tail your app's logs or ssh into your gear:

rhc tail demo
rhc ssh demo

The critical log file for Immutant on OpenShift is immutant/logs/server.log. Monitor this file until you eventually see the line, Deployed "your-clojure-application.clj". Then point a browser at http://demo-$namespace.rhcloud.com to see a simple welcome page.

Now we'll put some meat on our bare-bones app!

Push Me, Pull You

Typically, you will add the remote git repository for your real application to the local OpenShift repository you just created. We're going to use https://github.com/immutant/cluster-demo as our "real" application.

git remote add upstream -m master git@github.com:immutant/cluster-demo.git

Deployment of your app to OpenShift amounts to pulling from your real repository and pushing to OpenShift's.

git pull -s recursive -X theirs upstream master
git push

While waiting for that to complete, run rhc tail demo in another shell to monitor your log. This time, the Deployed "your-clojure-application.clj" message is going to scroll off the screen as the cluster-demo app starts logging its output. Eventually, the app should settle into a steady state looking something like this:

The cluster-demo app

If you can ignore the inconsistent thread identifiers in the above output, you'll notice there are exactly four types of messages: send, recv, web, and job. Noting the timestamps in the left column, a send is logged every 5 seconds, as is its corresponding recv, a web logged every 2 seconds, and a job every 20 seconds.

The cluster-demo app is comprised of the following:

  • A message queue named /queue/msg
  • A distributed cache named counters
  • A listener for the queue that prints the received message and the current contents of the cache
  • An HA daemon named counter that queues a cached value and increments it every 5 seconds
  • An HA scheduled job named ajob that increments another counter in the cache every 20 seconds
  • A web request handler mounted at / that logs its :path-info and returns the current values of the two cached counters
  • Another request handler mounted at /count that increments a counter in the user's web session.

All the code (~60 lines) is contained in a single file.

Programming is hard, let's build a cluster!

Now we're ready to form a cluster by adding a gear to our app:

rhc scale-cartridge immutant -a demo 2

Again, this will take a few minutes, and it may return an error even though the operation actually succeeded. You can run the following to see the definitive state of your gears:

rhc show-app --gears

This also gives you the SSH URLs for your two gears. Fire up two shells and ssh into each of your gears using those SSH URLs. Then tail the log on each:

tail -f immutant/logs/server.log

When the dust settles, you'll eventually see the gears discover each other, and you should see both gears logging recv messages, one getting the even numbers and one getting the odd. This is your automatic load-balanced message distribution.

Note also that the counters cache logged in the recv message is correct on both gears, even though it's only being updated by one. This is our cache replication at work.

Let's break stuff!

And see how robust our cluster is.

High Availability Daemons and Jobs

Of course, the send and job log entries should still only appear on our original gear, because those are our HA singletons. If that gear crashes, our daemon and job should migrate to the other gear. While logged into the gear running your singletons, run this:

immutant/bin/control stop

And watch the other gear's log to verify the daemon and job pick up right where they left off, fetching their counters from the replicated cache. That gear should be consuming all the queued messages, too. Now start the original gear back up:

immutant/bin/control start

Eventually, it'll start receiving half the messages again.

Web

You may be wondering about those web entries showing up in both logs. They are "health check" requests from the HAProxy web load balancer, automatically installed on your primary gear. You can always check the state of your cluster from HAProxy's perspective by visiting http://demo-$namespace.rhcloud.com/haproxy-status. If you see that page without intending to, it means something about your app is broken, so check immutant/logs/server.log for errors and make sure your app responds to a request for the root context, i.e. "/".

Let's try some web stuff. Use curl to hit your app while observing the logs on both gears:

curl http://demo-$namespace.rhcloud.com/xxxxxxxxxxxxxxxxxxxx
curl http://demo-$namespace.rhcloud.com/yyyyyyyyyyyyyyyyyyyy
curl http://demo-$namespace.rhcloud.com/zzzzzzzzzzzzzzzzzzzz

Use an obnoxious path to distinguish your request from the health checks. Repeat the command a few times to observe the gears taking turns responding to your request. Now try it in a browser, and you'll see the same gear handling your request every time you reload. This is because HAProxy is setting cookies in the response to enable session affinity, which your browser is probably sending back. And curl didn't.

Speaking of session affinity, let's break that while we're at it, by invoking our other web handler, the one that increments a counter in the user's web session: http://demo-$namespace.rhcloud.com/count

You should see the counter increment each time you reload your browser. (You'll need to give curl a cookie store to see it respond with anything other than "1 times")

Pay attention to which gear is responding to the /count request. Now stop that gear like you did before. When you reload your browser, you should see the other gear return the expected value. This is the automatic session replication provided by immutant.web.session/servlet-store.

Don't forget to restart that gear.

The Hat Trick

Hey, OpenShift is giving us 3 free gears, we may as well use 'em all, right?

rhc scale-cartridge immutant -a demo 3

When the third one finally comes up, there are a couple of things you may notice:

  • The health checks will disappear from the primary gear as HAProxy takes it out of the rotation when 2 or more other gears are available, ostensibly to mitigate the observer effect of the health checks.
  • Each cache key will only show up in the recv log messages on 2 of the 3 gears. This is because Immutant caches default to Infinispan's :distributed replication mode in a cluster. This enables Infinispan clusters to achieve "linear scalability" as entries are copied to a fixed number of cluster nodes (default 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

Now what?

Well, that was a lot to cover. I doubt many apps will use all these features, but I think it's nice to have a free playground on which to try them out, even with the resources as constrained as they are on a small gear.

Regardless, I'm pretty happy that Immutant is finally feature-complete on OpenShift now. :-)

Of course, I had a lot of help getting things to this point. Many folks on the OpenShift and JBoss teams were generous with their expertise, but the "three B's" deserve special mention: Ben, Bela, and Bill.

Thanks!

At-style Jobs in Immutant

[It's about time!]

We've always supported scheduling jobs using a cron-style syntax. We've now extended that to support a finer-grained at-style syntax:

(require '[immutant.jobs :as jobs])

(jobs/schedule "my-at-job"
                #(println "I fire 4 times with a 10ms delay between each, starting in 500ms.")
                :in 500   
                :every 10 
                :repeat 3)

This functionality is available in recent incremental builds, and will be in 0.9.0, which should be released todaytomorrow.

The Details

Instead of a specification string, at-style jobs are controlled by a set of options that are passed to the immutant.jobs/schedule function, and can be mixed and matched.

We won't go through all of the permutations of the options here - you can see the details in the docs. Instead, we'll look at a few examples of usage.

If you specify a job without a cron specification or any at options, it will fire once, immediately:

(jobs/schedule "fire-now"  
               #(println "I'll fire right now, once."))

Specifying a job with just a start option (either :at or :in) will still only fire once, but at the specified time:

;; you can specify :at as a Date
(jobs/schedule "fire-later"  
               #(println "I'll fire at a-java-util-date, once.")
               :at a-java-util-date)

#inst "2013-02-27T18:29:35.222-00:00" 1361989775222

;; or as ms since the epoch
(jobs/schedule "fire-later2"  
               #(println "I'll fire at 2013-02-27T18:29:35.222, once.")
               :at 1361989775222)

;; :in signifies "ms from now"
(jobs/schedule "fire-in"  
               #(println "I'll fire in 500 ms, once.")
               :in 500)

You can control repetition of the jobs with :every, :repeat, and :until:

;; fire every 500 ms, forever
(jobs/schedule "fire-forever"  
               #(println "I'll fire every 500 ms, forever.")
               :every 500)

;; fire every 500 ms until a time in the future
(jobs/schedule "fire-for-a-while"  
               #(println "I'll fire every 500 ms, until a-java-util-date-or-ms-since-epoch.")
               :every 500
               :until a-java-util-date-or-ms-since-epoch)

;; fire 6 times, every 500 ms
(jobs/schedule "fire-repeat"  
               #(println "I'll fire 6 times (:repeat + 1) on a 500 ms interval.")
               :every 500
               :repeat 5)

;; all of the above work with an :at or :in
(jobs/schedule "fire-forever-in-a-bit"  
               #(println "I'll fire every 500 ms, forever, starting in 2500 ms.")
               :every 500
               :in 2500)

At-style jobs can be unscheduled just like cron-style jobs by passing the name to immutant.jobs/unschedule, which will be called on your behalf when the application is undeployed if you don't do it. They can also be rescheduled by calling schedule again with the same name. So "forever" in the examples really means "until undeployed, unscheduled, or rescheduled".

Can I still use the cron syntax?

Absolutely! The only change that will affect you is the order of arguments to the schedule function have changed (the old order still works for now, but is deprecated, and will be removed in 1.0.0.Beta1):

(require '[immutant.jobs :as job])
;; the deprecated signature
(jobs/schedule "old-api-cron-job"  
                   "*/5 * * * * ?"
                   #(println "I fire every 5s, forever."))

;; the new signature
(jobs/schedule "new-api-cron-job"  
                   #(println "I fire every 5s, forever.")
                   "*/5 * * * * ?")

At-style jobs are fairly new - please give it a try and let us know if you have any issues or questions.

Image credit: quinnums

Getting Started: Scheduling Jobs

Note: this article is out of date. For more recent instructions on using scheduled jobs, see the tutorial.

This article covers job schedulding in Immutant, and is part of our getting started series of tutorials.

Jobs in Immutant are simply functions that execute on a recurring schedule. They fire asynchronously, outside of the thread where they are defined, and fire in the same runtime as the rest of the application, so have access to any shared state.

Jobs are built on top of the Quartz library, and support scheduling via a cron-like specification.

Why would I use this over quartz-clj or calling Quartz directly?

I'm glad you asked! There are several reasons:

  • Immutant abstracts away the complexity of Quartz's internals, so you don't have to worry about managing Schedulers and creating JobDetails, and provides enough functionality for a majority of use cases. For cases where you need advanced scheduling functionality, you can still use quartz-clj or the Quartz classes directly.
  • If you are using Immutant in a cluster, jobs that should fire only once per cluster (aka 'singleton jobs') are handled automatically (see below).
  • When your application is undeployed, your jobs are automatically unscheduled. Note that if you use quartz-clj or Quartz directly from your application, you'll need to clean up after yourself so you don't leave jobs lingering around since Immutant can't automatically unschedule them for you.

Scheduling Jobs

Scheduling a job is as simple as calling the schedule function from the immutant.jobs namespace:

(require '[immutant.jobs :as jobs])
(jobs/schedule "my-job-name" "*/5 * * * * ?" 
                #(println "I was called!"))

The schedule function requires three arguments:

  • name - the name of the job.
  • spec - the cron-style specification string (see below).
  • f - the zero argument function that will be invoked each time the job fires.

Job scheduling is dynamic, and can occur anywhere in your application code. Jobs that share the lifecycle of your application are idiomatically placed in immutant.clj.

You can safely call schedule multiple times with the same job name - the named job will rescheduled.

Cron Sytanx

The spec attribute should contain a crontab-like entry. This is similar to cron specifications used by Vixie cron, anacron and friends, but includes an additional field for specifying seconds. It is composed of 7 fields (6 are required):

SecondsMinutesHoursDay of MonthMonthDay of WeekYear
0-590-590-231-311-12 or JAN-DEC1-7 or SUN-SAT1970-2099 (optional)

For several fields, you may denote subdivision by using the forward-slash (/) character. To execute a job every 5 minutes, */5 in the minutes field would specify this condition.

Spans may be indicated using the dash (-) character. To execute a job Monday through Friday, MON-FRI should be used in the day-of-week field.

Multiple values may be separated using the comma (,) character. The specification of 1,15 in the day-of-month field would result in the job firing on the 1st and 15th of each month.

Either day-of-month or day-of-week must be specified using the ? character, since specifying both is contradictory.

See the Quartz cron specification for additional details.

Unscheduling Jobs

Jobs can be unscheduled via the unschedule function:

(require '[immutant.jobs :as jobs])
    
(jobs/unschedule "my-job-name")

The unschedule function requires one argument:

  • name - the name of a previously scheduled job.

If the given name resolves to an existing job, that job will be unscheduled and the call will return true, otherwise nil is returned.

Jobs are automatically unscheduled when your application is undeployed.

Clustering

When using Immutant in a cluster, you'll need to mark any jobs that should only be scheduled once for the entire cluster with the :singleton option:

(require '[immutant.jobs :as jobs])
(jobs/schedule "my-job-name" "*/5 * * * * ?" 
                #(println "I only fire on one node")
                :singleton true)

If :singleton is true, the job will be scheduled to run on only one node in the cluster at a time. If that node goes down, the job will automatically be scheduled on another node, giving you failover. If :singleton is false or not provided, the job will be scheduled to run on all nodes where the schedule call is executed.

Look for a future post in our Getting Started series on using Immutant in a cluster.

The Future

Currently, jobs can only be scheduled using CronTrigger functionality. We plan to add support for SimpleTrigger functionality at some point in the future, allowing you to do something similar to:

(require '[immutant.jobs :as jobs])
(jobs/schedule "my-at-job" (jobs/every "3s" :times 5)
                #(println "I fire 5 times, every 3 seconds"))

Since Immutant is still in a pre-alpha state, none of what I said above is set in stone. If anything does change, We'll update this post to keep it accurate.

If you have any feedback or questions, get in touch!