Immutant as a Datomic Storage Service

In this post, I'm going to show you how to configure an Immutant cluster as a Datomic storage service and test it with an example application.

Datomic prescribes a unique architecture in which database concepts -- transactions, queries, and storage -- are separated. It treats external storage engines much the same way a traditional database treats a file system. And one of its supported engines is Infinispan, which just so happens to be included in Immutant!

But there's a catch. Immutant only provides in-process access to Infinispan. And the Datomic transactor is a standalone application meant to run in its own process, so it expects to access Infinispan via a HotRod server, which is not provided by Immutant...

...until now!

As of version 1.1.0 of the lein-immutant plugin, it's possible to overlay the HotRod modules and configuration onto an installation of a recent Immutant incremental build.

I need to stress incremental build here. This effort exposed a bug in Infinispan 5.3.0, the version included in the official Immutant 1.0.x releases, that kept any data written via HotRod from being durably persisted. So we upgraded to Infinispan 6.0, but that's currently only available in our incremental builds.

Installing an incremental build is easy. Just pass LATEST (or a specific build number) for the version:

$ lein immutant install LATEST

Why?

Before we get started, we should be clear about why we'd want to.

If you already have some idea of the benefits of Datomic, Immutant, or Infinispan individually, the benefits of combining them may be apparent, but let's be specific:

  • Simpler deployment -- There is no external storage service process to manage: it's contained within Immutant.
  • Simpler configuration -- Because your Datomic peers are colocated with their storage service, the connection URI is always the same on any node in the cluster: datomic:inf://localhost:11222/<DB_NAME> (see the sketch just after this list).
  • Robust high availability -- As we'll see below, once a transactor connects to a HotRod peer, it becomes "topology-aware" and remains connected as long as there is at least one node in the cluster, whether the original peer crashes or not.
  • Linear scalability -- Infinispan's distributed replication limits copies of writes to a fixed number of nodes (default 2) regardless of cluster size, so database capacity is increased by increasing the size of your cluster. A consistent hashing algorithm is used to determine which nodes will store a given entry, and the HotRod client is aware of the hash distribution, enabling it to use the most appropriate remote server for each operation.
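
To make the "simpler configuration" point concrete, here's a minimal sketch of a peer connecting through the local storage service. The database name counter is hypothetical; the same URI works on every node because each one runs its own HotRod endpoint:

(require '[datomic.api :as d])

;; "counter" is a hypothetical database name
(def uri "datomic:inf://localhost:11222/counter")

(d/create-database uri)    ; returns true only for the peer that actually creates it
(def conn (d/connect uri))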

HotRod

Using version 1.1.0 (or higher) of the lein-immutant plugin, here's how to add HotRod to your Immutant:

$ lein immutant overlay hotrod

That's it! The next time you run Immutant, you'll have a HotRod service awaiting connections on port 11222, with a cache named datomic configured for synchronous, distributed replication. Its contents are persisted to $IMMUTANT_HOME/jboss/standalone/data/infinispan/polyglot/datomic.dat
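
If you'd like to verify that the endpoint is actually listening before bringing Datomic into the picture, here's a rough smoke test using the Infinispan HotRod client directly from a REPL. It assumes the org.infinispan/infinispan-client-hotrod dependency (it shows up in the example app below) is on the classpath, and the key it writes is just a throwaway:

;; Rough smoke test -- not needed by Datomic itself, just a way to
;; confirm the "datomic" cache is reachable on port 11222
(import '(org.infinispan.client.hotrod RemoteCacheManager)
        '(org.infinispan.client.hotrod.configuration ConfigurationBuilder))

(def manager
  (RemoteCacheManager.
   (.. (ConfigurationBuilder.) addServer (host "localhost") (port 11222) build)))

(def cache (.getCache manager "datomic"))

(.put cache "smoke-test" "hello")    ; throwaway entry
(println (.get cache "smoke-test"))  ; => hello
(.remove cache "smoke-test")
(.stop manager)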

Datomic

Datomic comes in two flavors: Free and Pro. Infinispan storage support is only included in Datomic Pro, so you'll need to obtain a license. By the way, many thanks to Stuart Halloway for hooking me up with one to get this stuff working.

UPDATE 11/8/2013: Datomic recently announced a Datomic Pro Starter Edition featuring support for all storages, including Infinispan, and a perpetual license.

I've been testing successfully with version 0.8.4218, available here. Once you download and unzip it, cd to its directory and install the peer library to your local Maven repo:

$ bin/maven-install

This isn't required for Immutant itself, but we'll need it for our example app described below. Finally, install your license key. Use the provided sample template for Infinispan:

$ cp config/samples/inf-transactor-template.properties config/immutant.properties

And update the license-key property in config/immutant.properties. That's it! You're ready to run!

In one shell, fire up Immutant:

$ lein immutant run

And in another shell, fire up the Datomic transactor:

$ bin/transactor config/immutant.properties

If you don't see any error stack traces in either shell, yay!

Counting on ACID

To demonstrate some features, we need an example app. Here's one: https://github.com/jcrossley3/datomic-counter:

$ git clone git@github.com:jcrossley3/datomic-counter.git
$ cd datomic-counter

As its name implies, it counts. But it does this using a pretty cool Datomic feature: transaction functions.

Before we deploy it, let's highlight a few things. First, notice that we added dependencies in project.clj for our storage service and the Datomic peer library (that we installed above):

  :dependencies [[org.clojure/clojure "1.5.1"]
                 [org.infinispan/infinispan-client-hotrod "6.0.0.CR1"]
                 [com.datomic/datomic-pro "0.8.4218"]]
  :immutant {:init counter.core/start}

And we configured an :init function to be called when the app deploys: start from the counter.core namespace. It has two duties: 1) initialize our Datomic database and 2) schedule a job to increment a counter every couple of seconds.

(defn start
  "Initialize the database and schedule the counter"
  []
  (try
    (db/init)
    (job/schedule "counter" job, :every [2 :seconds], :singleton false)
    (catch Exception e
      (println "Check the transactor, retrying in 10 seconds")
      (future (Thread/sleep 10000) (start)))))

We log and increment the counter in the counter.core/job function. Note the job sets :singleton false so it'll run on all nodes in a cluster, introducing the potential for race conditions as multiple processes attempt to increment a shared counter. We'll deal with those using a transaction function.
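
The job itself isn't listed here, but it boils down to something like the following sketch: log a message and submit a transaction that invokes the :increment transaction function defined below. The db/conn reference is an assumption standing in for however the app holds onto its connection:

;; A sketch of the scheduled job -- the real counter.core/job is in the
;; example repo; `db/conn` is assumed to hold the Datomic connection
(defn job []
  (println "incrementing")                ; appears in this node's console
  @(d/transact @db/conn [[:increment]]))  ; the transactor runs :increment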

Back in start, we naively assume any exception is due to the transactor not being around, so we log a warning, wait a bit, and retry. We do that in a separate thread so as not to lock up the Immutant deployer. Now let's take a closer look at the init function from counter.db:

(defn init
  "Create the database, load the schema, initialize counter, and
  define transaction function named :increment"
  []
  (when (d/create-database uri)
    @(d/transact @conn (read-string (slurp (io/resource "schema.dtm"))))
    @(d/transact @conn [{:db/id :counter :value 0}
                        {:db/id (d/tempid :db.part/user)
                         :db/ident :increment
                         :db/fn (d/function
                                 {:lang "clojure"
                                  :params '[db]
                                  :code '(let [v (:value (d/entity db :counter))]
                                           (println "inc" v)
                                           [{:db/id :counter
                                             :value (inc v)}])})}])))

Note that we rely on the return value of create-database to ensure that only one node in our cluster loads our very simple schema and initializes our counter.

The :increment entity is an example of a Datomic transaction function. It runs inside the transactor, which serializes all writes to the database, eliminating the potential for the race conditions mentioned above. Note that the println output will therefore appear in the transactor's stdout, not the peer's.
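
From any peer, reading the current value is then just an entity lookup against the latest database value, using the same :counter ident and :value attribute as above (conn is the connection from counter.db):

;; Quick check from any peer's REPL
(:value (d/entity (d/db @conn) :counter))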

Go ahead and deploy it:

$ lein immutant deploy

Monitor the shells in which you started Immutant and the Datomic transactor to see the expected output once the counting commences.

Cluster Time!

Now the fun begins. I'm going to assume you don't have a spare server lying around, virtual or otherwise, discoverable via multicast, on which you could install Immutant and deploy the app. But if you did, you'd simply pass the --clustered option when firing up the Immutants on both hosts:

$ lein immutant run --clustered

But it's never that easy, is it? ;)

Instead, I'm going to show you how to create a cluster of Immutants on your laptop using a port offset. First, kill the Immutant and transactor processes (Ctrl-c) you started earlier, make sure you have the app deployed, clean the runtime state, and replicate our Immutant installation:

$ lein immutant deploy /path/to/datomic-counter
$ rm -rf ~/.immutant/current/jboss/standalone/data
$ cp -R ~/.immutant/current/ /tmp/node1
$ cp -R ~/.immutant/current/ /tmp/node2

We're going to use two system properties to simulate our cluster:

  • jboss.node.name -- Every node in a cluster must have a unique identifier. It defaults to the hostname, which would be the same for both of our Immutants, so we'll override it on one of them.
  • jboss.socket.binding.port-offset -- To avoid port conflicts, all socket bindings will be incremented by this value, which defaults to 0.

In one shell, fire up node1:

$ IMMUTANT_HOME=/tmp/node1 lein immutant run --clustered -Djboss.node.name=node1

And in another shell, fire up node2:

$ IMMUTANT_HOME=/tmp/node2 lein immutant run --clustered -Djboss.socket.binding.port-offset=100

Assuming you have lots of RAM and disk and generous ulimit settings, you can fire up as many of these as you like, e.g.

$ cp -R ~/.immutant/current/ /tmp/node3
$ IMMUTANT_HOME=/tmp/node3 lein immutant run --clustered \
  -Djboss.node.name=node3 \
  -Djboss.socket.binding.port-offset=200

At this point, your Immutants should be complaining about the lack of a running transactor, so go back to your transactor shell and restart it:

$ bin/transactor config/immutant.properties

After a few seconds, you should see expected log output from the two "nodes" and the transactor.

To see some failover, recall that the transactor is configured to connect to port 11222, which is the node1 Immutant. The node2 Immutant with the port offset is listening on port 11322. Go back to your node1 shell and hit Ctrl-c to kill it. Observe that the transactor doesn't miss a beat because it's already aware of node2's membership in the cluster. The transactor only needed node1 up long enough to bootstrap itself into the cluster. The database remains consistent as long as there's at least one node present thereafter.

Now restart node1 and kill the transactor using Ctrl-c. You should see some errors in the output of the Immutants, but they'll recover gracefully whenever you restart the transactor, whose output should pick up right where it left off when you killed it.

Feedback

It's still early days, obviously, and I can't yet articulate the trade-offs, but I feel confident enough after my limited testing to invite others to kick the tires, especially those with more Datomic expertise than I have. I suspect there are opportunities for tuning, and we need to define some best practices around deployment, perhaps including some Docker container images.

As always, feel free to reach out in the usual ways and give us your feedback!

Getting Started: Caching

This is the next tutorial in our getting started series: an exploration of Immutant's caching features. JBoss AS7 -- and therefore, Immutant -- comes with the Infinispan data grid baked right in, obviating the need to manage a separate caching service like Memcached for your applications.

Infinispan is a state-of-the-art, high-speed, low-latency, distributed data grid. It is capable of efficiently replicating key-value stores -- essentially souped-up ConcurrentMap implementations -- across a cluster. But it can also serve as a capable in-memory data cache, providing features such as write-through/write-behind persistence, multiple eviction policies, and transactions.

Clustering Modes

Infinispan caches adhere to a particular mode of operation. In a non-clustered, standalone Immutant, :local is the only supported mode. But when clustered, you have other options.

  • :local -- This is what you get in non-clustered mode, roughly equivalent to a hash map with write-through/write-behind persistence, JTA/XA support, MVCC (non-blocking, thread-safe reads even during concurrent writes), and JMX manageability.
  • :invalidated -- This is the default clustered mode. It doesn't actually share any data at all, so it's very "bandwidth friendly". Whenever data is changed in a cache, other caches in the cluster are notified that their copies are now stale and should be evicted from memory.
  • :replicated -- In this mode, entries added to any cache instance will be copied to all other cache instances in the cluster, and can then be retrieved locally from any instance. Though simple, it's impractical for clusters of any significant size (>10), and its capacity is equal to the amount of RAM in its smallest peer.
  • :distributed -- This mode is what enables Infinispan clusters to achieve "linear scalability". Cache entries are copied to a fixed number of cluster nodes (default is 2) regardless of the cluster size. Distribution uses a consistent hashing algorithm to determine which nodes will store a given entry.

immutant.cache/InfinispanCache

The first thing you must understand about Immutant's InfinispanCache is that it's mutable. This is sensible in a clustered environment, because the local process benefits from fast reads of data that may have been put there by a remote process. We effectively shift the responsibility of "sane data management", i.e. MVCC, from Clojure to Infinispan.

The second thing to know is that every Immutant cache has a cluster-scoped name and a mode. When you call immutant.cache/cache, the name is required, and it may refer to an existing cache that is already populated. The mode argument (one of :local, :invalidated, :replicated, or :distributed) is optional, defaulting to :invalidated if clustered and :local otherwise.
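
For example, something like this sketch (the cache names are made up, and the exact signature belongs to the docs of the day):

(require '[immutant.cache :as cache])

;; With no mode given, you get :invalidated in a cluster, :local otherwise
(def sessions (cache/cache "sessions"))

;; Explicitly requesting distribution (only meaningful in a cluster)
(def scores (cache/cache "scores" :distributed))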

Because the cache implements many core Clojure interfaces, functions that typically return immutable copies will actually affect the cache contents:

UPDATE 3/22/2012: Due to feedback from our Clojure/West talk we no longer alter the Immutant caches through the core Clojure functions as shown below. See the latest docs for current info and examples.

  user> (def cache (immutant.cache/cache "test"))
  #'user/cache
  user> cache
  {}
  user> (assoc cache :a 1)
  {:a 1}
  user> (merge cache {:b 2, :c 3})
  {:c 3, :a 1, :b 2}
  user> (dissoc cache :c)
  {:a 1, :b 2}
  user> cache
  {:a 1, :b 2}
  user> (empty cache)
  {}
  user> cache
  {}

Further, the InfinispanCache supports a variety of put methods, some of which expose the ConcurrentMap features of atomically storing entries based on their presence or absence. These all take lifespan options for time-to-live and max-idle expiration policies.
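
For instance, here's a hedged sketch of the conditional variants (the cache name is made up, and the exact names and units of the lifespan options are in the caching docs):

(require '[immutant.cache :as cache])

(def c (cache/cache "puts-demo"))            ; hypothetical cache name

(cache/put c :greeting "hello")              ; unconditional write
(cache/put-if-absent c :greeting "howdy")    ; no-op -- :greeting already present
(cache/put-if-absent c :farewell "goodbye")  ; stored -- the key was absent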

Memoization

Memoization is an optimization technique associating a cache of calculated values with a potentially expensive function, incurring the expense only once, with subsequent calls retrieving the result from the cache. The keys of the cache are the arguments passed to the function.

Standards for caching and memoization in Clojure are emerging in the form of core.cache and core.memoize, respectively. Because the InfinispanCache implements clojure.core.cache/CacheProtocol it can act as an underlying implementation for clojure.core.memoize/PluggableMemoization. Immutant includes a higher-order memo function for doing exactly that:

(immutant.cache/memo a-slow-function "a name" :distributed)

An Example

Let's amend the example from our clustering tutorial to demonstrate a replicated cache. We'll create a simple web app with a single request to which we'll pass an integer. The request handler will pass that number to a very slow increment function: it'll sleep for that number of seconds before returning the incremented value. For us, this sleepy function represents a particularly time-consuming operation that will benefit from memoization.

Of course we'll need a project.clj:

(defproject example "1.0.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]])

Next, the Immutant application bootstrap file, immutant.clj, into which we'll put all our code for this example.

(ns example.init
  (:use [ring.util.response]
        [ring.middleware.params])
  (:require [immutant.cache :as cache]
            [immutant.web :as web]))

;; Our slow function
(defn slow-inc [t]
  (Thread/sleep (* t 1000))
  (inc t))

;; Our memoized version of the slow function
(def memoized-inc (cache/memo slow-inc "sleepy" :replicated))

;; Our Ring handler
(defn handler [{params :params}]
  (let [t (Integer. (get params "t" 1))]
    (response (str "value=" (memoized-inc t) "\n"))))

;; Start up our web app
(web/start "/" (wrap-params handler))

Make sure you have a recent version of Immutant:

$ lein immutant install

And cd to the directory containing the above two files and deploy your app:

$ lein immutant deploy

Now bring up your simulated cluster. In one shell, run:

$ lein immutant run --clustered -Djboss.node.name=one -Djboss.server.data.dir=/tmp/one

In another shell, run:

$ lein immutant run --clustered -Djboss.node.name=two -Djboss.server.data.dir=/tmp/two -Djboss.socket.binding.port-offset=100

You should have one server listening on port 8080 and another on 8180. So in yet another shell, run this:

$ curl "http://localhost:8080/example/?t=5"

With any luck, that should return 6 after about 5 seconds. Now run it again and it should return 6 immediately. Now for the moment of truth: change the port to 8180. That should return 6 immediately, too! Each unique value of t should only sleep t seconds the first time called on any peer in the cluster.

Here's another trick. Fire off a request with t=20 or so, wait a few seconds, and before it completes hit the same server again with the same t value. You'll notice that the second request doesn't have to sleep the full 20 seconds; it returns immediately after the first completes.

Caveats

Though tested and toyed with, this is seriously Alpha code, and the API is still coagulating, especially with respect to the options for the cache and memo functions, which should probably include :ttl and :idle themselves, for example. Other options may be introduced as more of Infinispan's features are exposed, e.g. transactions and persistence.

By the way, we've recently gotten a decent start on our user manual and api docs, so take a gander there for more details on caching or any of the other Immutant components. And remember to pester us in the usual ways if you have any questions.

Happy hacking!