Messaging with The Deuce

[mailboxes]

In this installment of our series on getting started with Immutant 2, we'll take a detailed look at the API of our library for messaging, and show a few examples of usage.

If you're coming from Immutant 1.x, you may notice that the artifact has been renamed (org.immutant/immutant-messaging is now org.immutant/messaging), and the API has changed a bit. We'll point out the notable API changes as we go.

The API

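The messaging API is backed by HornetQ, which is an implementation of JMS. JMS provides two primary destination types: queues and topics. Queues are point-to-point destinations (each message is delivered to a single consumer), while topics are publish/subscribe destinations (each message is delivered to every subscriber).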

To use a destination, we need to get a reference to one via the queue or topic functions, depending on the type required. This will create the destination if it does not already exist. This is a bit different from the 1.x API, which provided a single start function for this, and determined the type of destination based on conventions around the provided name. In 2.x, we've removed those naming conventions.

Once we have a reference to a destination, we can operate on it with the following functions:

  • publish - sends a message to the destination
  • receive - receives a single message from the destination
  • listen - registers a function to be called each time a message arrives at the destination
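
As a rough sketch of how these three functions fit together on each destination type: the destination names and the promise-based listeners below are made up for illustration, and the :timeout option to receive is an assumption that may not match every incremental build.

```clojure
(require '[immutant.messaging :as msg])

;; Queue: point-to-point, so a single consumer takes the message.
(def inbox (msg/queue "sketch-inbox"))
(msg/publish inbox {:n 1})
;; Assumed option: give up after 1000 ms rather than waiting longer.
(def received (msg/receive inbox :timeout 1000))

;; Topic: publish/subscribe, so every listener gets its own copy.
(def news (msg/topic "sketch-news"))
(def seen-a (promise))
(def seen-b (promise))
(def listener-a (msg/listen news #(deliver seen-a %)))
(def listener-b (msg/listen news #(deliver seen-b %)))
(msg/publish news :ping)

;; Wait up to a second for each listener, then clean up.
(def copies [(deref seen-a 1000 :timed-out)
             (deref seen-b 1000 :timed-out)])
(msg/stop listener-a)
(msg/stop listener-b)
```

The promises are only there so the REPL has something to inspect; a real listener would do its work directly in the function passed to listen.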

If the destination is a queue, we can do synchronous messaging (request-response):

  • respond - registers a function that receives each request, and the returned value will be sent back to the requester
  • request - sends a message to the responder

Finally, to deregister listeners, responders, and destinations, we provide a single stop function. This is another difference from 1.x - the unlisten and stop functions have been collapsed to stop.

Some Examples

The following code fragments were tested against 2.x.incremental.133. You should follow the instructions in the getting started post to set up a project using Immutant 2.x, and add [org.immutant/messaging "2.x.incremental.133"] and [cheshire "5.3.1"] to the project dependencies (we'll be encoding some messages as JSON in our examples below, so we'll go ahead and add cheshire while we're at it). Then, fire up a REPL, and require the immutant.messaging namespace to follow along:

(require '[immutant.messaging :refer :all])

First, let's create a queue:

(queue "my-queue")

That will create the queue in the HornetQ broker for us. We'll need a reference to that queue to operate on it. Let's go ahead and store that reference in a var:

(def q (queue "my-queue"))

We can call queue any number of times - if the queue already exists, we're just grabbing a reference to it.

Now, let's register a listener on our queue. Let's just print every message we get:

(def listener (listen q println))

We can publish to that queue, and see that the listener gets called:

(publish q {:hi :there})

You'll notice that we're publishing a map there - we can publish pretty much any data structure as a message. By default, that message will be encoded using edn. We also support other encodings, namely: :clojure, :fressian, :json, and :text. We can choose a different encoding by passing an :encoding option to publish:

(publish q {:hi :there} :encoding :json)

Out of the box, we provide full support for the :clojure, :edn, and :text encodings. If you want to use :fressian or :json, you'll need to add org.clojure/data.fressian or cheshire to your dependencies to enable them, respectively.
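
To get a feel for what the choice of encoding means for your data, here's a sketch that round-trips the same map through edn and JSON by hand, using clojure.edn and cheshire directly rather than the messaging library. It assumes the :json encoding behaves like a plain cheshire round trip with keywordized keys, which is an assumption on our part and not something this sketch verifies.

```clojure
(require '[clojure.edn :as edn]
         '[cheshire.core :as json])

(def message {:hi :there})

;; edn preserves Clojure types such as keywords.
(def via-edn (edn/read-string (pr-str message)))

;; JSON has no keyword type, so the value comes back as a string.
;; The key is a keyword again only because we ask cheshire to
;; keywordize keys (the trailing `true`).
(def via-json (json/parse-string (json/generate-string message) true))

via-edn  ; => {:hi :there}
via-json ; => {:hi "there"}
```

If both ends of a destination are Clojure, edn is the less surprising choice; JSON is mostly useful when a non-Clojure consumer is involved.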

We passed our options to publish as keyword arguments, but they can also be passed as a map:

(publish q {:hi :there} {:encoding :json})

This holds true for any of the messaging functions that take options.

We're also passing the destination reference to publish instead of the destination name. That's a departure from 1.x, where you could just pass the destination name. Since we no longer have conventions about how queues and topics should be named, we can no longer determine the type of the destination from the name alone.

We can deregister the listener by either passing it to stop or calling .close on it:

(stop listener)
;; identical to
(.close listener)

Now let's take a look at synchronous messaging. Let's create a new queue for this (you'll want to use a dedicated queue for each responder) and register a responder that just increments the request:

(def sync-q (queue "sync"))

(def responder (respond sync-q inc))

Then, we make a request, which returns a Future that we can dereference:

@(request sync-q 1)

The responder is just a fancy listener, and can be deregistered the same way as a listener.
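
Since request hands back a Future, you don't have to block forever on it. Here's a minimal sketch using the three-argument form of Clojure's deref, which accepts a timeout and a fallback value; the one-second timeout and the :timed-out value are arbitrary choices for illustration.

```clojure
(require '[immutant.messaging :as msg])

(def sync-q (msg/queue "sync"))
(def responder (msg/respond sync-q inc))

;; Wait at most one second for the response, falling back to
;; :timed-out if the responder doesn't answer in time.
(def answer (deref (msg/request sync-q 1) 1000 :timed-out))

;; Deregister the responder the same way we would a listener.
(msg/stop responder)
```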

That's not all...

That was just a brief introduction to the messaging API. There are features we've yet to cover (durable topic subscriptions, connection/session sharing, transactional sessions, remote connections), but it's getting late, so we'll save those for another time.

Try it out!

As always, we'd love to incorporate your feedback. Find us via our community page and join the fun!

Thanks to John Lillis for the image, used under CC BY-NC-ND

Schedule Some Time with The Deuce

[timeclock]

In this installment of our series on getting started with Immutant 2, we'll take a detailed look at the API of our library for scheduling jobs, and show a few examples of usage.

If you're coming from Immutant 1.x, you'll notice that the namespace and artifact have been renamed (what used to be immutant.jobs and org.immutant/immutant-jobs is now immutant.scheduling and org.immutant/scheduling), and the API has changed a bit. It's still based on Quartz 2.2, though.

The API

The API for immutant.scheduling looks bigger than it really is. There are only two essential functions:

  • schedule - for scheduling your jobs
  • stop - for canceling them

The remainder of the namespace is syntactic sugar: functions that can be composed to create the specification for when your job should run.

Your "job" will take the form of a plain ol' Clojure function taking no arguments. The schedule function takes your job and a specification map as arguments. The map determines when your function gets called. It may contain any of the following keys:

  • :in - a period after which your function will be called
  • :at - an instant in time after which your function will be called
  • :every - the period between calls
  • :until - stops the calls at a specific time
  • :limit - limits the calls to a specific count
  • :cron - calls your function according to a Quartz-style cron spec

For each key there is a corresponding "sugar function". We'll see those in the examples below.

Units for periods (:in and :every) are milliseconds, but can also be represented as a keyword or a vector of multiplier/keyword pairs, e.g. [1 :week, 4 :days, 2 :hours, 30 :minutes, 59 :seconds]. Both singular and plural keywords are valid.
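
To make the vector form concrete, here's a small sketch that converts a period spec to milliseconds. The names unit->ms, singular, and period->ms are hypothetical helpers written for illustration; they are not part of immutant.scheduling, and the real implementation may well differ.

```clojure
(require '[clojure.string :as str])

;; Milliseconds per unit, keyed by the singular keyword.
(def unit->ms
  {:millisecond 1
   :second      1000
   :minute      60000
   :hour        3600000
   :day         86400000
   :week        604800000})

(defn singular
  "Drops a trailing s so that :hours and :hour are treated alike."
  [k]
  (keyword (str/replace (name k) #"s$" "")))

(defn period->ms
  "Accepts a number of ms, a unit keyword, or a vector of
  multiplier/keyword pairs."
  [spec]
  (cond
    (number? spec)  spec
    (keyword? spec) (unit->ms (singular spec))
    :else (reduce + (map (fn [[n unit]] (* n (unit->ms (singular unit))))
                         (partition 2 spec)))))

(period->ms :second)                 ; => 1000
(period->ms [2 :hours, 30 :minutes]) ; => 9000000
(period->ms [1 :week, 4 :days, 2 :hours, 30 :minutes, 59 :seconds])
;; => 959459000
```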

Time values (:at and :until) can be a java.util.Date, a long representing milliseconds-since-epoch, or a String in HH:mm format. The latter will be interpreted as the next occurrence of HH:mm:00 in the currently active timezone.
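
The "next occurrence" rule for HH:mm strings can be sketched like so. This is only an illustration of the idea using java.time (so it needs Java 8 or later): next-occurrence is a hypothetical helper, it takes "now" as an argument to keep it testable, and it ignores timezones entirely, so don't read it as how the library actually does it.

```clojure
(import '[java.time LocalDateTime LocalTime])

(defn next-occurrence
  "Returns the first date-time strictly after `now` whose time of
  day is `hh-mm` (a string such as \"09:30\")."
  [hh-mm ^LocalDateTime now]
  (let [candidate (.atTime (.toLocalDate now) (LocalTime/parse hh-mm))]
    (if (.isAfter candidate now)
      candidate              ; still to come today
      (.plusDays candidate 1)))) ; already passed, so tomorrow

(def now (LocalDateTime/of 2014 6 1 10 0))

(str (next-occurrence "17:00" now)) ; => "2014-06-01T17:00"
(str (next-occurrence "09:30" now)) ; => "2014-06-02T09:30"
```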

Two additional options may be passed in the spec map:

  • :id - a unique identifier for the scheduled job
  • :singleton - a boolean denoting the job's behavior in a cluster (default: true)

In Immutant 1.x, a name for the job was required. In Immutant 2, the :id is optional, and if not provided, a UUID will be generated. If schedule is called with an :id for a job that has already been scheduled, the prior job will be replaced.

The return value from schedule is a map of the options with any missing defaults filled in, including a generated id if necessary. This result can be passed to stop to cancel the job.
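
A quick sketch of that round trip, assuming (per the description above) that the returned map carries the generated id under :id; the job and its timing are arbitrary.

```clojure
(require '[immutant.scheduling :as sch])

;; No :id given, so one should be generated for us.
(def scheduled
  (sch/schedule #(println "tick") {:every [10 :seconds], :limit 3}))

(:id scheduled)      ; the generated id
(sch/stop scheduled) ; cancel the job using the returned map
```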

Some Examples

The following code fragments were tested against 2.x.incremental.119. You should read through the getting started post and require the immutant.scheduling namespace at a REPL to follow along:

(require '[immutant.scheduling :refer :all])

We'll need a job to schedule. Here's one!

(defn job []
  (prn 'fire!))

Let's schedule it:

(schedule job)

That was pretty useless. Without a spec, the job will be immediately called asynchronously on one of the Quartz scheduler's threads. Instead, let's have it run in 5 minutes:

(schedule job (in 5 :minutes))

And maybe run again every second after that:

(schedule job
  (-> (in 5 :minutes)
    (every :second)))

But no more than 60 times:

(schedule job
  (-> (in 5 :minutes)
    (every :second)
    (limit 60)))

We could also anticipate getting stupid bored about halfway through, and schedule another job to cancel the first one:

(let [it (schedule job
           (-> (in 5 :minutes)
             (every :second)
             (limit 60)))]
  (schedule #(stop it) (in 5 :minutes, 30 :seconds)))

Of course, you can bring your own job ids if you like:

(schedule job (-> (id :purge) (every 30 :minutes)))
(schedule job (-> (id :purge) (every :hour)))  ; reschedule
(stop (id :purge))

If a job is successfully canceled, stop returns true.

It's Just Maps

Ultimately, the spec passed to schedule is just a map, and the sugar functions are just assoc'ing keys corresponding to their names. The map can be passed either explicitly or via keyword arguments, so all of the following are equivalent:

(schedule job (-> (in 5 :minutes) (every :day)))
(schedule job {:in [5 :minutes], :every :day})
(schedule job :in [5 :minutes], :every :day)

Supports Joda clj-time

If you're using the clj-time library in your project, you can load the immutant.scheduling.joda namespace. This will extend org.joda.time.DateTime instances to the AsTime protocol, enabling them to be used as arguments to at and until, e.g.

;; loading the joda namespace is what extends DateTime to AsTime
(require 'immutant.scheduling.joda
         '[clj-time.core :refer [today-at plus hours]])

(let [t (today-at 9 0)]
  (schedule job
    (-> (at t)
      (every 2 :hours)
      (until (plus t (hours 8))))))

It also provides the function schedule-seq. Inspired by chime-at, it takes a sequence of times instead of a specification map. Such a sequence might come from clj-time.periodic/periodic-seq, and can be shaped with any of Clojure's core sequence-manipulating functions.

When defining complex recurring schedules, this presents an interesting alternative to traditional cron specs. For example, consider a job that must run at 10am every weekday. Here's how we'd schedule that with a Quartz-style cron spec:

(schedule job (cron "0 0 10 ? * MON-FRI"))

And here's the same schedule using a lazy sequence:

(require '[immutant.scheduling.joda :refer [schedule-seq]]
         '[clj-time.core            :refer [today-at days]]
         '[clj-time.periodic        :refer [periodic-seq]]
         '[clj-time.predicates      :refer [weekday?]])

(schedule-seq job
  (->> (periodic-seq (today-at 10 0) (days 1))
    (filter weekday?)))

So each has trade-offs, of course. The cron spec is more concise, but also arguably more error-prone, e.g. what is that ? for!?
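
For the record, a Quartz cron expression has the fields seconds, minutes, hours, day-of-month, month, and day-of-week (plus an optional year). The ? means "no specific value" and is only allowed in the day-of-month and day-of-week fields: since we constrain day-of-week to MON-FRI, day-of-month has to be left unspecified. Here's a tiny sketch that labels the fields; label-cron is a hypothetical helper for illustration, not something the library provides.

```clojure
(require '[clojure.string :as str])

;; Field order of a Quartz cron expression (the optional seventh
;; field, the year, is omitted here).
(def quartz-fields
  [:seconds :minutes :hours :day-of-month :month :day-of-week])

(defn label-cron
  "Pairs each whitespace-separated field of a cron spec with its name."
  [spec]
  (zipmap quartz-fields (str/split spec #"\s+")))

(label-cron "0 0 10 ? * MON-FRI")
;; => {:seconds "0", :minutes "0", :hours "10",
;;     :day-of-month "?", :month "*", :day-of-week "MON-FRI"}
```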

One very cool thing about the sequence is that I can test it without actually scheduling it. On the other hand, my cron spec test is going to take more than a week to run! ;)
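
Such a test might look something like this sketch, which only needs clj-time and never touches the scheduler. The sample size of ten is arbitrary.

```clojure
(require '[clj-time.core       :refer [today-at days hour]]
         '[clj-time.periodic   :refer [periodic-seq]]
         '[clj-time.predicates :refer [weekday?]])

;; The same lazy sequence we passed to schedule-seq.
(def weekday-mornings
  (->> (periodic-seq (today-at 10 0) (days 1))
       (filter weekday?)))

;; Inspect the first two working weeks' worth of fire times.
(def sample (take 10 weekday-mornings))

(every? weekday? sample)         ; no Saturdays or Sundays
(every? #(= 10 (hour %)) sample) ; always at 10 o'clock
```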

Try it out!

As always, we'd love to incorporate your feedback. Find us via our community page and join the fun!

Thanks to Phil Hart for the image, used under CC BY-NC-SA

Untangling the Web in The Deuce

Our org.immutant/web library has changed quite a bit in Immutant 2, both its API and its foundation: the Undertow web server. Among other things, this gives us much better performance (~35% more throughput than v1.1.1) and built-in support for websockets.

We've given a lot of thought to the API, specifically argument names, types, order, and return values. We're reasonably happy with what we have at this point, but still very much open to suggestions for improvements.

The Web API

The API for immutant.web is small, just two functions and a convenient macro:

  • run - runs your handler in a specific environment, responding to web requests matching a given host, port, and path. The handler may be a Ring function, a Servlet instance, or an Undertow HttpHandler.
  • run-dmc - runs your handler in Development Mode (the 'C' is silent)
  • stop - stops your handler[s]

The following code fragments were tested against 2.x.incremental.96. You should read through the getting started post and require the immutant.web namespace at a REPL to follow along:

(require '[immutant.web :refer :all])

Common Usage

First, we'll need a Ring handler. Yours is probably fancier, but this one will do:

(defn app [request]
  {:status 200
   :body "Hello world!"})

To make the app available at http://localhost:8080/, do this:

(run app)

Which, if we make the default values explicit, is equivalent to this:

(run app {:host "localhost" :port 8080 :path "/"})

Or, since run takes options as either an explicit map or keyword arguments (kwargs), this:

(run app :host "localhost" :port 8080 :path "/")

The options passed to run determine the URL used to invoke your handler: http://{host}:{port}{path}
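
Spelled out as code, the mapping from options to URL looks like this. Note that handler-url and default-options are hypothetical names used only for this sketch; they aren't part of immutant.web.

```clojure
;; The defaults named in the text above.
(def default-options {:host "localhost" :port 8080 :path "/"})

(defn handler-url
  "Builds the URL a handler would answer at for the given options."
  [options]
  (let [{:keys [host port path]} (merge default-options options)]
    (str "http://" host ":" port path)))

(handler-url {})                          ; => "http://localhost:8080/"
(handler-url {:port 4242 :path "/hello"}) ; => "http://localhost:4242/hello"
```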

To replace your app handler with another, just call run again with the same options, and it'll replace the old handler with the new:

(run (fn [_] {:status 200 :body "hi!"}))

To stop the handler, do this:

(stop)

Which is equivalent to this:

(stop {:host "localhost" :port 8080 :path "/"})

Or, if you prefer kwargs, this:

(stop :host "localhost" :port 8080 :path "/")

Alternatively, you can save run's return value and pass it to stop to stop your handler.

(def server (run app {:port 4242 :path "/hello"}))
...
(stop server)

That's pretty much all there is to it.

You don't even really need to stop your handlers if you're content to just let the JVM exit, but it can be handy at a REPL.

Advanced Usage

The run function returns a map that includes the options passed to it, so you can thread run calls together. This is useful when your application runs multiple handlers. For example,

(def everything (-> (run hello)
                  (assoc :path "/howdy")
                  (->> (run howdy))
                  (merge {:path "/" :port 8081})
                  (->> (run ola))))

The above actually creates two Undertow web server instances: one serving requests for the hello and howdy handlers on port 8080, and one serving ola responses on port 8081.
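
If the mix of -> and ->> is hard to read, here's the same chain of calls written out with a let. The three handlers are stand-ins defined just for this sketch.

```clojure
(require '[immutant.web :refer [run]])

;; Stand-in handlers, just for this sketch.
(defn hello [_] {:status 200 :body "hello"})
(defn howdy [_] {:status 200 :body "howdy"})
(defn ola   [_] {:status 200 :body "ola"})

(def everything
  (let [;; default options: port 8080, path "/"
        hello-opts (run hello)
        ;; same server, different path
        howdy-opts (run howdy (assoc hello-opts :path "/howdy"))
        ;; different port, so a second server
        ola-opts   (run ola (merge howdy-opts {:path "/" :port 8081}))]
    ola-opts))
```

Each call to run receives the map returned by the previous one, with a key or two changed, which is all the threaded version is doing.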

You can stop all three apps (and shut down the two web servers) like so:

(stop everything)

Alternatively, you could stop only the ola app like so:

(stop {:path "/" :port 8081})

You could even omit :path since "/" is the default. And because ola was the only app running on the web server listening on port 8081, it will be shut down automatically.

Handler Types

Though the handlers you run will typically be Ring functions, you can also pass any valid implementation of javax.servlet.Servlet or io.undertow.server.HttpHandler. For an example of the former, here's a very simple Pedestal service running on Immutant:

(ns testing.hello.service
  (:require [io.pedestal.service.http :as http]
            [io.pedestal.service.http.route.definition :refer [defroutes]]
            [ring.util.response :refer [response]]
            [immutant.web :refer [run]]))

(defn home-page [request] (response "Hello World!"))
(defroutes routes [[["/" {:get home-page}]]])
(def service {::http/routes routes})

(defn start [options]
  (run (::http/servlet (http/create-servlet service)) options))

Development Mode

The run-dmc macro resulted from a desire to provide a no-fuss way to enjoy all the benefits of REPL-based development. Before calling run, run-dmc will first ensure that your Ring handler is var-quoted and wrapped in the reload and stacktrace middleware from the ring-devel library (which must be included among your [:profiles :dev :dependencies] in project.clj). It'll then open your app in a browser.

Both run and run-dmc accept the same options. You can even mix them within a single threaded call.
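
For comparison, here's roughly what doing the same thing by hand with run might look like. This is an approximation rather than the macro's actual expansion: the middleware order is a guess, and it doesn't open a browser. It needs ring-devel on the classpath.

```clojure
(require '[immutant.web :refer [run]]
         '[ring.middleware.reload :refer [wrap-reload]]
         '[ring.middleware.stacktrace :refer [wrap-stacktrace]])

(defn app [request]
  {:status 200
   :body "Hello world!"})

;; Var-quote the handler so that redefining app at the REPL takes
;; effect without calling run again, then wrap it in the ring-devel
;; middleware.
(def server
  (run (-> #'app
           wrap-reload
           wrap-stacktrace)))
```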

The Websocket API

Also included in the org.immutant/web library is the immutant.websocket namespace, which includes a Channel protocol and the create-handler function. create-handler accepts a map of callback functions, invoked asynchronously during the lifecycle of a websocket. The valid websocket event keywords and their corresponding callback signatures are as follows:

  :on-message (fn [channel message])
  :on-open    (fn [channel])
  :on-close   (fn [channel {:keys [code reason]}])
  :on-error   (fn [channel throwable])
  :fallback   (fn [request] (response ...))

To create your websocket endpoint, pass the result from create-handler to immutant.web/run. Here's an example that asynchronously returns the upper-cased equivalent of whatever message it receives:

(ns whatever
  (:require [immutant.web :as web]
            [immutant.websocket :as ws]
            [clojure.string :refer [upper-case]]))

(defn create-websocket []
  (web/run
    (ws/create-handler {:on-message (fn [c m] (ws/send! c (upper-case m)))})
    {:path "/websocket"}))
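
And here's a slightly fuller sketch that fills in more of the callbacks listed above. The "/echo" path, the callbacks var, and what each callback does are arbitrary choices for illustration; only the callback signatures come from the list above.

```clojure
(require '[immutant.web :as web]
         '[immutant.websocket :as ws])

(def callbacks
  {:on-open    (fn [channel]
                 (ws/send! channel "welcome"))
   ;; echo each message straight back to the sender
   :on-message (fn [channel message]
                 (ws/send! channel message))
   :on-close   (fn [channel {:keys [code reason]}]
                 (println "closed:" code reason))
   :on-error   (fn [channel throwable]
                 (println "error:" (.getMessage throwable)))})

(defn create-echo []
  (web/run (ws/create-handler callbacks) {:path "/echo"}))
```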

Another function, immutant.websocket/create-servlet, can be used to create a JSR 356 Endpoint. The channel passed to the callbacks is an instance of javax.websocket.Session, extended to the immutant.websocket.Channel protocol.

Try it out!

We'd love to hear some feedback on this stuff. Find us on our community page and join the fun!

Getting Started With Immutant 2

We're hard at work on Immutant 2.0, and have a ways to go before we reach feature parity with 1.x. But if you are interested in playing with what we have now, then this article is for you!

Getting Immutant

Our CI server publishes an incremental release for each successful build. In order to use an incremental build, you'll need to add a repository and dependencies to your project.clj. Currently, only our scheduling and web artifacts contain implementations, so let's add those:

(defproject some-project "1.2.3"
  ...
  :dependencies [...
                 [org.immutant/scheduling "2.x.incremental.BUILD_NUM"]
                 [org.immutant/web        "2.x.incremental.BUILD_NUM"]]
  :repositories [["Immutant 2.x incremental builds"
                  "http://downloads.immutant.org/incremental/"]])

replacing BUILD_NUM with the build number for the version you want to use. You can get the build number from our builds page - the latest build number is 62 as of this article.

Edit: build 62 has been garbage collected - be sure to grab the latest build from the builds page.

Note: We're bringing in the artifacts piecemeal above, but we also provide an aggregate artifact that brings in all of the Immutant dependencies in one shot if you'd rather use that: org.immutant/immutant.
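
With the aggregate artifact, the project.clj might look something like the sketch below. The Clojure version is an assumption on our part, and BUILD_NUM is still a placeholder for a real build number.

```clojure
(defproject some-project "1.2.3"
  :dependencies [[org.clojure/clojure "1.6.0"] ; assumed version
                 ;; aggregate artifact: brings in all of the
                 ;; Immutant libraries in one shot
                 [org.immutant/immutant "2.x.incremental.BUILD_NUM"]]
  :repositories [["Immutant 2.x incremental builds"
                  "http://downloads.immutant.org/incremental/"]])
```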

That's it! If you are used to Immutant 1.x, you'll notice that there is no download or install step - Immutant 2 is usable as a set of libraries that you consume just like any other Clojure library.

What's in those artifacts?

Along with the artifacts, each CI build publishes the API docs for all of the Immutant namespaces. You can see the docs for a specific build (build #62 in this case), or the latest build.

If you start playing with any of that API, be aware that it is currently in a pre-alpha state, and may change dramatically at any time.

What's next?

We plan on publishing articles in the near future focusing on using the web and scheduling namespaces, followed by articles on websockets, messaging, and caching as soon as we get those implementations off of the ground. We'll also cover using those same namespaces within a WildFly container, taking advantage of the clustering features the container provides.

Once we have most of our namespaces in semi-decent shape, we'll release our first alpha, probably in mid to late June.

As always, if you have any questions, comments, or other feedback, feel free to get in touch.

Creating some Tutorials

After our last release, Dan asked if we had a tutorial to help a newbie get started on Immutant.

Good question.

Well, we do have one, sort of, in the form of some blog posts written over the course of a few months in our News section. They're all helpfully tagged with getting-started, in reverse chronological order...

Well, no, I guess we don't have one. :(

So we decided to take some of those old blog posts, update them, and organize them into a suite of tutorials that are easy to navigate, and we promise to keep them in sync as the project evolves.

Unlike the blog posts, they're living documents. Each is focused on a particular aspect of Immutant, though there will be some overlap between them and our manual and API docs, which we consider more of a reference.

Please don't hesitate to let us know if the tutorials make your initial experience with Immutant that much smoother. And we'd love to hear suggestions for additional ones.