Using Datomic With Immutant Redux

A month ago, we covered how to use Datomic with Immutant. Now that Immutant 2.1.0 has been released, the process of using Immutant messaging and Datomic in the same application is a bit simpler, so we're covering just that here.

With Immutant 2.1.0, org.immutant/messaging can now work with the HornetQ 2.3.x brought in by Datomic; you just need the proper dependency inclusions/exclusions for it to work. The minimum to get this working is:

:dependencies [[org.immutant/immutant "2.1.0"]
               ;; Datomic transitively brings in HornetQ 2.3.17.Final, which
               ;; overrides the HornetQ 2.4.5.Final from org.immutant/messaging
               [com.datomic/datomic-pro "0.9.5206"]
               ;; org.immutant/messaging requires this, but Datomic doesn't
               ;; bring it in, so we have to depend on it explicitly
               [org.hornetq/hornetq-jms-server "2.3.17.Final"
                :exclusions [org.jboss.jbossts.jts/jbossjts-jacorb]]]

But doing just that leads to lots of warnings in Leiningen's pedantic report. Both Datomic and Immutant have large dependency trees, so conflicts are inevitable. If you want to get rid of those warnings, we've figured that out for you as well:

:dependencies [[org.immutant/immutant "2.1.0"
                :exclusions [org.hornetq/hornetq-server
                             org.hornetq/hornetq-jms-server
                             org.hornetq/hornetq-journal
                             org.hornetq/hornetq-commons]]
               [com.datomic/datomic-pro "0.9.5206"
                :exclusions [org.slf4j/slf4j-nop
                             joda-time
                             commons-codec
                             org.jboss.logging/jboss-logging]]
               [org.hornetq/hornetq-jms-server "2.3.17.Final"
                :exclusions [org.jboss.spec.javax.transaction/jboss-transaction-api_1.1_spec
                             org.jboss.logging/jboss-logging
                             org.jboss/jboss-transaction-spi
                             org.jgroups/jgroups
                             org.jboss.jbossts.jts/jbossjts-jacorb]]]
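
If you would rather have the build fail than discover new conflicts later, Leiningen can treat them as errors. The following is a minimal project.clj sketch, not something taken from our own projects: the project name is hypothetical, and it assumes the repository that serves com.datomic/datomic-pro is already configured elsewhere. If the exclusions above still hold for the versions you use, dependency resolution should succeed.

```clojure
(defproject datomic-immutant-example "0.1.0-SNAPSHOT" ; hypothetical project name
  ;; Abort dependency resolution when version ranges or conflicting
  ;; transitive versions are detected, instead of only printing warnings.
  :pedantic? :abort
  ;; Assumption: the repository (and credentials) for datomic-pro are
  ;; configured elsewhere, e.g. in ~/.lein/profiles.clj.
  :dependencies [[org.immutant/immutant "2.1.0"
                  :exclusions [org.hornetq/hornetq-server
                               org.hornetq/hornetq-jms-server
                               org.hornetq/hornetq-journal
                               org.hornetq/hornetq-commons]]
                 [com.datomic/datomic-pro "0.9.5206"
                  :exclusions [org.slf4j/slf4j-nop
                               joda-time
                               commons-codec
                               org.jboss.logging/jboss-logging]]
                 [org.hornetq/hornetq-jms-server "2.3.17.Final"
                  :exclusions [org.jboss.spec.javax.transaction/jboss-transaction-api_1.1_spec
                               org.jboss.logging/jboss-logging
                               org.jboss/jboss-transaction-spi
                               org.jgroups/jgroups
                               org.jboss.jbossts.jts/jbossjts-jacorb]]])
```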

Get In Touch

If you have any questions, issues, or other feedback about Datomic with Immutant, you can always find us on #immutant on freenode or our mailing list.

Using Datomic With Immutant

Note: now that Immutant 2.1.0 is out, we've published another Datomic post that covers using Datomic with 2.1.0 specifically.

Immutant 2.x is just a set of libraries, usable from a standard Clojure application. The Datomic peer client is also a library, so in theory we can use Immutant and Datomic together in the same application. But in practice, it's not that simple.

The sticking point is HornetQ - Datomic uses HornetQ to connect peers to the transactor and depends on HornetQ 2.3.x, while Immutant depends on 2.4.x. We should be able to resolve this with the proper dependency exclusions - the API that Datomic uses in 2.3.x is available in 2.4.x, but there are two issues that prevent that from working with the current stable release of Immutant (2.0.2):

  1. Immutant 2.0.x uses the JMS 2.0 API, which didn't appear in HornetQ until 2.4.0, so 2.4.x is required.
  2. The Datomic transactor is running HornetQ 2.3.x, and a 2.4.x client can't connect to a 2.3.x server, so 2.3.x is required.

Hence our pickle.

But, all is not lost - with Immutant 2.0.x, you have two (non-awesome) options for using Datomic, and if you are willing to use recent Immutant incremental builds (and are willing to upgrade to Immutant 2.1.0 when it is released in a few weeks), you have a third, more palatable option.

Option 1 - No Immutant Messaging

The first option for using Datomic with Immutant 2.0.x is to not use Immutant messaging. This requires either depending individually on the Immutant libraries you are using:

  :dependencies [[com.datomic/datomic-pro "0.9.5206"]
                 [org.immutant/web "2.0.2"]
                 [org.immutant/scheduling "2.0.2"]
                 ...]

or using the catch-all artifact, and excluding messaging:

  :dependencies [[com.datomic/datomic-pro "0.9.5206"]
                 [org.immutant/immutant "2.0.2"
                  :exclusions [org.immutant/messaging]]
                 ...]

But this means you can't use any of the Immutant messaging features, which isn't great.

Option 2 - In WildFly Only

An Immutant application deployed to a WildFly application server doesn't directly use any of the HornetQ APIs, and instead uses the JMS API to communicate with the HornetQ provided by WildFly. That HornetQ is ClassLoader-isolated, which means your application can bring in its own version of HornetQ (in this case, 2.3.x via Datomic), which can be used without issue.

But this means you have to do all of your development against an application running inside WildFly, which isn't a great development experience. With our "dev" war, you can still have a REPL-driven process, but it is definitely more painful than out-of-container development.

Option 3 - Use Recent Incrementals, i.e. 2.1.0

For the soon-to-be-released Immutant 2.1.0, we're working on supporting Red Hat JBoss Enterprise Application Platform (which is a mouthful, so we'll just call it EAP). EAP is the commercialized version of Red Hat's open source JBoss Application Server (now known as WildFly), and the current version (6.4.0) is based off an older WildFly that uses HornetQ 2.3.x. We'll cover what EAP support really means in a future blog post - what matters today is that changes we've made in Immutant to support EAP allow you to use Immutant messaging with Datomic both in and out of WildFly (and soon EAP).

The only issues with this option are that you have to use a recent incremental build of Immutant until we release 2.1.0, and that you need a few dependency exclusions/inclusions to make Immutant messaging and Datomic play nicely. Luckily, we've figured that out for you! The bare minimum to get things working is:

:dependencies [[org.immutant/immutant "2.x.incremental.602"]
               ;; Datomic transitively brings in HornetQ 2.3.17.Final, which
               ;; overrides the HornetQ 2.4.5.Final from org.immutant/messaging
               [com.datomic/datomic-pro "0.9.5206"]
               ;; org.immutant/messaging requires this, but Datomic doesn't
               ;; bring it in, so we have to depend on it explicitly
               [org.hornetq/hornetq-jms-server "2.3.17.Final"
                :exclusions [org.jboss.jbossts.jts/jbossjts-jacorb]]]
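
To double-check which HornetQ actually ended up on the classpath, you can ask the JVM where a HornetQ class was loaded from. This is a generic REPL sketch rather than anything Immutant provides: jar-of is a hypothetical helper name, and HornetQClient is used simply because it is a class from the HornetQ core client.

```clojure
(defn jar-of
  "Returns the location (as a string) the named class was loaded from,
  or nil if the class can't be found or has no code source."
  [class-name]
  (try
    (some-> (Class/forName class-name)
            .getProtectionDomain
            .getCodeSource
            .getLocation
            str)
    (catch ClassNotFoundException _ nil)))

;; With the dependencies above, the returned path should point at a
;; 2.3.17.Final jar rather than a 2.4.x one.
(jar-of "org.hornetq.api.core.client.HornetQClient")
```

If the path mentions a 2.4.x jar, an exclusion is probably missing somewhere in your :dependencies.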

Now that you have it working, you'll probably notice that Leiningen's pedantic report is chock full of warnings. Both Datomic and Immutant have large dependency trees, so conflicts are inevitable. If you want to get rid of those warnings, we've figured that out for you as well:

:dependencies [[org.immutant/immutant "2.x.incremental.602"
                :exclusions [org.hornetq/hornetq-server
                             org.hornetq/hornetq-jms-server
                             org.hornetq/hornetq-journal
                             org.hornetq/hornetq-commons]]
               [com.datomic/datomic-pro "0.9.5206"
                :exclusions [org.slf4j/slf4j-nop
                             joda-time
                             commons-codec
                             org.jboss.logging/jboss-logging]]
               [org.hornetq/hornetq-jms-server "2.3.17.Final"
                :exclusions [org.jboss.spec.javax.transaction/jboss-transaction-api_1.1_spec
                             org.jboss.logging/jboss-logging
                             org.jboss/jboss-transaction-spi
                             org.jgroups/jgroups
                             org.jboss.jbossts.jts/jbossjts-jacorb]]]

Note again that this option currently requires you to run a recent incremental build (#602 or newer), which requires relying on our incremental repo:

:repositories [["Immutant incremental builds" "http://downloads.immutant.org/incremental/"]]

Get In Touch

If you have any questions, issues, or other feedback about Datomic with Immutant, you can always find us on #immutant on freenode or our mailing lists.

Using ActiveMQ Artemis With Immutant

Artemis is a "new" JMS2-compatible message broker based on a merging of the ActiveMQ and HornetQ codebases, and represents the future of both projects.

Immutant is built on an abstraction layer called WunderBoss in order to share much of the implementation with our sibling project, TorqueBox. An additional advantage of this layer is it allows us (in theory) to easily switch out the underlying messaging implementation while keeping the same Clojure API. With the release of Artemis 1.0.0, we've now taken the opportunity to test that theory.

wunderboss-artemis

With only a couple of changes to WunderBoss that allow us to share the bulk of the existing JMS2 implementation between the HornetQ and Artemis adapters and make it easier to exclude the HornetQ dependencies, we're now able to provide wunderboss-artemis. It allows you to use Artemis instead of HornetQ as the message broker in an embedded application (it doesn't yet support Artemis if you are deploying to WildFly).

If you want to give it a try, you just need to depend on a recent incremental build (#585 or newer) and make a few adjustments to your :dependencies:

:dependencies [[org.immutant/messaging "2.x.incremental.585"
                :exclusions [org.projectodd.wunderboss/wunderboss-messaging-hornetq]]
               [org.projectodd.wunderboss/wunderboss-artemis "0.1.0"]]

If you then use messaging and see something like the following in your log output, you're all set!

14:12:44.471 INFO  [org.apache.activemq.artemis.core.server] (main) AMQ221020: Started Acceptor at localhost:5445 for protocols [CORE]
14:12:44.471 INFO  [org.apache.activemq.artemis.core.server] (main) AMQ221007: Server is now live
14:12:44.471 INFO  [org.apache.activemq.artemis.core.server] (main) AMQ221001: Apache ActiveMQ Artemis Message Broker version 1.0.0 [nodeID=2107a7c3-0a1c-11e5-955d-71ef037c4451]
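
A quick way to "use messaging" from a REPL and trigger that output might look like the sketch below. It relies only on the regular immutant.messaging API, which is the point of the abstraction layer; the queue name is hypothetical, and the :timeout option is assumed to behave as it does in the Immutant 2.x releases.

```clojure
(require '[immutant.messaging :as msg])

;; Getting a reference to a queue creates it if needed, which should
;; bring up the embedded broker on first use.
(def q (msg/queue "artemis-smoke-test")) ; hypothetical queue name

(msg/publish q {:broker "artemis"})

;; Blocks until a message arrives or the timeout (in milliseconds) expires.
(msg/receive q :timeout 5000)
```

The receive call should hand back the map that was published, regardless of which broker is underneath.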

Artemis is brand new - once it matures a bit, we may provide an immutant-artemis lib that would bring in wunderboss-artemis and provide an Artemis management namespace similar to the immutant.messaging.hornetq we currently provide.

As always, if you have any issues or feedback, feel free to get in touch.

Using Transit with Immutant 2

Out of the box, Immutant 2 has support for several data serialization strategies for use with messaging and caching, namely: EDN, Fressian, JSON, and none (which falls back to Java serialization). But what if you want to use another strategy? Luckily, this isn't a closed set - Immutant allows us to add new strategies. We took advantage of that and have created a separate project that brings Transit support to Immutant - immutant-transit.

What is Transit?

From the Transit format page:

Transit is a format and set of libraries for conveying values between applications written in different programming languages.

It's similar in purpose to EDN, but leverages the speed of the optimized JSON readers that most platforms provide.

What does immutant-transit offer over using Transit directly?

immutant-transit provides an Immutant codec for Transit that allows for transparent encoding and decoding of Transit data when using Immutant's messaging and caching functionality. Without it, you would need to set up the encode/decode logic yourself.
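
For a sense of what "yourself" means here, this is roughly the encode/decode code you would otherwise write and call around every publish and receive. It is a minimal sketch using com.cognitect/transit-clj directly; transit-encode and transit-decode are hypothetical helper names, not part of immutant-transit.

```clojure
(require '[cognitect.transit :as transit])
(import '[java.io ByteArrayInputStream ByteArrayOutputStream])

(defn transit-encode
  "Encodes data as JSON-flavored Transit, returning a byte[]."
  [data]
  (let [out (ByteArrayOutputStream.)]
    (transit/write (transit/writer out :json) data)
    (.toByteArray out)))

(defn transit-decode
  "Decodes a byte[] of JSON-flavored Transit back into Clojure data."
  [^bytes bs]
  (transit/read (transit/reader (ByteArrayInputStream. bs) :json)))

;; Round-trip a message by hand.
(transit-decode (transit-encode {:a :message}))
```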

Usage

Note: immutant-transit won't work with Immutant 2.0.0-alpha1 - you'll need to use an incremental build (#298 or newer).

First, we need to add org.immutant/immutant-transit to our application's dependencies:

  :dependencies [[org.clojure/clojure "1.6.0"]
                 [org.immutant/immutant "2.x.incremental.298"]
                 [org.immutant/immutant-transit "0.2.2"]]

If you don't have com.cognitect/transit-clj in your dependencies, immutant-transit will transitively bring in version 0.8.259. We've tested against 0.8.255 and 0.8.259, so if you're running another version and are seeing issues, let us know.

Now, we need to register the Transit codec with Immutant:

  (ns your.app
    (:require [immutant.codecs.transit :as it]))

  (it/register-transit-codec)

This will register a vanilla JSON Transit codec that encodes to a byte[] under the name :transit with the content-type application/transit+json (Immutant uses the content-type to identify the encoding for messages sent via HornetQ).

To use the codec, provide it as the :encoding option wherever an encoding is used:

  (immutant.messaging/publish some-queue {:a :message} :encoding :transit)

  (def transit-cache (immutant.caching/with-codec some-cache :transit))
  (immutant.caching/compare-and-swap! transit-cache a-key a-function)

If you need to change the underlying format that Transit uses, or need to provide custom read/write handlers, you can pass them as options to register-transit-codec:

  (it/register-transit-codec
    :type :json-verbose
    :read-handlers my-read-handlers
    :write-handlers my-write-handlers)

The content-type will automatically be generated based on the :type, and will be of the form application/transit+<:type>.
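
If you haven't written Transit handlers before, the handler maps might look something like the following sketch. Point is a hypothetical application type, and we're assuming immutant-transit hands these maps through to Transit's :handlers option unchanged, so the sketch checks them against plain transit-clj first.

```clojure
(require '[cognitect.transit :as transit])
(import '[java.io ByteArrayInputStream ByteArrayOutputStream])

(deftype Point [x y]) ; hypothetical application type

;; Write handlers are keyed by class: emit a "point" tag and an [x y] rep.
(def my-write-handlers
  {Point (transit/write-handler "point" (fn [^Point p] [(.x p) (.y p)]))})

;; Read handlers are keyed by tag: rebuild a Point from the [x y] rep.
(def my-read-handlers
  {"point" (transit/read-handler (fn [[x y]] (Point. x y)))})

(defn round-trip
  "Writes v with the write handlers, then reads it back with the read handlers."
  [v]
  (let [out (ByteArrayOutputStream.)]
    (transit/write (transit/writer out :json {:handlers my-write-handlers}) v)
    (transit/read (transit/reader (ByteArrayInputStream. (.toByteArray out))
                                  :json {:handlers my-read-handlers}))))

(round-trip (Point. 1 2))
```

Once the round trip gives you back a Point, the same two maps can be passed as :read-handlers and :write-handlers when registering the codec.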

You can also override the name and content-type:

  (it/register-transit-codec
    :name :transit-with-my-handlers
    :content-type "application/transit+json+my-stuff"
    :read-handlers my-read-handlers
    :write-handlers my-write-handlers)

For more examples, see the example project.

Why is this a separate project from Immutant?

Transit's format and implementation are young, and are still in flux. We're currently developing this as a separate project so we can make releases independent of Immutant proper that track changes to Transit. Once Transit matures a bit, we'll likely roll this in to Immutant itself.

If you are interested in adding a codec of your own, take a look at the immutant-transit source and at the immutant.codecs namespace to see how it's done.

Get In Touch

If you have any questions, issues, or other feedback about immutant-transit, you can always find us on #immutant on freenode or our mailing lists.

Messaging with The Deuce

[mailboxes]

In this installment of our series on getting started with Immutant 2, we'll take a detailed look at the API of our library for messaging, and show a few examples of usage.

If you're coming from Immutant 1.x, you may notice that the artifact has been renamed (org.immutant/immutant-messaging is now org.immutant/messaging), and the API has changed a bit. We'll point out the notable API changes as we go.

The API

The messaging API is backed by HornetQ, which is an implementation of JMS. JMS provides two primary destination types: queues and topics. Queues represent point-to-point destinations, and topics represent publish/subscribe destinations.

To use a destination, we need to get a reference to one via the queue or topic functions, depending on the type required. This will create the destination if it does not already exist. This is a bit different than the 1.x API, which provided a single start function for this, and determined the type of destination based on conventions around the provided name. In 2.x, we've removed those naming conventions.

Once we have a reference to a destination, we can operate on it with the following functions:

  • publish - sends a message to the destination
  • receive - receives a single message from the destination
  • listen - registers a function to be called each time a message arrives at the destination

If the destination is a queue, we can do synchronous messaging (request-response):

  • respond - registers a function that receives each request, and the returned value will be sent back to the requester
  • request - sends a message to the responder

Finally, to deregister listeners, responders, and destinations, we provide a single stop function. This is another difference from 1.x - the unlisten and stop functions have been collapsed to stop.

Some Examples

The following code fragments were tested against 2.x.incremental.133. You should follow the instructions in the getting started post to set up a project using Immutant 2.x, and add [org.immutant/messaging "2.x.incremental.133"] and [cheshire "5.3.1"] to the project dependencies (we'll be encoding some messages as JSON in our examples below, so we'll go ahead and add cheshire while we're at it). Then, fire up a REPL, and require the immutant.messaging namespace to follow along:

(require '[immutant.messaging :refer :all])

First, let's create a queue:

(queue "my-queue")

That will create the queue in the HornetQ broker for us. We'll need a reference to that queue to operate on it. Let's go ahead and store that reference in a var:

(def q (queue "my-queue"))

We can call queue any number of times - if the queue already exists, we're just grabbing a reference to it.

Now, let's register a listener on our queue. Let's just print every message we get:

(def listener (listen q println))

We can publish to that queue, and see that the listener gets called:

(publish q {:hi :there})

You'll notice that we're publishing a map there - we can publish pretty much any data structure as a message. By default, that message will be encoded using EDN (:edn). We also support other encodings, namely: :clojure, :fressian, :json, and :text. We can choose a different encoding by passing an :encoding option to publish:

(publish q {:hi :there} :encoding :json)

Out of the box, we provide full support for the :clojure, :edn, and :text encodings. If you want to use :fressian or :json, you'll need to add org.clojure/data.fressian or cheshire to your dependencies to enable them, respectively.

We passed our options to publish as keyword arguments, but they can also be passed as a map:

(publish q {:hi :there} {:encoding :json})

This holds true for any of the messaging functions that take options.

We're also passing the destination reference to publish instead of the destination name. That's a departure from 1.x, where you could just pass the destination name. Since we no longer have conventions about how queues and topics should be named, we can no longer determine the type of the destination from the name alone.

We can deregister the listener by either passing it to stop or calling .close on it:

(stop listener)
;; identical to
(.close listener)
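
With the listener gone, messages published to the queue stay there until something asks for them, which makes this a good moment to try receive. This is a sketch that assumes the :timeout (milliseconds) and :timeout-val options behave as they do in the Immutant 2.x releases; we haven't re-verified them against this particular incremental build.

```clojure
(require '[immutant.messaging :refer :all])

(def q (queue "my-queue"))

(publish q {:hi :again})

;; Waits up to one second for a message; if none arrives in that time,
;; the value given as :timeout-val is returned instead.
(receive q :timeout 1000 :timeout-val :nothing-yet)
```

Calling receive a second time on the now-empty queue should give you :nothing-yet after the timeout expires.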

Now let's take a look at synchronous messaging. Let's create a new queue for this (you'll want to use a dedicated queue for each responder) and register a responder that just increments the request:

(def sync-q (queue "sync"))

(def responder (respond sync-q inc))

Then, we make a request, which returns a Future that we can dereference:

@(request sync-q 1)

The responder is just a fancy listener, and can be deregistered the same way as a listener.
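
Everything so far has used queues. For comparison, here is a small sketch of the publish/subscribe behavior of topics, where every listener gets its own copy of each message. The topic name, the seen atom, and the half-second sleep are our own choices for illustration.

```clojure
(require '[immutant.messaging :refer :all])

(def t (topic "my-topic"))

;; Collects [listener-id message] pairs so we can see who got what.
(def seen (atom []))

(def listener-a (listen t #(swap! seen conj [:a %])))
(def listener-b (listen t #(swap! seen conj [:b %])))

(publish t {:hi :everyone})

;; Delivery is asynchronous, so give the listeners a moment.
(Thread/sleep 500)
@seen

(stop listener-a)
(stop listener-b)
```

Both listeners should show up in seen with the same message, in no particular order. Had t been a queue, only one of them would have received it.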

That's not all...

That was just a brief introduction to the messaging API. There are features we've yet to cover (durable topic subscriptions, connection/session sharing, transactional sessions, remote connections), but it's getting late, so we'll save those for another time.

Try it out!

As always, we'd love to incorporate your feedback. Find us via our community page and join the fun!

Thanks to John Lillis for the image, used under CC BY-NC-ND