CSP Channels in Type Theory
Communicating Sequential Processes (CSP) has become a popular solution for simplifying concurrent programming. In its modern implementations, green threads communicate via channels: uni-directional, possibly buffered, closeable queues of data.
Here, I’ll give instance definitions for the channel in various type theory abstractions using Clojure’s core.async and cats (cats.labs.channel). We’ll implement all of cats’ main protocols: Semigroup, Monoid, Functor, Applicative, Monad, MonadZero, and MonadPlus.
In cats, to create a type theory implementation, you must extend the type you want with the protocol Contextual, which contains a function to create a reified object with the actual implementation of the protocols.
In each of the implementations below,
- I’ll show only the protocol implementations for context.
- mv stands for Monad Value and is a channel.
- core.async is imported as namespace a.
- cats.core is required and refers all values (such as mlet, bind, pure, etc).
Equality
The implementations below must satisfy laws that are stated as equalities. Here is the equality I mean when comparing channels:
(defn chan-eq? [c1 c2]
  (= (a/<!! (a/into #{} c1))
     (a/<!! (a/into #{} c2))))
Note the use of sets here. This means that we do not care about the ordering of channel elements.
Channel order represents the coupling of time and data. Rich Hickey is always talking about decoupling time from applications. Hence, representing order in channel equality is against this idea. Therefore I chose an order-agnostic equality function. (It also has good properties for mappend below.)
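To make the consequences of this choice concrete, here is a small sketch using only core.async. It repeats the definition of chan-eq? so that it runs on its own. Note that both channels must be finite and eventually closed, otherwise a/into never delivers a result and the comparison blocks.

```clojure
(require '[clojure.core.async :as a])

;; Same definition as in the text: drain both channels into sets and compare.
(defn chan-eq? [c1 c2]
  (= (a/<!! (a/into #{} c1))
     (a/<!! (a/into #{} c2))))

;; Order is ignored.
(chan-eq? (a/to-chan [1 2 3]) (a/to-chan [3 1 2])) ; => true

;; Duplicates are ignored as well, because sets collapse them.
(chan-eq? (a/to-chan [1 1 2]) (a/to-chan [1 2]))   ; => true

;; Different contents are still told apart.
(chan-eq? (a/to-chan [1 2]) (a/to-chan [1 2 3]))   ; => false
```

The second case is worth keeping in mind: this equality treats channels as sets of values, not as multisets.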
Functor
A functor is a container type with a mapping function (fmap f functor-value), akin to list’s map.
For channels, fmap pipes the input channel through a transducer channel, which applies (map f) to each element that passes through it. pipe handles closed channels and closes the transducer channel when the input channel closes.
(fmap inc (a/to-chan [1 2 3])) ; => channel containing [2 3 4]
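The piping described above can be sketched with core.async alone. The name chan-fmap is a hypothetical helper for illustration, not the cats protocol method, and the buffer size of 1 is an assumption.

```clojure
(require '[clojure.core.async :as a])

;; chan-fmap is a hypothetical stand-in for the channel's fmap.
(defn chan-fmap [f ch]
  ;; (a/chan 1 (map f)) is a channel whose transducer applies f to every
  ;; value put on it. a/pipe copies ch into that channel, closes it when
  ;; ch closes, and returns it.
  (a/pipe ch (a/chan 1 (map f))))

(a/<!! (a/into [] (chan-fmap inc (a/to-chan [1 2 3])))) ; => [2 3 4]
```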
Semigroup & Monoid
A Semigroup is a type with an appending function (mappend mv1 mv2). A Monoid is a Semigroup with a zero element: (mzero). mappend must be associative and have mzero as its identity:
;; Monoid Laws
(= c (mappend c (mzero)))
(= c (mappend (mzero) c))
(= (mappend c1 (mappend c2 c3))
   (mappend (mappend c1 c2) c3))
To mappend, we a/merge channels, and the zero channel is an empty, closed channel. When merged, the zero channel will not change the contents of the output channel, therefore it is an identity.
Choosing a/merge is a personal choice. Unlike the list’s mappend, it does not preserve order. But, since we chose an order-agnostic equality above, it satisfies the laws. Other choices for mappend are appending the inner items in pairs as they are received, or strictly ordered concatenation. I chose merge because it handles unbalanced channels, especially infinite streams of data.
(mappend (a/to-chan (range)) (a/to-chan [:data])) ; => chan #{:data 0 1 2 3 ...}
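As a rough check of the laws under the order-agnostic equality, here is a sketch using only core.async. The names chan-mzero and chan-mappend are hypothetical helpers for illustration, and chan-eq? is repeated from the Equality section so the block runs on its own. Fresh channels are built for each comparison because reading a channel consumes it.

```clojure
(require '[clojure.core.async :as a])

(defn chan-eq? [c1 c2]
  (= (a/<!! (a/into #{} c1))
     (a/<!! (a/into #{} c2))))

;; Hypothetical helpers mirroring the description in the text.
(defn chan-mzero []
  ;; An empty channel that is already closed.
  (doto (a/chan) a/close!))

(defn chan-mappend [c1 c2]
  ;; a/merge closes its output once every input channel has closed.
  (a/merge [c1 c2]))

;; Identity, on both sides.
(chan-eq? (a/to-chan [1 2 3])
          (chan-mappend (a/to-chan [1 2 3]) (chan-mzero))) ; => true
(chan-eq? (a/to-chan [1 2 3])
          (chan-mappend (chan-mzero) (a/to-chan [1 2 3]))) ; => true

;; Associativity.
(chan-eq? (chan-mappend (a/to-chan [1])
                        (chan-mappend (a/to-chan [2]) (a/to-chan [3])))
          (chan-mappend (chan-mappend (a/to-chan [1]) (a/to-chan [2]))
                        (a/to-chan [3])))                  ; => true
```

These checks only make sense for finite channels; with an infinite input, chan-eq? would never return.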
Applicative
An Applicative is a container type with two functions: (pure v) wraps a value into the applicative type. (fapply af av) applies functions which are wrapped in the applicative to values wrapped in the applicative. For channels, this means applying a channel of functions to a channel of values, like
(fapply (pure inc) (pure 5)) ;=> chan #{6}
;; Applicative Laws
;; Identity
(= c (fapply (pure identity) c))
;; Composition Application
(let [comp' (fn [f] (fn [g] (comp f g)))]
  (= (fapply (fapply (fapply (pure comp') cf) cg) cv)
     (fapply cf (fapply cg cv))))
;; Homomorphism
(= (fapply (pure f) (pure v)) (pure (f v)))
In pure, we usually just return a closed channel containing the value. We also handle the case of (pure nil), which is used as a throwaway value. core.async channels cannot carry nil, and we can’t return an empty channel because then no application will be called, so we return a namespaced nil keyword instead. As long as one does not rely on the actual value inside (pure nil), it satisfies all laws.
fapply is defined using a/map, which takes one element from each channel (af and av) and applies the function from af to the value from av. It handles infinite channels and closes when one of the input channels closes.
(fapply (a/to-chan (map (fn [n] #(+ % n)) (range))) ; chan [#(+ % 0) #(+ % 1) ...]
        (a/to-chan [1 2 3]))
;; => chan [1 3 5]
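The description of pure and fapply above can be sketched with core.async alone. The names chan-pure and chan-fapply are hypothetical helpers for illustration; in this sketch ::nil resolves to a keyword in whatever namespace the code is loaded in.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helpers mirroring the description in the text.
(defn chan-pure [v]
  ;; nil cannot be put on a channel, so a namespaced keyword stands in for it.
  (a/to-chan [(if (nil? v) ::nil v)]))

(defn chan-fapply [af av]
  ;; a/map takes one item from each source channel per step and applies the
  ;; given function to them; it closes its output when any source closes.
  (a/map (fn [f v] (f v)) [af av]))

(a/<!! (a/into [] (chan-fapply (chan-pure inc) (chan-pure 5))))
;; => [6]

;; The shorter channel decides the length of the result: 30 is never used.
(a/<!! (a/into [] (chan-fapply (a/to-chan [inc dec]) (a/to-chan [10 20 30]))))
;; => [11 19]

(a/<!! (chan-pure nil))
;; => the keyword ::nil rather than nil
```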
Monad
A Monad is a container type with two functions: (mreturn v) acts just like (pure v). (mbind f mv) binds a function of type a -> Monad b with a type Monad a. For channels, I believe this should act similar to lists, where bind will call f on each value in the list, and concatenate the results.
;; Monad Laws
(= (f v) (bind f (return v)))
(= c (bind return c))
(= (bind #(bind g (f %)) c)
   (bind g (bind f c)))
mreturn is exactly pure (and has the same ::nil shortcut).
mbind creates a comprehension of channel functions. We take an incoming channel of values mv and pass it through pipeline-async with the function #(a/pipe (bindf %1) %2). pipeline-async allows us to call a function on each value of a channel asynchronously and put any number of values on the result channel. Here we call f using bindf (bindf is defined to allow channel monad transformers), and put all the values of the returned channel onto the pipeline-async channel.
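A minimal sketch of this mechanism, using only core.async, might look like the following. The name chan-bind, its argument order, and the parallelism of 1 are assumptions made for illustration, and the sketch calls f directly instead of going through bindf, so it leaves out the monad transformer support mentioned above.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: f takes a value and returns a channel.
(defn chan-bind [mv f]
  (let [out (a/chan)]
    ;; For each value v taken from mv, pipeline-async calls the function with
    ;; v and a fresh result channel. a/pipe copies everything from (f v) onto
    ;; that result channel and closes it when (f v) closes, which is what
    ;; pipeline-async requires. out is closed once mv is exhausted.
    (a/pipeline-async 1 out (fn [v res] (a/pipe (f v) res)) mv)
    out))

;; Stream-like: every input value expands to several output values.
(a/<!! (a/into [] (chan-bind (a/to-chan [1 5])
                             (fn [v] (a/to-chan [v (* 2 v)])))))
;; => [1 2 5 10]

;; Promise-like: a single value flows through a single asynchronous step.
(a/<!! (chan-bind (a/to-chan [20]) (fn [v] (a/go (inc v)))))
;; => 21
```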
This allows the channel’s bind to be used for promise-like semantics as well as stream-like semantics. Here is a promise-like example:
;; Promise-like usage of channel
;; mlet is cats' version of do syntax; it is a macro that reduces to bind/return
(mlet [query (create-query) ; => query is eventually a string
       resp (http/get-request (str "http://google.com/?" query))]
  (return (:body resp)))
And here is an example of channel comprehension akin to list comprehension using the stream-like mechanics of bind.
(mlet [a (a/to-chan [1 5])
       b (a/to-chan [a (* 2 a)])
       :let [c (+ a b)]] ; c is a plain value, so it is bound with :let
  (return c))
;; => chan [2 3 10 15]
What’s cool with mlet and channels is you can mix in :when and :let statements:
(mlet [a (a/to-chan (range 1 10))
       :when (even? a)
       :let [b (* a 2)]]
  (return b))
;; => chan [4 8 12 16]
MonadZero & MonadPlus
MonadZero and MonadPlus are ways to use the Monoid and Semigroup interfaces in a monad context. Their implementations therefore follow similarly.
Monad Transformers
Monad transformers can be used with channel as the base monad. For instance, with the either monad:
(require '[cats.monad.either :as e]
         '[cats.labs.channel :as chan]
         '[cats.context :as ctx])
(def chaneither-m (e/either-t chan/context))
(ctx/with-context chaneither-m
  (mlet [x (a/to-chan [(e/right 1)])
         y (a/to-chan [(e/right 2)])]
    (return (+ x y))))
;; chan [(right 3)]
(ctx/with-context chaneither-m
  (mlet [x (a/to-chan [(e/right 1)])
         y (a/to-chan [(e/left "error!")])]
    (return (+ x y))))
;; chan [(left "error!")]
Conclusions
This post shows how some type theory protocols are implemented in cats.labs.channel. What’s so cool about these implementations is that they work together, so the same abstractions can perform both promise-style and stream-style processing. The Monad interface can be used to concatenate operations in a serial way. The MonadPlus and Semigroup operations can be used to concatenate streams of data in a parallel fashion. With Functor and Applicative, one can do synchronous transformations.
Footnotes:
- Recently, I submitted a PR to cats containing the semantics below. (follow-up PR)
- There are multiple possible semantics for mappend and mbind, but these are my preferred ones given how channels are used. See the PR discussion.
- This implementation of cats.labs.channel is released as [cats "1.3.0-SNAPSHOT"].