immutant.pipeline documentation

Provides functions for creating and managing pipelines. A pipeline
is a composition of functions ("steps"), where each function is
passed the result of the previous function, dereferenced if needed.
It is built on top of Immutant's messaging subsystem, allowing each
step to have multiple processing threads, and to be automatically
load balanced across a cluster.

The `pipeline` function takes a unique (within the scope of the
application) name, one or more single-arity functions, and optional
kwarg options, returning a function that places its argument onto
the pipeline when called. The resulting pipeline-fn optionally
returns a delay that can be used to retrieve the result of the
pipeline execution.

Each function can be optionally be wrapped with metadata that
provides options for how that particular function is handled (see
the 'step' fn below).

Example:

(require '[immutant.pipeline :as pl])

(defn calculate-foo [m]
  ...
  (assoc m :foo value))

(defn save-data [m]
  ...)

;; create a pipeline
(defonce foo-pipeline
  (pl/pipeline "foo" ;; pipelines must be named
    (pl/step calculate-foo :concurrency 5) ;; run this step with 5 threads
    (pl/step #(update-in % [:bar] + (:foo %)) :name :update-bar) ;; give this step a name
    save-data ;; a 'vanilla' step
    :concurrency 2 ;; run all steps with 2 threads (unless overridden)
    :error-handler (fn [ex m] ;; do something special with errors, and retry
                     ...
                     (pl/*pipeline* m :step *current-step*))))

;; put data onto the front pipeline
(foo-pipeline {:bar 1 :ham "biscuit"})

;; put data onto the pipeline at a given step
(foo-pipeline {:bar 1 :foo 42 :ham "gravy"} :step :update-bar)

;; get the result
(deref (foo-pipeline {:bar 1 :ham "biscuit"}) 1000 ::timeout!)

;; optional - it will automatically be stopped on undeploy
(pl/stop foo-pipeline)

*current-step*

var

The name of the current pipeline step. Will be bound within the
pipeline steps and error handlers.

*next-step*

var

The name of the next pipeline step. Will be bound within the
pipeline steps and error handlers.

*pipeline*

var

The currently active pipeline fn. Will be bound within the
pipeline steps and error handlers.

fanout

(fanout xs)
A function that takes a seq and places each item in it on the pipeline at the next step.
This halts pipeline execution for the current message, but
continues execution for each seq element. Note that a pipeline that
uses this function cannot be correctly derefenced, since the deref
will only get the first value to finish the pipeline.

halt

Halts pipeline processing for a given message if returned from any
handler function.

pipeline

(pipeline pl-name & args)
Creates a pipeline function.

It takes a unique (within the scope of the application) name, one
or more single-arity functions, and optional kwarg options, and
returns a function that places its argument onto the pipeline when
called. If :result-ttl is > -1, it returns a delayed value that can
be derefed to get the result of the pipeline execution.

The following kwarg options are supported, and must follow the step
functions [default]:

:concurrency         the number of threads to use for *each* step. Can be
                     overridden on a per-step basis - see the 'step'
                     function. [1]
:error-handler       a function that will be called when any step raises
                     an exception. It will be passed the exception and
                     the argument to the step. Without an error-handler,
                     the default HornetQ retry semantics will be
                     used. Can be overridden on a per-step basis - see
                     the 'step' function. [nil]
:result-ttl          the time-to-live for the final pipeline result,
                     in ms. Set to 0 for "forever", -1 to disable
                     returning the result via a delay [1 hour]
:step-deref-timeout  the amount of time to wait when dereferencing
                     the result of a step that returns a delay,
                     in ms. Can be overridden on a per-step basis -
                     see the 'step' function. [10 seconds]
:durable             whether messages persist across restarts [true]

During the execution of each step and each error-handler call, the
following vars are bound:

*pipeline*      the pipeline (as a fn) that is being executed
*current-step*  the name of the currently executing step
*next-step*     the name of the next step in the pipeline

This function is *not* idempotent. Attempting to create a pipeline
with the same name as an existing pipeline will raise an error.

step

(step f & {:as opts})
Wraps the given function with the given options, returning a function.

The following options are supported [default]:

:name                a name to use for the step [the current index of the fn]
:concurrency         the number of threads to use, overriding the pipeline
                     setting [1]
:error-handler       an error handler function that can override the
                     pipeline setting [nil]
:step-deref-timeout  the amount of time to wait when dereferencing
                     the result of the step if it returns a delay,
                     in ms. Overrides the pipeline setting [10 seconds]
:fanout?             applies the fanout fn to the result of the step.
                     See fanout for more details [false]

stop

(stop pl & args)
Destroys a pipeline. Typically not necessary since it will be done
for you when your app is undeployed. This will fail with a warning
if any messages are yet to be delivered unless ':force true' is
passed. Returns true on success.