{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE BlockArguments #-} {-# LANGUAGE NoMonomorphismRestriction #-} -- | Datatypes and definitions used by Churro library. -- -- Expand instances for additional documentation! module Control.Churro.Types where import Prelude hiding (id, (.)) import Control.Arrow import Control.Category import Control.Concurrent.Async (cancel, wait, Async, async) import Data.Void import Control.Exception (finally) -- $setup -- -- We import the library for testing, although this would be a circular import in the module itself. -- -- >>> import Control.Churro -- ** Data, Classes and Instances -- | The core datatype for the library. -- -- Parameters `t`, `i` and `o` represent the transport, input, and output types respectively. -- -- The items on transports are wrapped in `Maybe` to allow signalling of completion of a source. -- -- When building a program by composing Churros, the output Transport of one -- Churro is fed into the input Transports of other Churros. -- -- Type families are used to allow the in/out channels to have different types -- and prevent accidentally reading/writing from the wrong transport. -- -- Convenience types of `Source`, `Sink`, and `DoubleDipped` are also defined, -- although use is not required. -- data Churro a t i o = Churro { runChurro :: IO (In t (Maybe i), Out t (Maybe o), Async a) } type Source a t o = Churro a t Void o type Sink a t i = Churro a t i Void type DoubleDipped a t = Churro a t Void Void -- | The transport method is abstracted via the Transport class -- -- This allows use of pure or impure channels, such as: -- -- * Chan (Included in `Control.Churro.Transport.Chan`) -- * TChan -- * Seq -- * Unagi -- * Various buffered options -- -- Transports used in conjunction with Churros wrap items in Maybe so that once -- a source has been depleted it can signal completion with a Nothing item. -- -- The flex method returns two transports, so that channels such as unagi that -- create an in/outs pair can have a Transport instance. -- -- Channels like Chan that have a single channel act as in/out simply reuse the -- same channel in the pair returned. -- class Transport (t :: * -> *) where data In t :: * -> * data Out t :: * -> * flex :: IO (In t a, Out t a) -- ^ Create a new pair of transports. yank :: Out t a -> IO a -- ^ Yank an item of the Transport yeet :: In t a -> a -> IO () -- ^ Yeet an item onto the Transport -- | Covariant functor instance for Churro - Maps over the output. -- -- >>> let s = sourceList [1,2] -- >>> runWaitChan $ s >>> sinkPrint -- 1 -- 2 -- -- >>> runWaitChan $ fmap succ s >>> sinkPrint -- 2 -- 3 instance Transport t => Functor (Churro a t i) where fmap f c = Churro do (i,o,a) <- runChurro c (i',o') <- flex a' <- async do finally' (cancel a) do c2c f o i' wait a return (i,o',a') -- | The Category instance allows for the creation of Churro pipelines. -- -- All other examples of the form `a >>> b` use this instance. -- -- The `id` method creates a passthrough arrow. -- There isn't usually a reason to use `id` directly as it has no effect: -- -- >>> runWaitChan $ pure 1 >>> id >>> id >>> id >>> sinkPrint -- 1 instance (Transport t, Monoid a) => Category (Churro a t) where id = Churro do (i,o) <- flex a <- async mempty return (i,o,a) g . f = f >>>> g -- | Category style composition that allows for return type to change downstream. -- (>>>>) :: (Transport t, fo ~ gi) => Churro a1 t fi fo -> Churro a2 t gi go -> Churro a2 t fi go f >>>> g = Churro do (fi, fo, fa) <- runChurro f (gi, go, ga) <- runChurro g a <- async do c2c id fo gi b <- async do finally' (cancel a >> cancel fa >> cancel ga) do r <- wait ga cancel fa cancel a return r return (fi, go, b) -- | The Applicative instance allows for pairwise composition of Churro pipelines. -- Once again this is covariat and the composition occurs on the output transports of the Churros. -- -- The `pure` method allows for the creation of a Churro yielding a single item. -- -- TODO: Generalise () to a Monoid constraint. -- instance Transport t => Applicative (Churro () t Void) where pure = pure' f <*> g = buildChurro \_i o -> do (_fi, fo, fa) <- runChurro f (_gi, go, ga) <- runChurro g let prog :: IO () prog = do fx <- yank fo gx <- yank go case (fx, gx) of (Just f', Just g') -> (yeet o $ Just (f' g')) >> prog _ -> return () -- TODO: Should we cancel asyncs here in finally block? prog yeet o Nothing wait fa wait ga -- | More general variant of `pure` with Monoid constraint. pure' :: (Transport t, Monoid a) => o -> Churro a t i o pure' x = buildChurro \_i o -> yeet o (Just x) >> yeet o Nothing >> return mempty -- | The Arrow instance allows for building non-cyclic directed graphs of churros. -- -- The `arr` method allows for the creation of a that maps items with a pure function. -- This is equivalent to `fmap f id`. This is more general and exposed via arr`. -- -- >>> :set -XArrows -- >>> :{ -- let sect = process $ \x@(_x,_y,z) -> print x >> return z -- graph = -- proc i -> do -- j <- arr succ -< i -- k <- arr show -< j -- l <- arr succ -< j -- m <- arr (> 5) -< j -- n <- sect -< (k,l,m) -- o <- arr not -< n -- p <- delay 0.1 -< o -- sinkPrint -< p -- in -- runWaitChan $ sourceList [1,5,30] >>> graph -- :} -- ("2",3,False) -- ("6",7,True) -- ("31",32,True) -- True -- False -- False -- -- The other Arrow methods are also usable: -- -- >>> runWaitChan $ pure 1 >>> (arr show &&& arr succ) >>> sinkPrint -- ("1",2) instance Transport t => Arrow (Churro () t) where arr = arr' first c = Churro do (i,o,a) <- runChurro c (ai',ao') <- flex (bi',bo') <- flex let go = do is <- yank ao' yeet i (fmap fst is) os <- yank o yeet bi' $ (,) <$> os <*> fmap snd is case (is, os) of (Just _, Just _) -> go _ -> return () a' <- async do go yeet bi' Nothing wait a return (ai',bo',a') -- | More general version of `arr`. -- -- Useful when building pipelines that need to work with return types. arr' :: (Functor (cat a), Category cat) => (a -> b) -> cat a b arr' f = fmap f id -- ** Helpers -- | A helper to facilitate constructing a Churro that makes new input and output transports available for manipulation. -- -- The manipulations performed are carried out in the async action associated with the Churro -- buildChurro :: Transport t => (Out t (Maybe i) -> In t (Maybe o) -> IO a) -> Churro a t i o buildChurro cb = buildChurro' \_o' i o -> cb i o -- | A version of `buildChurro` that also passes the original input to the callback so that you can reschedule items. -- -- Used by "retry" style functions. -- buildChurro' :: Transport t => (In t (Maybe i) -> Out t (Maybe i) -> In t (Maybe o) -> IO a) -> Churro a t i o buildChurro' cb = Churro do (ai,ao) <- flex (bi,bo) <- flex a <- async do cb ai ao bi return (ai,bo,a) -- | Yeet all items from a list into a transport. -- yeetList :: (Foldable f, Transport t) => In t a -> f a -> IO () yeetList t = mapM_ (yeet t) -- | Yank all items from a Raw transport into a list. -- -- Won't terminate until the transport has been consumed. -- yankList :: Transport t => Out t (Maybe a) -> IO [a] yankList = flip yankAll (pure . pure) -- | Yank each item from a transport into a callback. -- yankAll :: (Transport t, Monoid a) => Out t (Maybe i) -> (i -> IO a) -> IO a yankAll c f = do x <- yank c case x of Nothing -> mempty Just y -> f y <> yankAll c f -- | Yank each raw item from a transport into a callback. -- -- The items are wrapped in Maybes and when all items are yanked, Nothing is fed to the callback. -- yankAll' :: (Transport t, Monoid b) => Out t (Maybe a) -> (Maybe a -> IO b) -> IO b yankAll' c f = do x <- yankAll c (f . Just) y <- f Nothing return (x <> y) -- | Yank then Yeet each item from one Transport into another. -- -- Raw items are used so `Nothing` should be Yeeted once the transport is depleted. -- c2c :: Transport t => (a -> b) -> Out t (Maybe a) -> In t (Maybe b) -> IO () c2c f o i = yankAll' o (yeet i . fmap f) -- | Flipped `finally`. finally' :: IO b -> IO a -> IO a finally' = flip finally