I went to the Boulder Clojurians meetup tonight where we discussed core.async. One of the topics of discussion was helper functions that help you work with core.async. I’ve worked with it a few times in the past, and here are a few of the things I seem to do (almost) every time.

Callback to Async

Sometimes, you have a function that is asynchronous via a callback, such as with http-kit.client, or when working with nodejs. To convert these functions into channel-based operations can be tedious, so lets just use a function for it!

Imagine a function (f a b callback) where callback is an (fn [value]). Let’s create a function (or macro) so we can do (go (<! (cb->async f a b)))

A first cut using a function:

(defn cb->async [f & args]
(let [c (chan)
cb #(go (>! c %))]
(apply f (concat args [cb]))
c))

This is a pretty nice tool for working with callback-based functions. But we can make it better! First, we can use async/onto-chan instead of #(go (>! c %)). Secondly, we can “inline” the call by using a macro.

Here’s a revised implementation, this time for use with nodejs.

(defmacro node->async [f & args]
`(let [c# (chan)]
(~f ~@args #(a/onto-chan c# [{:error %1 :value %2}]))
c))

Both these implementations are valid, and the function form is more readable. You can also determine which “put” function is more readable: (onto-chan c [%]) or (go (>! c %)).

Async as Promise

core.async provides a lot of functionality for comparatively little syntactical overhead. But it doesn’t always provide helpers for all the use-cases. One case I seem to use a lot (especially in clojurescript) is promises. I like the flexibility of core.async, but I want more promise-like helpers.

Luckily, the library recently released core.async/promise-chan, which acts like a promise, containing only ever one value.

(defn promise-map
"Map the value inside a promise-chan with f.
Return a promise-chan with the mapped value on it."
[pc f]
;; We use the transducer form of promise-chan to
;; map the value inside to a new promise-chan
(a/pipe pc
(a/promise-chan (map f))))
(defn then
"Compose a promise-chan with a
function of the eventual value that returns a promise-chan.
Return a promise-chan with the eventual result on it."
[pc f]
(let [out (a/promise-chan)]
;; We use pipeline-async to allow an asynchronous operation
;; (f %1) returns a promise-chan
;; we pipe that to the channel passed as arg 2
;; See pipeline-async docs for more details
(a/pipeline-async 1 out #(a/pipe (f %1) %2) pc)
out))

Astute observers will note that promise-map is the promise functor’s fmap function, while then is the promise monad’s bind function.

Don’t rewrite the wheel

There are so many goodies in the core.async library that you probably don’t need to write a bunch of generic helper functions.

  • For connecting tasks, checkout pipe and the pipeline-* family of functions.
  • mult and tap help with stream processing while pub and sub do what you think they do.
  • split is useful for splitting channels, such as errors from successful values.