Using Transit with Immutant 2

Out of the box, Immutant 2 has support for several data serialization strategies for use with messaging and caching, namely: EDN, Fressian, JSON, and none (which falls back to Java serialization). But what if you want to use another strategy? Luckily, this isn't a closed set - Immutant allows us to add new strategies. We took advantage of that and have created a separate project that brings Transit support to Immutant - immutant-transit.

What is Transit?

From the Transit format page:

Transit is a format and set of libraries for conveying values between applications written in different programming languages.

It's similar in purpose to EDN, but leverages the speed of the optimized JSON readers that most platforms provide.

What does immutant-transit offer over using Transit directly?

immutant-transit provides an Immutant codec for Transit that allows for transparent encoding and decoding of Transit data when using Immutant's messaging and caching functionality. Without it, you would need to set up the encode/decode logic yourself.
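For a sense of the boilerplate it saves, here's a rough sketch of the manual wiring you'd otherwise write against com.cognitect/transit-clj directly (the encode-transit/decode-transit names are just for illustration); you would then have to call these yourself around every publish, receive, and cache operation:

  (require '[cognitect.transit :as transit])
  (import '(java.io ByteArrayInputStream ByteArrayOutputStream))

  ;; encode a Clojure value to a Transit-encoded byte array
  (defn encode-transit [value]
    (let [out (ByteArrayOutputStream.)]
      (transit/write (transit/writer out :json) value)
      (.toByteArray out)))

  ;; decode a Transit-encoded byte array back to a Clojure value
  (defn decode-transit [^bytes bs]
    (transit/read (transit/reader (ByteArrayInputStream. bs) :json)))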

Usage

Note: immutant-transit won't work with Immutant 2.0.0-alpha1 - you'll need to use an incremental build (#298 or newer).

First, we need to add org.immutant/immutant-transit to our application's dependencies:

  :dependencies [[org.clojure/clojure "1.6.0"]
                 [org.immutant/immutant "2.x.incremental.298"]
                 [org.immutant/immutant-transit "0.2.2"]]

If you don't have com.cognitect/transit-clj in your dependencies, immutant-transit will transitively bring in version 0.8.259. We've tested against 0.8.255 and 0.8.259, so if you're running another version and are seeing issues, let us know.
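If you'd rather pin Transit to one of the versions we've tested, just declare it explicitly alongside immutant-transit, e.g.:

  :dependencies [[org.immutant/immutant-transit "0.2.2"]
                 [com.cognitect/transit-clj "0.8.255"]]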

Now, we need to register the Transit codec with Immutant:

  (ns your.app
    (:require [immutant.codecs.transit :as it]))

  (it/register-transit-codec)

This will register a vanilla JSON Transit codec that encodes to a byte[] under the name :transit with the content-type application/transit+json (Immutant uses the content-type to identify the encoding for messages sent via HornetQ).

To use the codec, provide it as the :encoding option wherever an encoding is used:

  (immutant.messaging/publish some-queue {:a :message} :encoding :transit)

  (def transit-cache (immutant.caching/with-codec some-cache :transit))
  (immutant.caching/compare-and-swap! transit-cache a-key a-function)

If you need to change the underlying format that Transit uses, or need to provide custom read/write handlers, you can pass them as options to register-transit-codec:

  (it/register-transit-codec
    :type :json-verbose
    :read-handlers my-read-handlers
    :write-handlers my-write-handlers)

The content-type will automatically be generated based on the :type, and will be of the form application/transit+<:type>.

You can also override the name and content-type:

  (it/register-transit-codec
    :name :transit-with-my-handlers
    :content-type "application/transit+json+my-stuff"
    :read-handlers my-read-handlers
    :write-handlers my-write-handlers)
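Then pass the new name wherever an encoding is expected, just as with :transit above:

  (immutant.messaging/publish some-queue {:a :message}
                              :encoding :transit-with-my-handlers)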

For more examples, see the example project.

Why is this a separate project from Immutant?

Transit's format and implementation are young and still in flux. We're currently developing this as a separate project so we can make releases independent of Immutant proper that track changes to Transit. Once Transit matures a bit, we'll likely roll this into Immutant itself.

If you are interested in adding a codec of your own, take a look at the immutant-transit source and at the immutant.codecs namespace to see how it's done.

Get In Touch

If you have any questions, issues, or other feedback about immutant-transit, you can always find us on #immutant on freenode or our mailing lists.

Clustering Immutants on OpenShift

Lately I've been spending a lot of time on OpenShift, building and testing a cartridge for Immutant that will properly form a cluster across multiple OpenShift gears. In this post, I'll go through the steps of deploying a simple application that demonstrates all of the Immutant clustering features running on the three small gears you get for free on OpenShift.

Here are the features we'll be demonstrating:

  • Load-balanced message distribution with automatic peer discovery
  • Replicated caching
  • Highly-Available, long-running daemons
  • HA scheduled jobs
  • Web load balancing and session replication

If you haven't already, go set up an OpenShift account and update your rhc gem to the latest version. I used 1.12.4 for this article. Below you'll see references to $namespace -- this corresponds to your OpenShift domain name, set by running rhc setup.

Note: If this is the first time you've used OpenShift, you'll need to visit the console and accept the usage agreement before running the rhc command.

Create a scaled OpenShift app

The Immutant cartridge is available here: https://github.com/immutant/openshift-immutant-cart. As described in its README, we create our app using the following command:

rhc app-create -s demo https://raw.github.com/immutant/openshift-immutant-cart/master/metadata/manifest.yml

We're calling our app demo and we're passing the -s option to make our app scalable. Notice that we're passing a raw URL to the cartridge's manifest.yml.

Small gears are pretty slow, but when app-create finally completes, you'll have a bare-bones, standard Leiningen application beneath the demo/ directory. At this point, you might tail your app's logs or ssh into your gear:

rhc tail demo
rhc ssh demo

The critical log file for Immutant on OpenShift is immutant/logs/server.log. Monitor this file until you eventually see the line, Deployed "your-clojure-application.clj". Then point a browser at http://demo-$namespace.rhcloud.com to see a simple welcome page.

Now we'll put some meat on our bare-bones app!

Push Me, Pull You

Typically, you will add the remote git repository for your real application to the local OpenShift repository you just created. We're going to use https://github.com/immutant/cluster-demo as our "real" application.

git remote add upstream -m master git@github.com:immutant/cluster-demo.git

Deployment of your app to OpenShift amounts to pulling from your real repository and pushing to OpenShift's.

git pull -s recursive -X theirs upstream master
git push

While waiting for that to complete, run rhc tail demo in another shell to monitor your log. This time, the Deployed "your-clojure-application.clj" message is going to scroll off the screen as the cluster-demo app starts logging its output. Eventually, the app should settle into a steady state looking something like this:

[Screenshot: steady-state log output from the cluster-demo app]

If you can ignore the inconsistent thread identifiers in the above output, you'll notice there are exactly four types of messages: send, recv, web, and job. Noting the timestamps in the left column, a send is logged every 5 seconds, as is its corresponding recv; a web is logged every 2 seconds; and a job every 20 seconds.

The cluster-demo app is comprised of the following:

  • A message queue named /queue/msg
  • A distributed cache named counters
  • A listener for the queue that prints the received message and the current contents of the cache
  • An HA daemon named counter that queues a cached value and increments it every 5 seconds
  • An HA scheduled job named ajob that increments another counter in the cache every 20 seconds
  • A web request handler mounted at / that logs its :path-info and returns the current values of the two cached counters
  • Another request handler mounted at /count that increments a counter in the user's web session.

All the code (~60 lines) is contained in a single file.

Programming is hard, let's build a cluster!

Now we're ready to form a cluster by adding a gear to our app:

rhc scale-cartridge immutant -a demo 2

Again, this will take a few minutes, and it may return an error even though the operation actually succeeded. You can run the following to see the definitive state of your gears:

rhc show-app --gears

This also gives you the SSH URLs for your two gears. Fire up two shells and ssh into each of your gears using those SSH URLs. Then tail the log on each:

tail -f immutant/logs/server.log

When the dust settles, you'll eventually see the gears discover each other, and you should see both gears logging recv messages, one getting the even numbers and one getting the odd. This is your automatic load-balanced message distribution.

Note also that the counters cache logged in the recv message is correct on both gears, even though it's only being updated by one. This is our cache replication at work.

Let's break stuff!

And see how robust our cluster is.

High Availability Daemons and Jobs

Of course, the send and job log entries should still only appear on our original gear, because those are our HA singletons. If that gear crashes, our daemon and job should migrate to the other gear. While logged into the gear running your singletons, run this:

immutant/bin/control stop

And watch the other gear's log to verify the daemon and job pick up right where they left off, fetching their counters from the replicated cache. That gear should be consuming all the queued messages, too. Now start the original gear back up:

immutant/bin/control start

Eventually, it'll start receiving half the messages again.

Web

You may be wondering about those web entries showing up in both logs. They are "health check" requests from the HAProxy web load balancer, automatically installed on your primary gear. You can always check the state of your cluster from HAProxy's perspective by visiting http://demo-$namespace.rhcloud.com/haproxy-status. If you see that page without intending to, it means something about your app is broken, so check immutant/logs/server.log for errors and make sure your app responds to a request for the root context, i.e. "/".

Let's try some web stuff. Use curl to hit your app while observing the logs on both gears:

curl http://demo-$namespace.rhcloud.com/xxxxxxxxxxxxxxxxxxxx
curl http://demo-$namespace.rhcloud.com/yyyyyyyyyyyyyyyyyyyy
curl http://demo-$namespace.rhcloud.com/zzzzzzzzzzzzzzzzzzzz

Use an obnoxious path to distinguish your request from the health checks. Repeat the command a few times to observe the gears taking turns responding to your request. Now try it in a browser, and you'll see the same gear handling your request every time you reload. This is because HAProxy sets cookies in the response to enable session affinity; your browser sends them back with each request, but curl doesn't.

Speaking of session affinity, let's break that while we're at it, by invoking our other web handler, the one that increments a counter in the user's web session: http://demo-$namespace.rhcloud.com/count

You should see the counter increment each time you reload your browser. (You'll need to give curl a cookie store to see it respond with anything other than "1 times".)

Pay attention to which gear is responding to the /count request. Now stop that gear like you did before. When you reload your browser, you should see the other gear return the expected value. This is the automatic session replication provided by immutant.web.session/servlet-store.

Don't forget to restart that gear.

The Hat Trick

Hey, OpenShift is giving us 3 free gears, we may as well use 'em all, right?

rhc scale-cartridge immutant -a demo 3

When the third one finally comes up, there are a couple of things you may notice:

  • The health checks will disappear from the primary gear as HAProxy takes it out of the rotation when 2 or more other gears are available, ostensibly to mitigate the observer effect of the health checks.
  • Each cache key will only show up in the recv log messages on 2 of the 3 gears. This is because Immutant caches default to Infinispan's :distributed replication mode in a cluster. This enables Infinispan clusters to achieve "linear scalability" as entries are copied to a fixed number of cluster nodes (default 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

Now what?

Well, that was a lot to cover. I doubt many apps will use all these features, but I think it's nice to have a free playground on which to try them out, even with the resources as constrained as they are on a small gear.

Regardless, I'm pretty happy that Immutant is finally feature-complete on OpenShift now. :-)

Of course, I had a lot of help getting things to this point. Many folks on the OpenShift and JBoss teams were generous with their expertise, but the "three B's" deserve special mention: Ben, Bela, and Bill.

Thanks!

Overlay Screencast

I put together a quick screencast showing how to overlay the latest incremental releases of both Immutant and TorqueBox into a single app server capable of deploying both Clojure and Ruby apps.

Further, I show how those Clojure and Ruby apps can seamlessly exchange messages and share caches using the respective messaging and caching libraries from both Immutant and TorqueBox.

Enjoy!

Introducing Distributed XA Transaction Support

We're as happy as a bear in a koi pond to announce support for Distributed (XA) Transactions in Immutant.

Messaging and Caching resources in Immutant are now automatically transactional and XA capable. And we make it easy for you to create DataSources for your XA compliant SQL databases so that you can then define transactions incorporating all three types of resources in your Clojure applications.

Some Background

X/Open XA is a standard specification for allowing multiple, independent resources to participate in a single, distributed transaction using a two-phase commit (2PC) protocol.

Say your application stores data in more than one place, perhaps an Oracle database and a Postgres database. When a function in your application writes data to those two databases, XA can ensure that it doesn't leave your data in an inconsistent state when the Oracle database fails. ;-)

To accomplish this, the commit and rollback methods are invoked not on any single resource like a JDBC or JMS connection but on a TransactionManager instead. Its job is to coordinate the commit or rollback of each resource involved in a particular transaction.

Defining an XA Transaction

Let's start with an example:

  (ns xa.example
    (:require [immutant.xa :as xa]
              [immutant.cache :as cache]
              [immutant.messaging :as msg]
              [clojure.java.jdbc :as sql]))
  
  (defn do-three-things []
    (xa/transaction
     (sql/with-connection {:datasource my-ds}
       (sql/insert-records :things {:name "foo"}))
     (cache/put my-cache :a 1)
     (msg/publish "/queue/test" "success!")))

The do-three-things function will insert a record into a SQL database, write an entry to a cache and publish a message to a queue, all within a single transaction. When it completes, either all of those things will have happened or none will, depending on whether an exception is tossed from the body passed to xa/transaction.

By the way, don't let that {:datasource my-ds} spec throw you just yet. We're going to discuss that in a minute.

So the xa/transaction macro starts a transaction, executes its body, and then commits the transaction unless an exception is caught, in which case the transaction is rolled back.
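In other words, the body of the macro conceptually runs like the sketch below. This is not Immutant's actual implementation; begin!, commit!, and rollback! stand in for the corresponding calls on the underlying TransactionManager, and do-the-body stands for whatever you passed to xa/transaction:

  (begin!)
  (try
    (let [result (do-the-body)]
      (commit!)              ; body finished normally, so commit
      result)
    (catch Throwable t
      (rollback!)            ; any exception rolls the transaction back
      (throw t)))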

Transaction Scope

I lied a little. The xa/transaction macro is really just an alias for the required macro in the immutant.xa.transaction namespace, which is one of six macros matching the transaction attributes for JEE Container-Managed Persistence: required, requires-new, not-supported, supports, mandatory, and never. According to that spec, required is the default, so we alias it in the main immutant.xa namespace.

These macros allow you to control the scope of your transactions when your functions call each other. For example,

  (ns xa.example ...)
  (defn foo []
    (xa/transaction
     (do-three-things)))

Here, we have one function, foo, defining a transaction that calls another function, do-three-things that, as you recall from above, seemingly defines another transaction. Or does it? In fact, the required macro won't start a new transaction if there's already one associated with the current thread. It'll simply include its body in the transaction started by the caller. If we really wanted a new transaction, we'd call requires-new inside do-three-things.
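For example, here's a sketch of do-three-things rewritten so it always runs in its own transaction, regardless of what its caller has started:

  (require '[immutant.xa.transaction :as tx])

  (defn do-three-things []
    (tx/requires-new        ; suspend any caller's transaction and start a fresh one
     (sql/with-connection {:datasource my-ds}
       (sql/insert-records :things {:name "foo"}))
     (cache/put my-cache :a 1)
     (msg/publish "/queue/test" "success!")))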

Here's another example:

  (ns xa.example
    (:require [immutant.xa.transaction :as tx]))
  
  (tx/required           ; begin tx #1
   (one)
   (tx/not-supported     ; suspend tx #1
    (two))
   (tx/requires-new      ; begin tx #2
    (three))             ; commit tx #2
   (throw (Exception.))) ; rollback tx #1

Here we have a function, one, running within a transaction that is suspended prior to calling the function two, which runs completely outside of any transaction. A second transaction is then started before calling the function three.

We then toss an exception (we could've also called tx/set-rollback-only) that causes everything we did in one to rollback. The exception does not affect what we did in two or three, however.

Incidentally, any exception tossed in two or three would also rollback the actions of one since all the macros re-throw whatever they catch.
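As noted above, tx/set-rollback-only is the non-throwing alternative: it marks the current transaction so it rolls back when the enclosing macro completes. A small sketch (the inventory-ok? predicate is hypothetical):

  (tx/required
   (one)
   (when-not (inventory-ok?)    ; hypothetical business check
     (tx/set-rollback-only)))   ; tx #1 will roll back instead of committing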

Creating an XA DataSource

So now we'll discuss that {:datasource my-ds} spec from the first example.

To include your database in a distributed transaction, you need to create an XA DataSource for it. Do this using the immutant.xa/datasource function. It will expect the appropriate JDBC driver for your database to be available in the classpath, so you'll need to add one of the following to your Leiningen project.clj:

  (defproject foo "1.0.0-SNAPSHOT"
    :dependencies [[com.h2database/h2 "1.3.160"]              ; H2
                   [org.clojars.gukjoon/ojdbc "1.4"]          ; Oracle
                   [org.clojars.kjw/mysql-connector "5.1.11"] ; MySQL
                   [postgresql "9.0-801.jdbc4"]               ; Postgres
                   [net.sourceforge.jtds/jtds "1.2.4"]        ; MS SQL Server
                   [java.jdbc "0.2.2"]])

The comments on the right indicate the database types we currently support, and the versions above have been successfully tested on Immutant.

With the driver available, all that's left is to create the DataSource. Here are some examples from our integration tests:

  (defonce my-ds (xa/datasource "h2" {:adapter "h2" :database "mem:foo"}))
  (defonce my-ds (xa/datasource "oracle" 
                                {:adapter "oracle"
                                 :host "oracle.cpct4icp7nye.us-east-1.rds.amazonaws.com"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))
  (defonce my-ds (xa/datasource "mysql" 
                                {:adapter "mysql"
                                 :host "mysql.cpct4icp7nye.us-east-1.rds.amazonaws.com"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))
  (defonce my-ds (xa/datasource "postgres" 
                                {:adapter "postgresql"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))
  (defonce my-ds (xa/datasource "mssql" 
                                {:adapter "mssql"
                                 :host "mssql.cpct4icp7nye.us-east-1.rds.amazonaws.com"
                                 :username "myuser"
                                 :password "mypassword"
                                 :database "mydb"}))

To use one of those in a clojure.java.jdbc connection spec, you should associate it with the :datasource key, like so:

  (jdbc/with-connection {:datasource my-ds}
    (jdbc/create-table :things [:name "varchar(50)"]))

This should of course work with any Clojure SQL library built on clojure.java.jdbc, e.g. Korma, ClojureQL, Lobos, etc.

See the manual for more details.

Conclusion

XA is not for every application. It's useful when you have multiple JDBC backends or you need to synchronize your JDBC and JMS (HornetQ messaging) calls. Transactional data-grids (Infinispan caching) are also often handy, so we feel good about making all of these resources automatically transactional in Immutant, not to mention providing clean Clojure interfaces for them.

Distributed transactions are available now in our latest incremental builds and will of course be included in our upcoming 0.2.0 release of Immutant expected in the next week or so. As always, feel free to find us in the normal channels if you have any questions.

Getting Started: Caching

This is the next tutorial in our getting started series: an exploration of Immutant's caching features. JBoss AS7 -- and therefore, Immutant -- comes with the Infinispan data grid baked right in, obviating the need to manage a separate caching service like Memcached for your applications.

Infinispan is a state-of-the-art, high-speed, low-latency, distributed data grid. It is capable of efficiently replicating key-value stores -- essentially souped-up ConcurrentMap implementations -- across a cluster. But it can also serve as a capable in-memory data cache, providing features such as write-through/write-behind persistence, multiple eviction policies, and transactions.

Clustering Modes

Infinispan caches adhere to a particular mode of operation. In a non-clustered, standalone Immutant, :local is the only supported mode. But when clustered, you have other options.

  • :local -- This is what you get in non-clustered mode, roughly equivalent to a hash map with write-through/write-behind persistence, JTA/XA support, MVCC (non-blocking, thread-safe reads even during concurrent writes), and JMX manageability.
  • :invalidated -- This is the default clustered mode. It doesn't actually share any data at all, so it's very "bandwidth friendly". Whenever data is changed in a cache, other caches in the cluster are notified that their copies are now stale and should be evicted from memory.
  • :replicated -- In this mode, entries added to any cache instance will be copied to all other cache instances in the cluster, and can then be retrieved locally from any instance. Though simple, it's impractical for clusters of any significant size (>10), and its capacity is equal to the amount of RAM in its smallest peer.
  • :distributed -- This mode is what enables Infinispan clusters to achieve "linear scalability". Cache entries are copied to a fixed number of cluster nodes (default is 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

immutant.cache/InfinispanCache

The first thing you must understand about Immutant's InfinispanCache is that it's mutable. This is sensible in a clustered environment, because the local process benefits from fast reads of data that may have been put there by a remote process. We effectively shift the responsibility of "sane data management", i.e. MVCC, from Clojure to Infinispan.

The second thing to know is that every Immutant cache has a cluster-scoped name and a mode. When you call immutant.cache/cache, the name is required, and it may refer to an existing cache that is already populated. The mode argument (one of :local, :invalidated, :replicated, or :distributed) is optional, defaulting to :invalidated if clustered and :local otherwise.

Because the cache implements many core Clojure interfaces, functions that typically return immutable copies will actually affect the cache contents:

UPDATE 3/22/2012: Due to feedback from our Clojure/West talk we no longer alter the Immutant caches through the core Clojure functions as shown below. See the latest docs for current info and examples.

  user> (def cache (immutant.cache/cache "test"))
  #'user/cache
  user> cache
  {}
  user> (assoc cache :a 1)
  {:a 1}
  user> (merge cache {:b 2, :c 3})
  {:c 3, :a 1, :b 2}
  user> (dissoc cache :c)
  {:a 1, :b 2}
  user> cache
  {:a 1, :b 2}
  user> (empty cache)
  {}
  user> cache
  {}

Further, the InfinispanCache supports a variety of put methods, some of which expose the ConcurrentMap features of atomically storing entries based on their presence or absence. These all take lifespan options for time-to-live and max-idle expiration policies.
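For example, here's a sketch of the kind of thing that's possible; it assumes put and put-if-absent variants taking :ttl and :idle lifespan options as described above, so check the api docs for the exact signatures and time units:

  (require '[immutant.cache :as cache])

  (def c (cache/cache "things"))

  (cache/put c :a 1 :ttl 300)             ; entry expires once its time-to-live elapses
  (cache/put-if-absent c :a 2)            ; no effect -- :a is already present
  (cache/put-if-absent c :b 2 :idle 60)   ; stored; evicted once idle longer than :idle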

Memoization

Memoization is an optimization technique associating a cache of calculated values with a potentially expensive function, incurring the expense only once, with subsequent calls retrieving the result from the cache. The keys of the cache are the arguments passed to the function.
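Clojure's built-in memoize shows the idea in a single process; Immutant's memo (described next) does the same thing, but backs the cache with Infinispan so the computed values can be shared, replicated, or distributed across a cluster:

  (defn slow-add [a b]
    (Thread/sleep 5000)     ; simulate an expensive calculation
    (+ a b))

  (def fast-add (memoize slow-add))

  (fast-add 1 2) ;=> 3, after about 5 seconds
  (fast-add 1 2) ;=> 3, immediately -- the result for args [1 2] is cached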

Standards for caching and memoization in Clojure are emerging in the form of core.cache and core.memoize, respectively. Because the InfinispanCache implements clojure.core.cache/CacheProtocol it can act as an underlying implementation for clojure.core.memoize/PluggableMemoization. Immutant includes a higher-order memo function for doing exactly that:

(immutant.cache/memo a-slow-function "a name" :distributed)

An Example

Let's amend the example from our clustering tutorial to demonstrate a replicated cache. We'll create a simple web app with a single request to which we'll pass an integer. The request handler will pass that number to a very slow increment function: it'll sleep for that number of seconds before returning its increment. For us, this sleepy function represents a particularly time-consuming operation that will benefit from memoization.

Of course we'll need a project.clj:

(defproject example "1.0.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]])

Next, the Immutant application bootstrap file, immutant.clj, into which we'll put all our code for this example.

(ns example.init
  (:use [ring.util.response]
        [ring.middleware.params])
  (:require [immutant.cache :as cache]
            [immutant.web :as web]))

;; Our slow function
(defn slow-inc [t]
  (Thread/sleep (* t 1000))
  (inc t))

;; Our memoized version of the slow function
(def memoized-inc (cache/memo slow-inc "sleepy" :replicated))

;; Our Ring handler
(defn handler [{params :params}]
  (let [t (Integer. (get params "t" 1))]
    (response (str "value=" (memoized-inc t) "\n"))))

;; Start up our web app
(web/start "/" (wrap-params handler))

Make sure you have a recent version of Immutant:

$ lein immutant install

And cd to the directory containing the above two files and deploy your app:

$ lein immutant deploy

Now bring up your simulated cluster. In one shell, run:

$ lein immutant run --clustered -Djboss.node.name=one -Djboss.server.data.dir=/tmp/one

In another shell, run:

$ lein immutant run --clustered -Djboss.node.name=two -Djboss.server.data.dir=/tmp/two -Djboss.socket.binding.port-offset=100

You should have one server listening on port 8080 and another on 8180. So in yet another shell, run this:

$ curl "http://localhost:8080/example/?t=5"

With any luck, that should return 6 after about 5 seconds. Now run it again and it should return 6 immediately. Now for the moment of truth: change the port to 8180. That should return 6 immediately, too! Each unique value of t should only sleep t seconds the first time called on any peer in the cluster.

Here's another trick. Fire off a request with t=20 or so, and wait a few seconds, but before it completes hit the same server again with the same t value. You'll notice that the second request will not have to sleep for the full 20 seconds; it returns immediately after the first completes.

Caveats

Though tested and toyed with, this is seriously Alpha code, and the API is still coagulating, especially with respect to the options for the cache and memo functions, which should probably include :ttl and :idle themselves, for example. Other options may be introduced as more of Infinispan's features are exposed, e.g. transactions and persistence.

By the way, we've recently gotten a decent start on our user manual and api docs, so take a gander there for more details on caching or any of the other Immutant components. And remember to pester us in the usual ways if you have any questions.

Happy hacking!