Communicating Sequential Processes (CSP) has become a popular solution for simplifying concurrent programming. In its modern implementations, it is the communication of green threads via channels, a uni-directional, possibly-bufferred, closeable queue of data.

Here, I’ll give instance definitions for the channel in various type theory abstractions using Clojure’s core.async and cats (cats.labs.channel). We’ll implement all of cats’ main protocols: Semigroup, Monoid, Functor, Applicative, Monad, MonadZero, and MonadPlus.

In cats, to create a type theory implementation, you must extend the type you want with the protocol Contextual, which contains a function to create a reified object with the actual implementation of the protocols:

In each of the implementations below,

  • I’ll show only the protocol implementations for context.
  • mv stands for Monad Value and is a channel.
  • core.async is imported as namespace a
  • cats.core is required and refers all values (such as mlet, bind, pure, etc)

Equality

These implementations below rely on satisfying laws of equality. Here is the equality I mean when comparing channels:

(defn chan-eq? [c1 c2]
  (= (a/<!! (a/into #{} c1))
     (a/<!! (a/into #{} c2))))

Note the use of sets here. This means that we do not care about the ordering of channel elements.

Channel order represents the coupling of time and data. Rich Hickey is always talking about decoupling time from applications. Hence, representing order in channel equality is against this idea. Therefore I chose an order-agnostic equality function. (It also has good properties for mappend below.)

Functor

A functor is a container type with a mapping function (fmap f functor-value), akin to list’s map:

Here, we are piping the channel through a transducer channel, which will apply (map f) to each element through the channel. pipe will handle closed channels and close the transducer channel when the input channel closes.

(fmap inc (a/to-chan [1 2 3])) ; => channel containing [2 3 4]

Semigroup & Monoid

A Semigroup is a type with an appending function (mappend mv1 mv2). A Monoid is a Semigroup with a zero element: (mzero). mappend must be associative and have mzero as its identity:

;; Monoid Laws
(= c (mappend c (mzero)))
(= c (mappend (mzero) c))
(= (mappend c1 (mappend c2 c3))
   (mappend (mappend c1 c2) c3))

To mappend, we a/merge channels, and the zero channel is an empty, closed channel. When merged, the zero channel will not change the contents of the output channel, therefore it is an identity.

Choosing a/merge is a personal choice. It does not preserve order, like the list’s mappend. But, since we chose an order-agnostic equality above, it satisfies the laws. Other choices for mappend are appending the inner items in pairs as they are received, or strict ordering concatentation. I chose to use merge because merge handles unbalanced channels, especially infinite streams of data.

(mappend (a/to-chan (range)) (a/to-chan [:data])) ; => chan #{:data 0 1 2 3 ...}

Applicative

An Applicative is a container type with two functions: (pure v) wraps a value into the applicative type. (fapply af av) applies functions which are wrapped in the applicative to values wrapped in the applicative. For channels this means apply a channel of functions to a channel of values, like

(fapply (pure inc) (pure 5)) ;=> chan #{6}
;; Applicative Laws
;; Identity
(= c (fapply (pure identity) c))
;; Composition Application
(let [comp' (fn [f] (fn [g] (comp f g)))]
  (= (fapply (fapply (fapply (pure comp') cf) cg) cv)
     (fapply cf (fapply cg cv))))
;; Homomorphism
(= (fapply (pure f) (pure v)) (pure (f v)))

In pure, we usually just wrap a value with a channel with the value on it and closed. We handle the case of (pure nil) which is used as a throwaway value. We can’t return an empty channel because then no application will be called, so we return a namespaced nil keyword. As long as one does not rely on the actual value inside (pure nil), it satisfies all laws.

fapply is defined using a/map, which simply takes an element from each channel (af and av), and applies the function to it, which returns the application of the function from af to the value from av. It handles infinite channels and closes when one of the input channels closes.

(fapply (a/to-chan (map (fn [n] #(+ % n)) (range))) ; chan [#(+ % 0) #(+ % 1) ...]
        (a/to-chan [1 2 3]))
;; => chan [1 3 5]

Monad

A Monad is a container type with two functions: (mreturn v) acts just like (pure v). (mbind f mv) binds a function of type a -> Monad b with a type Monad a. For channels, I believe this should act similar to lists, where bind will call f on each value in the list, and concatenate the results.

;; Monad Laws
(= (f v) (bind f (return v)))
(= c (bind return c))
(= (bind #(bind g (f %)) c)
   (bind g (bind f c)))

mreturn is exactly pure (and has the same ::nil shortcut).

mbind creates a comprehension of channel functions. We take an incoming channel of values mv and call pipeline-async that through the function #(a/pipe (bindf %1) %2). pipeline-async allows us to call a function on each value of a channel asynchronously and put any number of values on the result channel. Here we call f using bindf (bindf is defined to allow channel monad transformers), and put all the values of the returned channel onto the pipeline-async channel.

This will allow usage of channel’s bind for promise-like semantics as well as stream-like semantics. Here is a

;; Promise-like usage of channel
;; mlet is cats version of do syntax, it is a macro that reduces to bind/return
(mlet [query (create-query) ; => query is eventually a string
       resp (http/get-request (str "http://google.com/?" query))]
  (return (:body resp)))

And here is an example of channel comprehension akin to list comprehension using the stream-like mechanics of bind.

(mlet [a (a/to-chan [1 5])
       b (a/to-chan [a (* 2 a)])
       c (+ a b)]
  (return c))
;; => chan [2 3 10 15]

What’s cool with mlet and channels is you can mix in :when and :let statements:

(mlet [a (a/to-chan (range 1 10)
       :when (even? a)
       :let [b (* a 2)]]
  (return b)
;; => chan [4 8 12 16]

MonadZero & MonadPlus

MonadZero and MonadPlus are ways to use the Monoid and Semigroup interfaces in a monad context. Their implementations therefore follow similarly.

Monad Transformers

Monad transformers can be used with channel as the base monad. For instance, with the either monad:

(require '[cats.monad.either :as e]
         '[cats.labs.channel :as [chan]]
         '[cats.context :as ctx])

(def chaneither-m (e/either-t chan/context))

(ctx/with-context chaneither-m
  (mlet [x (a/to-chan [(e/right 1)])
         y (a/to-chan [(e/right 2)])]
    (return (+ x y))))
;; chan [(right 3)]

(ctx/with-context chaneither-m
  (mlet [x (a/to-chan [(e/right 1)])
         y (a/to-chan [(e/left "error!")])]
    (return (+ x y))))
;; chan [(left "error!")]

Conclusions

This post shows how some type theory protocols are implemented in cats.labs.channel. What’s so cool about these implementations is they work together in a way to use these abstractions to perform both promise-style and stream-style processing. The Monad interface can be used to concatenate operations in a serial way. The MonadPlus and Semigroup operations can be used to concatenated streams of data in a parallel fashion. With Functor and Applicative, one can do synchronous transformations.

Footnotes:

  • Recently, I submitted a PR to cats containing the semantics below. (follow-up PR)
  • There are multiple possible semantics for mappend and mbind, but these are my preferred ones given how channels are used. See the PR discussion.
  • This implementation of cats.labs.channel is released as [cats "1.3.0-SNAPSHOT"].