OpenShift, PostgreSQL and Poorsmatic

Today we'll get a Clojure application running in Immutant on OpenShift, persisting its data to a PostgreSQL database. We'll use Poorsmatic, the app I built in my recent talk at Clojure/Conj 2012.

Poorsmatic, a "poor man's Prismatic", is a truly awful content discovery service that merely returns URL's from Twitter that contain at least one occurrence of the search term used to find the tweets containing the URL's in the first place.

Got that? Don't worry. It doesn't matter.

Because Poorsmatic was contrived to be a pretty good example of many of Immutant's features, including topics, queues, XA transactions, HA services, and a few other things. In my talk I used Datomic as my database, but here we'll try a different approach, using Lobos for database migrations, the Korma DSL, and OpenShift's PostgreSQL cartridge for persistence.

Create an app on OpenShift

To get started on OpenShift you'll need an account, the command line tools installed, and a domain set up. Below you'll see references to $namespace -- this corresponds to your domain name.

Once you've set up your domain, create an app. Call it poorsmatic.

$ rhc app create -a poorsmatic -t jbossas-7

We're specifying the jbossas-7 OpenShift cartridge. That will create a sample Java application in the poorsmatic/ directory. But we don't want that. Instead, we'll use the Immutant Quickstart to add the Immutant modules to AS7 and replace the Java app with a Clojure app:

cd poorsmatic
rm -rf pom.xml src
git remote add quickstart -m master git://github.com/openshift-quickstart/immutant-quickstart.git
git pull --no-commit -s recursive -X theirs quickstart master
git add -A .
git commit -m "Add Immutant modules and setup Clojure project"

At this point, we could git push, and after a couple of minutes hit http://poorsmatic-$namespace.rhcloud.com to see a static welcome page. Instead, we'll configure our database and add the Poorsmatic source files before pushing.

Add the PostgreSQL cartridge

To add a PostgreSQL database to our app, we add a cartridge:

$ rhc cartridge add postgresql-8.4 -a poorsmatic

And boom, we have a database. We have to tweak it just a bit, though. So we're going to log into our app using the ssh URI from the output of the app create command (available via rhc app show -a poorsmatic or from the My Applications tab of the web UI). Here's the URI it gave me:

$ ssh a4117d5ebac04c5f8114f7a96eba2737@poorsmatic-jimi.rhcloud.com

Once logged in, we need to modify PostgreSQL's default configuration to enable distributed transactions, which Poorsmatic uses. We're going to set max_prepared_transactions to 10 and then restart the database:

$ perl -p -i -e 's/#(max_prepared_transactions).*/\1 = 10/' postgresql-8.4/data/postgresql.conf
$ pg_ctl restart -D $PWD/postgresql-8.4/data -m fast
$ exit

If you forget to do this, you'll see errors referencing max_prepared_transactions in the logs.

Add the Poorsmatic source to your app

We'll use git to pull in the Poorsmatic source code. You can use the same technique to get your own apps deployed to OpenShift:

$ git pull -s recursive -X theirs git://github.com/jcrossley3/poorsmatic.git korma-lobos

Note that we specified the korma-lobos branch.

Configure the app to use PostgreSQL

You'll see Leiningen profiles in project.clj that determine which database both the lobos and korma libraries will use. One of these profiles, :openshift, refers to the name of the PostgreSQL datasource configured in the .openshift/config/standalone.xml provided by the quickstart.
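
To make that concrete, here's a hypothetical sketch of what such profiles might look like; the profile keys and the datasource name below are illustrative only -- consult the actual project.clj and .openshift/config/standalone.xml for the real values.

;; Hypothetical sketch only: the real profile keys and datasource name live in
;; the Poorsmatic project.clj and the quickstart's standalone.xml
(defproject poorsmatic "0.1.0-SNAPSHOT"
  :profiles {:dev       {:db {:subprotocol "h2"
                              :subname     "mem:poorsmatic"}}
             :openshift {:db {:datasource "java:jboss/datasources/PostgreSQLDS"}}})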

We'll activate the :openshift profile in deployments/your-clojure-application.clj:

{
 :root (System/getenv "OPENSHIFT_REPO_DIR")
 :context-path "/"
 :swank-port 24005
 :nrepl-port 27888

 :lein-profiles [:openshift]
}

Add your Twitter credentials

Finally, because Poorsmatic accesses Twitter's streaming API, you must create an account at http://dev.twitter.com and add a file called resources/twitter-creds that contains your OAuth credentials in a simple Clojure vector:

["app-key" "app-secret" "user-token" "user-token-secret"]

You may be concerned about storing sensitive information with your app, but remember that OpenShift secures your git repo with ssh public/private key pairs, and only those people whose public keys you've associated with your app have access to it.

Push!

Now we can commit our changes and push:

$ git add -A .
$ git commit -m "Database config and twitter creds"
$ git push

And now we wait. The first push will take a few minutes. Immutant will be installed and started, your app deployed, the app's dependencies fetched, the database schema installed, etc. You should log in to your app and view the logs while it boots:

$ ssh a4117d5ebac04c5f8114f7a96eba2737@poorsmatic-jimi.rhcloud.com
$ tail_all

Eventually, you should see a log message saying Deployed "your-clojure-application.clj", at which point you can go to http://poorsmatic-$namespace.rhcloud.com, enter bieber and then watch your server.log fill up with more meaningless drivel than you ever dreamed possible.

And you may even see some bieber tweets. ;-)

Reload the web page to see the scraped URLs and their counts.

The REPL

You may have noticed the nREPL and Swank ports configured in the deployment descriptor above. They aren't externally accessible; they can only be reached through an ssh tunnel secured with your private key.

Run the following:

$ rhc port-forward -a poorsmatic

Depending on your OS, this may not work. If it doesn't, try the -L option:

$ ssh -L 27888:127.11.205.129:27888 a4117d5ebac04c5f8114f7a96eba2737@poorsmatic-jimi.rhcloud.com

But replace 127.11.205.129 with whatever rhc port-forward told you (or ssh to your instance and echo $OPENSHIFT_INTERNAL_IP). And obviously, you should use the ssh URI associated with your own app.

Once the tunnel is established, you can then connect to the remote REPL at 127.0.0.1:27888 using whatever REPL client you prefer.
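
If you'd rather connect programmatically, here's a minimal sketch using the clojure.tools.nrepl client library (assuming it's on your local classpath):

(require '[clojure.tools.nrepl :as nrepl])

;; Talk to the local end of the tunnel and evaluate a form on the server
(with-open [conn (nrepl/connect :port 27888)]
  (-> (nrepl/client conn 1000)                    ; 1000 ms response timeout
      (nrepl/message {:op "eval" :code "(+ 1 2)"})
      nrepl/response-values))
;=> [3]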

Tune in next time...

Immutant's clustering capabilities yield some of its coolest features, e.g. load-balanced message distribution, highly-available services and scheduled jobs, etc. But clustering is a pain to configure when multicast is disabled. OpenShift aims to simplify that, but it's not quite there yet. In a future post, I hope to demonstrate those clustering features by creating a scaled OpenShift application, letting it deal with all the murky cluster configuration for you.

Stay tuned!

Creating some Tutorials

After our last release, Dan asked if we had a tutorial to help a newbie get started on Immutant.

Good question.

Well, we do have one, sort of, in the form of some blog posts written over the course of a few months in our News section. They're all helpfully tagged with getting-started, in reverse chronological order...

Well, no, I guess we don't have one. :(

So we decided to take some of those old blog posts, update them, and organize them into a suite of tutorials that are easy to navigate, and we promise to keep them in sync as the project evolves.

Unlike the blog posts, they're living documents, of course, and though each is focused on a particular aspect of Immutant, there will be some overlap between them and our manual and api docs, which we consider more of a reference.

Please don't hesitate to let us know if the tutorials make your initial experience with Immutant that much smoother. And we'd love to hear suggestions for additional ones.

Getting Started: Caching

This is the next tutorial in our getting started series: an exploration of Immutant's caching features. JBoss AS7 -- and therefore, Immutant -- comes with the Infinispan data grid baked right in, obviating the need to manage a separate caching service like Memcached for your applications.

Infinispan is a state-of-the-art, high-speed, low-latency, distributed data grid. It is capable of efficiently replicating key-value stores -- essentially souped-up ConcurrentMap implementations -- across a cluster. But it can also serve as a capable in-memory data cache, providing features such as write-through/write-behind persistence, multiple eviction policies, and transactions.

Clustering Modes

Infinispan caches adhere to a particular mode of operation. In a non-clustered, standalone Immutant, :local is the only supported mode. But when clustered, you have other options.

  • :local -- This is what you get in non-clustered mode, roughly equivalent to a hash map with write-through/write-behind persistence, JTA/XA support, MVCC (non-blocking, thread-safe reads even during concurrent writes), and JMX manageability.
  • :invalidated -- This is the default clustered mode. It doesn't actually share any data at all, so it's very "bandwidth friendly". Whenever data is changed in a cache, other caches in the cluster are notified that their copies are now stale and should be evicted from memory.
  • :replicated -- In this mode, entries added to any cache instance will be copied to all other cache instances in the cluster, and can then be retrieved locally from any instance. Though simple, it's impractical for clusters of any significant size (>10), and its capacity is equal to the amount of RAM in its smallest peer.
  • :distributed -- This mode is what enables Infinispan clusters to achieve "linear scalability". Cache entries are copied to a fixed number of cluster nodes (default is 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

immutant.cache/InfinispanCache

The first thing you must understand about Immutant's InfinispanCache is that it's mutable. This is sensible in a clustered environment, because the local process benefits from fast reads of data that may have been put there by a remote process. We effectively shift the responsibility of "sane data management", i.e. MVCC, from Clojure to Infinispan.

The second thing to know is that every Immutant cache has a cluster-scoped name and a mode. When you call immutant.cache/cache, the name is required, and it may refer to an existing cache that is already populated. The mode argument (one of :local, :invalidated, :replicated, or :distributed) is optional, defaulting to :invalidated if clustered and :local otherwise.
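
As a quick sketch of that call, based on the description above (the cache names are arbitrary, and the immutant.cache namespace is assumed to be loaded, as in the REPL session below):

;; Name only: :local when standalone, :invalidated when clustered
(def scores (immutant.cache/cache "scores"))

;; Explicit mode: entries are copied to every node in the cluster
(def roster (immutant.cache/cache "roster" :replicated))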

Because the cache implements many core Clojure interfaces, functions that typically return immutable copies will actually affect the cache contents:

UPDATE 3/22/2012: Due to feedback from our Clojure/West talk we no longer alter the Immutant caches through the core Clojure functions as shown below. See the latest docs for current info and examples.
  user> (def cache (immutant.cache/cache "test"))
  #'user/cache
  user> cache
  {}
  user> (assoc cache :a 1)
  {:a 1}
  user> (merge cache {:b 2, :c 3})
  {:c 3, :a 1, :b 2}
  user> (dissoc cache :c)
  {:a 1, :b 2}
  user> cache
  {:a 1, :b 2}
  user> (empty cache)
  {}
  user> cache
  {}

Further, the InfinispanCache supports a variety of put methods, some of which expose the ConcurrentMap features of atomically storing entries based on their presence or absence. They all take lifespan options for time-to-live and max-idle expiration policies.
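
As a rough sketch, continuing with the cache defined in the REPL session above and assuming function names that mirror the ConcurrentMap operations plus the :ttl/:idle option names mentioned in the Caveats below (both the names and the units here are assumptions -- check the api docs):

;; Hypothetical sketch of the put variants and lifespan options
(immutant.cache/put cache :a 1 :ttl 60000 :idle 20000)  ; expires after 60s, or 20s of inactivity
(immutant.cache/put-if-absent cache :b 2)               ; only stored if :b isn't already present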

Memoization

Memoization is an optimization technique associating a cache of calculated values with a potentially expensive function, incurring the expense only once, with subsequent calls retrieving the result from the cache. The keys of the cache are the arguments passed to the function.

Standards for caching and memoization in Clojure are emerging in the form of core.cache and core.memoize, respectively. Because the InfinispanCache implements clojure.core.cache/CacheProtocol it can act as an underlying implementation for clojure.core.memoize/PluggableMemoization. Immutant includes a higher-order memo function for doing exactly that:

(immutant.cache/memo a-slow-function "a name" :distributed)

An Example

Let's amend the example from our clustering tutorial to demonstrate a replicated cache. We'll create a simple web app with a single request handler to which we'll pass an integer. The handler will pass that number to a very slow increment function: it'll sleep for that number of seconds before returning its increment. For us, this sleepy function represents a particularly time-consuming operation that will benefit from memoization.

Of course we'll need a project.clj:

(defproject example "1.0.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]])

Next, the Immutant application bootstrap file, immutant.clj, into which we'll put all our code for this example.

(ns example.init
  (:use [ring.util.response]
        [ring.middleware.params])
  (:require [immutant.cache :as cache]
            [immutant.web :as web]))

;; Our slow function
(defn slow-inc [t]
  (Thread/sleep (* t 1000))
  (inc t))

;; Our memoized version of the slow function
(def memoized-inc (cache/memo slow-inc "sleepy" :replicated))

;; Our Ring handler
(defn handler [{params :params}]
  (let [t (Integer. (get params "t" 1))]
    (response (str "value=" (memoized-inc t) "\n"))))

;; Start up our web app
(web/start "/" (wrap-params handler))

Make sure you have a recent version of Immutant:

$ lein immutant install

And cd to the directory containing the above two files and deploy your app:

$ lein immutant deploy

Now bring up your simulated cluster. In one shell, run:

$ lein immutant run --clustered -Djboss.node.name=one -Djboss.server.data.dir=/tmp/one

In another shell, run:

$ lein immutant run --clustered -Djboss.node.name=two -Djboss.server.data.dir=/tmp/two -Djboss.socket.binding.port-offset=100

You should have one server listening on port 8080 and another on 8180. So in yet another shell, run this:

$ curl "http://localhost:8080/example/?t=5"

With any luck, that should return 6 after about 5 seconds. Now run it again and it should return 6 immediately. Now for the moment of truth: change the port to 8180. That should return 6 immediately, too! Each unique value of t should only sleep t seconds the first time called on any peer in the cluster.

Here's another trick. Fire off a request with t=20 or so, wait a few seconds, and then, before it completes, hit the same server again with the same t value. You'll notice that the second request will not have to sleep for the full 20 seconds; it returns immediately after the first completes.

Caveats

Though tested and toyed with, this is seriously Alpha code, and the API is still coagulating, especially with respect to the options for the cache and memo functions, which should probably include :ttl and :idle themselves, for example. Other options may be introduced as more of Infinispan's features are exposed, e.g. transactions and persistence.

By the way, we've recently gotten a decent start on our user manual and api docs, so take a gander there for more details on caching or any of the other Immutant components. And remember to pester us in the usual ways if you have any questions.

Happy hacking!

Getting Started: Simulated Clustering

For this installment of our getting started series we'll experiment a bit with clustering, one of the primary benefits provided by the JBoss AS7 application server, upon which Immutant is built. AS7 features a brand new way of configuring and managing clusters called Domain Mode, but unfortunately its documentation is still evolving. If you insist, try this or possibly this.

We'll save Domain Mode with respect to Immutant for a future post. It's not required for clustering, but it is an option for easier cluster management. In this post, we'll reveal a trick to simulate a cluster on your development box so that you can experiment with Immutant clustering features, which we should probably enumerate now:

  • Automatic load balancing and failover of message consumers
  • HTTP session replication
  • Fine-grained, dynamic web-app configuration and control via mod_cluster
  • Efficiently-replicated distributed caching via Infinispan
  • Singleton scheduled jobs
  • Automatic failover of singleton daemons

Running an Immutant

As you know, installing Immutant is simple:

$ lein plugin install lein-immutant 0.4.1
$ lein immutant install

And running an Immutant is, too:

$ lein immutant run

By passing the --clustered option, you configure the Immutant as a node that will discover other nodes (via multicast, by default) to form a cluster:

$ lein immutant run --clustered

From the first line of its output, you can see what that command is really running:

$ $JBOSS_HOME/bin/standalone.sh --server-config=standalone-ha.xml

Any options passed to lein immutant run are forwarded to standalone.sh, so run the following to see what those are:

$ lein immutant run --help

Simulating a Cluster

TL;DR

To run two immutant instances on a single machine, fire up two shells and...

In one shell, run:

$ lein immutant run --clustered -Djboss.node.name=one -Djboss.server.data.dir=/tmp/one

In another shell, run:

$ lein immutant run --clustered -Djboss.node.name=two -Djboss.server.data.dir=/tmp/two -Djboss.socket.binding.port-offset=100

Boom, you're a cluster!

Details

Each cluster node requires a unique name, which is usually derived from the hostname, but since our Immutants are on the same host, we set the jboss.node.name property uniquely.

Each Immutant will attempt to persist its runtime state to the same files. Hijinks will ensue. We prevent said hijinks by setting the jboss.server.data.dir property uniquely.

JBoss listens for various types of connections on a few ports. One obvious solution to the potential conflicts is to bind each Immutant to a different interface, which we could specify using the -b option.

But rather than go through a platform-specific example of creating an IP alias, I'm going to take advantage of another JBoss feature: the jboss.socket.binding.port-offset property will cause each default port number to be incremented by a specified amount.

So for the second Immutant, I set the offset to 100, resulting in its HTTP service, for example, listening on 8180 instead of the default 8080, on which the first Immutant is listening.

Deploy an Application

With any luck at all, you have two Immutants running locally, both hungry for an app to deploy, so let's create one.

We've been over how to deploy an application before, but this time we're gonna keep it real simple: create a new directory and add two files.

First, you'll need a project.clj:

(defproject example "1.0.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]])

Next, the Immutant application bootstrap file, immutant.clj, into which we'll put all our code for this example.

(ns example.init
  (:require [immutant.messaging :as messaging]
            [immutant.daemons :as daemon]))

;; Create a message queue
(messaging/start "/queue/msg")
;; Define a consumer for our queue
(def listener (messaging/listen "/queue/msg" #(println "received:" %)))

;; Controls the state of our daemon
(def done (atom false))

;; Our daemon's start function
(defn start []
  (reset! done false)
  (loop [i 0]
    (Thread/sleep 1000)
    (when-not @done
      (println "sending:" i)
      (messaging/publish "/queue/msg" i)
      (recur (inc i)))))

;; Our daemon's stop function
(defn stop []
  (reset! done true))

;; Register the daemon
(daemon/start "counter" start stop :singleton true)

We've defined a message queue, a message listener, and a daemon service that, once started, publishes messages to the queue every second.

Daemons require a name (for referencing as a JMX MBean), a start function to be invoked asynchronously, and a stop function that will be automatically invoked when your app is undeployed, allowing you to cleanly tear down any resources used by your service. Optionally, you can declare the service to be a singleton. This means it will only be started on one node in your cluster, and should that node crash, it will be automatically started on another node, essentially giving you a robust, highly-available service.

In the same directory that contains your files, run this:

$ lein immutant deploy

Because both Immutants are monitoring the same deployment directory, this should trigger both to deploy the app.

Now watch the output of the shells in which your Immutants are running. You should see the daemon start up on only one of them, but both should be receiving messages. This is the automatic load balancing of message consumers.

Now kill the Immutant running the daemon. Watch the other one to see that the daemon will start there within seconds. There's your automatic failover. Restart the killed Immutant to see it start receiving messages again. It's fun, right? :)

Whew!

So that's probably enough to show for now. Give it a try, and let us know if it worked for you the very first time. If it doesn't, please reach out to us in the usual ways and we'll be happy to get you going. Above all, have fun!

Getting Started: Scheduling Jobs

Note: this article is out of date. For more recent instructions on using scheduled jobs, see the tutorial.

This article covers job scheduling in Immutant, and is part of our getting started series of tutorials.

Jobs in Immutant are simply functions that execute on a recurring schedule. They fire asynchronously, outside of the thread where they are defined, and they run in the same runtime as the rest of the application, so they have access to any shared state.

Jobs are built on top of the Quartz library, and support scheduling via a cron-like specification.

Why would I use this over quartz-clj or calling Quartz directly?

I'm glad you asked! There are several reasons:

  • Immutant abstracts away the complexity of Quartz's internals, so you don't have to worry about managing Schedulers and creating JobDetails, and provides enough functionality for a majority of use cases. For cases where you need advanced scheduling functionality, you can still use quartz-clj or the Quartz classes directly.
  • If you are using Immutant in a cluster, jobs that should fire only once per cluster (aka 'singleton jobs') are handled automatically (see below).
  • When your application is undeployed, your jobs are automatically unscheduled. Note that if you use quartz-clj or Quartz directly from your application, you'll need to clean up after yourself so you don't leave jobs lingering around since Immutant can't automatically unschedule them for you.

Scheduling Jobs

Scheduling a job is as simple as calling the schedule function from the immutant.jobs namespace:

(require '[immutant.jobs :as jobs])
(jobs/schedule "my-job-name" "*/5 * * * * ?" 
                #(println "I was called!"))

The schedule function requires three arguments:

  • name - the name of the job.
  • spec - the cron-style specification string (see below).
  • f - the zero argument function that will be invoked each time the job fires.

Job scheduling is dynamic, and can occur anywhere in your application code. Jobs that share the lifecycle of your application are idiomatically placed in immutant.clj.

You can safely call schedule multiple times with the same job name - the named job will be rescheduled.
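
For example, using the same signature shown above, a second call under the same name simply replaces the first:

(require '[immutant.jobs :as jobs])

;; The second call replaces the job rather than scheduling an additional one
(jobs/schedule "my-job-name" "*/5 * * * * ?" #(println "every five seconds"))
(jobs/schedule "my-job-name" "0 * * * * ?"   #(println "now every minute instead"))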

Cron Syntax

The spec attribute should contain a crontab-like entry. This is similar to cron specifications used by Vixie cron, anacron and friends, but includes an additional field for specifying seconds. It is composed of 7 fields (6 are required):

  • Seconds: 0-59
  • Minutes: 0-59
  • Hours: 0-23
  • Day of Month: 1-31
  • Month: 1-12 or JAN-DEC
  • Day of Week: 1-7 or SUN-SAT
  • Year: 1970-2099 (optional)

For several fields, you may denote subdivision by using the forward-slash (/) character. For example, to execute a job every 5 minutes, specify */5 in the minutes field.

Spans may be indicated using the dash (-) character. To execute a job Monday through Friday, MON-FRI should be used in the day-of-week field.

Multiple values may be separated using the comma (,) character. The specification of 1,15 in the day-of-month field would result in the job firing on the 1st and 15th of each month.

One of the day-of-month and day-of-week fields must be set to the ? (no specific value) character, since specifying both is contradictory.

See the Quartz cron specification for additional details.
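
Putting those pieces together, here's a sketch of a spec that combines lists, subdivisions, spans, and the ? placeholder (the job name and body are purely illustrative):

(require '[immutant.jobs :as jobs])

;; Fires at seconds 0 and 30, every 10 minutes, from 9am through 5pm,
;; Monday through Friday; day-of-month is ? because day-of-week is given
(jobs/schedule "business-hours-job" "0,30 */10 9-17 ? * MON-FRI"
               #(println "checking in"))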

Unscheduling Jobs

Jobs can be unscheduled via the unschedule function:

(require '[immutant.jobs :as jobs])
    
(jobs/unschedule "my-job-name")

The unschedule function requires one argument:

  • name - the name of a previously scheduled job.

If the given name resolves to an existing job, that job will be unscheduled and the call will return true, otherwise nil is returned.
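
At the REPL, that looks something like:

(jobs/unschedule "my-job-name") ;=> true - the job existed and was unscheduled
(jobs/unschedule "my-job-name") ;=> nil - nothing left by that name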

Jobs are automatically unscheduled when your application is undeployed.

Clustering

When using Immutant in a cluster, you'll need to mark any jobs that should only be scheduled once for the entire cluster with the :singleton option:

(require '[immutant.jobs :as jobs])
(jobs/schedule "my-job-name" "*/5 * * * * ?" 
                #(println "I only fire on one node")
                :singleton true)

If :singleton is true, the job will be scheduled to run on only one node in the cluster at a time. If that node goes down, the job will automatically be scheduled on another node, giving you failover. If :singleton is false or not provided, the job will be scheduled to run on all nodes where the schedule call is executed.

Look for a future post in our Getting Started series on using Immutant in a cluster.

The Future

Currently, jobs can only be scheduled using CronTrigger functionality. We plan to add support for SimpleTrigger functionality at some point in the future, allowing you to do something similar to:

(require '[immutant.jobs :as jobs])
(jobs/schedule "my-at-job" (jobs/every "3s" :times 5)
                #(println "I fire 5 times, every 3 seconds"))

Since Immutant is still in a pre-alpha state, none of what I said above is set in stone. If anything does change, we'll update this post to keep it accurate.

If you have any feedback or questions, get in touch!