{-# LANGUAGE CPP #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- Fallback for builds that do not define the base version macro:
-- every version check below then evaluates to 0 (false).
#ifndef MIN_VERSION_base
#define MIN_VERSION_base(x,y,z) 0
#endif
-----------------------------------------------------------------------------
-- |
-- Module      :  Data.Machine.Process
-- Copyright   :  (C) 2012 Edward Kmett
-- License     :  BSD-style (see the file LICENSE)
--
-- Maintainer  :  Edward Kmett <ekmett@gmail.com>
-- Stability   :  provisional
-- Portability :  Rank 2 Types, GADTs
--
----------------------------------------------------------------------------
module Data.Machine.Process
  (
  -- * Processes
    Process
  , ProcessT
  , Automaton(..)
  , AutomatonM(..)
  , process
  -- ** Common Processes
  , (<~), (~>)
  , echo
  , supply
  , prepended
  , filtered
  , dropping
  , taking
  , droppingWhile
  , takingWhile
  , buffered
  , flattened
  , fold
  , fold1
  , scan
  , scan1
  , scanMap
  , asParts
  , sinkPart_
  , autoM
  , final
  , finalOr
  , intersperse
  , largest
  , smallest
  , sequencing
  , mapping
  , traversing
  , reading
  , showing
  , strippingPrefix
  ) where

import Control.Category
import Control.Arrow (Kleisli(..))
import Control.Monad (liftM)
import Data.Foldable hiding (fold)
import Data.Machine.Is
import Data.Machine.Plan
import Data.Machine.Type
import Data.Monoid
import Data.Void
-- The Prelude versions of id and (.) are hidden because Control.Category is
-- imported unqualified above. When the base version check fails, the Prelude
-- foldr is hidden as well (Data.Foldable is imported unqualified above).
import Prelude
#if !(MIN_VERSION_base(4,8,0))
  hiding (id, (.), foldr)
#else
  hiding (id, (.))
#endif

-- $setup
-- >>> import Data.Machine.Source

infixr 9 <~
infixl 9 ~>

-------------------------------------------------------------------------------
-- Processes
-------------------------------------------------------------------------------

-- | A @'Process' a b@ is a stream transducer that can consume values of type @a@
-- from its input, and produce values of type @b@ for its output.
type Process a b = Machine (Is a) b

-- | A @'ProcessT' m a b@ is a stream transducer that can consume values of type @a@
-- from its input, and produce values of type @b@ and has side-effects in the
-- 'Monad' @m@.
type ProcessT m a b = MachineT m (Is a) b

-- | An 'Automaton' can be automatically lifted into a 'Process'
class Automaton k where
  auto :: k a b -> Process a b

-- | Plain functions are lifted with 'mapping'.
instance Automaton (->) where
  -- Written with an explicit argument so that the definition does not depend
  -- on deep skolemisation of the @forall m@ hidden inside 'Process'.
  auto f = mapping f

-- | The identity witness is lifted to 'echo'.
instance Automaton Is where
  auto Refl = echo

-- | The effectful counterpart of 'Automaton': things that can be lifted into
-- a 'ProcessT' over an arbitrary 'Monad'.
class AutomatonM x where
  autoT :: Monad m => x m a b -> ProcessT m a b

-- | A 'Kleisli' arrow is lifted by running its function with 'autoM'.
instance AutomatonM Kleisli where
  autoT (Kleisli k) = autoM k

-- | The trivial 'Process' that simply repeats each input it receives.
--
-- This can be constructed from a plan with
-- @
-- echo :: Process a a
-- echo = repeatedly $ do
--   i <- await
--   yield i
-- @
--
-- Examples:
--
-- >>> run $ echo <~ source [1..5]
-- [1,2,3,4,5]
--
echo :: Process a a
echo = go
  where
    -- Wait for one input; stop when the input is exhausted.
    go = encased (Await relay Refl stopped)
    -- Hand the input downstream unchanged, then start over.
    relay x = encased (Yield x go)
{-# INLINABLE echo #-}

-- | A 'Process' that prepends the elements of a 'Foldable' onto its input, then repeats its input from there.
prepended :: Foldable f => f a -> Process a a
-- The argument is bound explicitly (rather than composing point-free) because
-- the result type hides a @forall m@ behind the 'Process' synonym; the
-- point-free form only type checks with deep subsumption, which GHC 9.0+
-- no longer performs by default.
prepended xs = before echo (traverse_ yield xs)

-- | A 'Process' that only passes through inputs that match a predicate.
--
-- This can be constructed from a plan with
-- @
-- filtered :: (a -> Bool) -> Process a a
-- filtered p = repeatedly $ do
--   i <- await
--   when (p i) $ yield i
-- @
--
-- Examples:
--
-- >>> run $ filtered even <~ source [1..5]
-- [2,4]
--
filtered :: (a -> Bool) -> Process a a
filtered p = go
  where
    go = encased (Await check Refl stopped)
    -- Forward matching inputs; silently skip the others.
    check x
      | p x       = encased (Yield x go)
      | otherwise = go
{-# INLINABLE filtered #-}

-- | A 'Process' that drops the first @n@, then repeats the rest.
--
-- This can be constructed from a plan with
-- @
-- dropping n = before echo $ replicateM_ n await
-- @
--
-- Examples:
--
-- >>> run $ dropping 3 <~ source [1..5]
-- [4,5]
--
dropping :: Int -> Process a a
-- The count is bound explicitly: the @forall m@ hidden inside 'Process' sits
-- to the right of the arrow, and a zero-argument definition only type checks
-- with deep subsumption (not the default from GHC 9.0 on).
dropping n = skip n
  where
    skip cnt
      | cnt <= 0  = echo  -- nothing left to drop: pass everything through
      | otherwise = encased (Await (\_ -> skip (cnt - 1)) Refl stopped)
{-# INLINABLE dropping #-}

-- | A 'Process' that passes through the first @n@ elements from its input then stops
--
-- This can be constructed from a plan with
-- @
-- taking n = construct . replicateM_ n $ await >>= yield
-- @
--
-- Examples:
--
-- >>> run $ taking 3 <~ source [1..5]
-- [1,2,3]
--
taking :: Int -> Process a a
-- Eta-expanded for the same reason as 'dropping'.
taking n = go n
  where
    go cnt
      | cnt <= 0  = stopped
      | otherwise = encased (Await emit Refl stopped)
      where
        -- Forward one element and continue with a decremented budget.
        emit v = encased (Yield v (go (cnt - 1)))
{-# INLINABLE taking #-}

-- | A 'Process' that passes through elements until a predicate ceases to hold, then stops
--
-- This can be constructed from a plan with
-- @
-- takingWhile :: (a -> Bool) -> Process a a
-- takingWhile p = repeatedly $ await >>= \v -> if p v then yield v else stop
-- @
--
-- Examples:
--
-- >>> run $ takingWhile (< 3) <~ source [1..5]
-- [1,2]
--
takingWhile :: (a -> Bool) -> Process a a
takingWhile p = go
  where
    go = encased (Await check Refl stopped)
    -- The first input that fails the predicate ends the process.
    check x
      | p x       = encased (Yield x go)
      | otherwise = stopped
{-# INLINABLE takingWhile #-}

-- | A 'Process' that drops elements while a predicate holds
--
-- This can be constructed from a plan with
-- @
-- droppingWhile :: (a -> Bool) -> Process a a
-- droppingWhile p = before echo loop where
--   loop = await >>= \v -> if p v then loop else yield v
-- @
--
-- Examples:
--
-- >>> run $ droppingWhile (< 3) <~ source [1..5]
-- [3,4,5]
--
droppingWhile :: (a -> Bool) -> Process a a
droppingWhile p = skip
  where
    skip = encased (Await check Refl stopped)
    -- The first input that fails the predicate is emitted, after which the
    -- process behaves like 'echo'.
    check x
      | p x       = skip
      | otherwise = encased (Yield x echo)
{-# INLINABLE droppingWhile #-}

-- | Chunk up the input into `n` element lists.
--
-- Avoids returning empty lists and deals with the truncation of the final group.
--
-- A chunk size below 1 is treated as 1: every chunk holds at least one
-- element, so a non-positive @n@ produces singleton lists instead of
-- accumulating the whole input into a single list.
--
-- An approximation of this can be constructed from a plan with
-- @
-- buffered :: Int -> Process a [a]
-- buffered = repeatedly . go [] where
--   go acc 0 = yield (reverse acc)
--   go acc n = do
--     i <- await <|> yield (reverse acc) *> stop
--     go (i:acc) $! n-1
-- @
--
-- Examples:
--
-- >>> run $ buffered 3 <~ source [1..6]
-- [[1,2,3],[4,5,6]]
--
-- >>> run $ buffered 3 <~ source [1..5]
-- [[1,2,3],[4,5]]
--
-- >>> run $ buffered 3 <~ source []
-- []
--
buffered :: Int -> Process a [a]
buffered n = begin
  where
    -- The buffer is empty, if we don't get anything
    -- then we shouldn't yield at all.
    begin = encased $ Await (\v -> loop (v:) (n - 1)) Refl stopped

    loop dl r
      -- The buffer (a diff list) contains elements, and we're at the
      -- requisite number: yield the buffer and restart. The test is
      -- @r <= 0@ rather than @r == 0@ so that a non-positive chunk size,
      -- whose counter starts below zero, cannot skip past the stop
      -- condition and buffer the entire input.
      | r <= 0    = encased $ Yield (dl []) begin
      -- The buffer contains elements and we're not yet done: continue
      -- waiting, but if we don't receive anything, then yield what we
      -- have and stop.
      | otherwise = encased $ Await (\v -> loop (dl . (v:)) (r - 1)) Refl (finish dl)

    -- All data has been retrieved, emit and stop.
    finish dl = encased $ Yield (dl []) stopped
{-# INLINABLE buffered #-}

-- | Build a new 'Machine' by adding a 'Process' to the output of an old 'Machine'
--
-- @
-- ('<~') :: 'Process' b c -> 'Process' a b -> 'Process' a c
-- ('<~') :: 'Process' c d -> 'Data.Machine.Tee.Tee' a b c -> 'Data.Machine.Tee.Tee' a b d
-- ('<~') :: 'Process' b c -> 'Machine' k b -> 'Machine' k c
-- @
(<~) :: Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c
mp <~ ma = MachineT $ runMachineT mp >>= \v -> case v of
  -- The downstream process is done, so the composite is done.
  Stop            -> return Stop
  -- Downstream output is passed along; upstream has not been run yet.
  Yield o k       -> return $ Yield o (k <~ ma)
  -- Downstream wants an input: only now is the upstream machine stepped.
  Await f Refl ff -> runMachineT ma >>= \u -> case u of
    -- Upstream is exhausted: follow the downstream fallback.
    Stop          -> runMachineT $ ff <~ stopped
    -- Upstream produced a value: feed it to the downstream continuation.
    Yield o k     -> runMachineT $ f o <~ k
    -- Upstream itself needs input: the composite awaits the same request.
    -- The downstream step @v@ has already been computed, so it is
    -- re-wrapped with 'encased' instead of running @mp@ again.
    Await g kg fg ->
      return $ Await (\a -> encased v <~ g a) kg (encased v <~ fg)
{-# INLINABLE (<~) #-}

-- | Flipped ('<~').
(~>) :: Monad m => MachineT m k b -> ProcessT m b c -> MachineT m k c
upstream ~> downstream = downstream <~ upstream
{-# INLINABLE (~>) #-}

-- | Feed a 'Process' some input.
--
-- The supplied elements are handed to the process, in order, each time it
-- awaits. Outputs produced along the way are passed through. If the process
-- stops, the remaining elements are discarded; once every element has been
-- consumed, the rest of the process is returned unchanged.
--
-- Examples:
--
-- >>> run $ supply [1,2,3] echo <~ source [4..6]
-- [1,2,3,4,5,6]
--
supply :: forall f m a b . (Foldable f, Monad m) => f a -> ProcessT m a b -> ProcessT m a b
supply = foldr feed id
  where
    -- @feed x rest m@ drives @m@ until it consumes @x@, then hands the
    -- resulting process to @rest@, which supplies the remaining elements.
    feed :: a -> (ProcessT m a b -> ProcessT m a b) -> ProcessT m a b -> ProcessT m a b
    feed x rest m = MachineT $ runMachineT m >>= \step -> case step of
      Yield o k      -> return (Yield o (feed x rest k))
      Await f Refl _ -> runMachineT (rest (f x))
      Stop           -> return Stop
{-# INLINABLE supply #-}

-- |
-- Convert a machine into a process, with a little bit of help.
--
-- The helper function explains how to obtain, from a single input of type
-- @i@, the value requested by each 'Await' of the original machine.
--
-- @
-- choose :: 'Data.Machine.Tee.T' a b x -> (a, b) -> x
-- choose t = case t of
--   'Data.Machine.Tee.L' -> 'fst'
--   'Data.Machine.Tee.R' -> 'snd'
--
-- 'process' choose :: 'Data.Machine.Tee.Tee' a b c -> 'Data.Machine.Process.Process' (a, b) c
-- 'process' ('const' 'id') :: 'Data.Machine.Process.Process' a b -> 'Data.Machine.Process.Process' a b
-- @
process :: Monad m => (forall a. k a -> i -> a) -> MachineT m k o -> ProcessT m i o
process select machine = MachineT (liftM translate (runMachineT machine))
  where
    -- Rewrite a single step; every continuation is converted recursively.
    translate step = case step of
      Stop          -> Stop
      Yield o next  -> Yield o (process select next)
      -- The original request is answered by projecting it out of the input.
      Await g req h ->
        Await (\i -> process select (g (select req i))) Refl (process select h)

-- |
-- Construct a 'Process' from a left-scanning operation.
--
-- Like 'fold', but yielding intermediate values.
--
-- It may be useful to consider this alternative signature
-- @
-- 'scan' :: (a -> b -> a) -> a -> Process b a
-- @
--
-- For stateful 'scan' use 'auto' with "Data.Machine.Mealy" machine.
-- This can be constructed from a plan with
-- @
-- scan :: Category k => (a -> b -> a) -> a -> Machine (k b) a
-- scan func seed = construct $ go seed where
--   go cur = do
--     yield cur
--     next <- await
--     go $!
--       func cur next
-- @
--
-- Examples:
--
-- >>> run $ scan (+) 0 <~ source [1..5]
-- [0,1,3,6,10,15]
--
-- >>> run $ scan (\a _ -> a + 1) 0 <~ source [1..5]
-- [0,1,2,3,4,5]
--
scan :: Category k => (a -> b -> a) -> a -> Machine (k b) a
scan func seed = emit seed
  where
    -- Force the accumulator, publish it, then wait for the next input.
    emit acc =
      acc `seq`
        encased (Yield acc (encased (Await (\x -> emit (func acc x)) id stopped)))
{-# INLINABLE scan #-}

-- |
-- 'scan1' is a variant of 'scan' that has no starting value argument
--
-- The first input is used as the initial accumulator.
--
-- This can be constructed from a plan with
-- @
-- scan1 :: Category k => (a -> a -> a) -> Machine (k a) a
-- scan1 func = construct $ await >>= go where
--   go cur = do
--     yield cur
--     next <- await
--     go $! func cur next
-- @
--
-- Examples:
--
-- >>> run $ scan1 (+) <~ source [1..5]
-- [1,3,6,10,15]
--
scan1 :: Category k => (a -> a -> a) -> Machine (k a) a
scan1 func = encased (Await emit id stopped)
  where
    -- Same loop as in 'scan': force, publish, wait.
    emit acc =
      acc `seq`
        encased (Yield acc (encased (Await (\x -> emit (func acc x)) id stopped)))
{-# INLINABLE scan1 #-}

-- |
-- Like 'scan' only uses supplied function to map and uses Monoid for
-- associative operation
--
-- Examples:
--
-- >>> run $ mapping getSum <~ scanMap Sum <~ source [1..5]
-- [0,1,3,6,10,15]
--
scanMap :: (Category k, Monoid b) => (a -> b) -> Machine (k a) b
scanMap f = scan combine mempty
  where
    -- Map the new input, then append it to the running total.
    combine acc x = acc `mappend` f x
{-# INLINABLE scanMap #-}

-- |
-- Construct a 'Process' from a left-folding operation.
--
-- Like 'scan', but only yielding the final value.
--
-- It may be useful to consider this alternative signature
-- @
-- 'fold' :: (a -> b -> a) -> a -> Process b a
-- @
--
-- This can be constructed from a plan with
-- @
-- fold :: Category k => (a -> b -> a) -> a -> Machine (k b) a
-- fold func seed = construct $ go seed where
--   go cur = do
--     next <- await <|> yield cur *> stop
--     go $!
--       func cur next
-- @
--
-- Examples:
--
-- >>> run $ fold (+) 0 <~ source [1..5]
-- [15]
--
-- >>> run $ fold (\a _ -> a + 1) 0 <~ source [1..5]
-- [5]
--
fold :: Category k => (a -> b -> a) -> a -> Machine (k b) a
-- Both arguments are bound explicitly: the @forall m@ hidden inside 'Machine'
-- sits to the right of the last arrow, and returning the local worker
-- unapplied only type checks with deep subsumption (not the default from
-- GHC 9.0 on).
fold func seed = step seed
  where
    -- Force the accumulator, then either fold in the next input or, when the
    -- input is exhausted, emit the accumulator and stop.
    step t = t `seq` encased (Await (step . func t) id (encased (Yield t stopped)))
{-# INLINABLE fold #-}

-- |
-- 'fold1' is a variant of 'fold' that has no starting value argument
--
-- The first input is used as the initial accumulator; nothing is emitted
-- when there is no input at all.
--
-- This can be constructed from a plan with
-- @
-- fold1 :: Category k => (a -> a -> a) -> Machine (k a) a
-- fold1 func = construct $ await >>= go where
--   go cur = do
--     next <- await <|> yield cur *> stop
--     go $! func cur next
-- @
--
-- Examples:
--
-- >>> run $ fold1 (+) <~ source [1..5]
-- [15]
--
fold1 :: Category k => (a -> a -> a) -> Machine (k a) a
fold1 func = encased (Await step id stopped)
  where
    step t = t `seq` encased (Await (step . func t) id (encased (Yield t stopped)))
{-# INLINABLE fold1 #-}

-- | Break each input into pieces that are fed downstream
-- individually.
--
-- This can be constructed from a plan with
-- @
-- asParts :: Foldable f => Process (f a) a
-- asParts = repeatedly $ await >>= traverse_ yield
-- @
--
-- Examples:
--
-- >>> run $ asParts <~ source [[1..3],[4..6]]
-- [1,2,3,4,5,6]
--
asParts :: Foldable f => Process (f a) a
asParts = go
  where
    go = encased (Await unpack id stopped)
    -- Chain one 'Yield' per element, ending back in the awaiting state.
    unpack xs = foldr (\x rest -> encased (Yield x rest)) go xs
{-# INLINABLE asParts #-}

-- | Break each input into pieces that are fed downstream
-- individually.
--
-- Alias for @asParts@
--
flattened :: Foldable f => Process (f a) a
flattened = asParts
{-# INLINABLE flattened #-}

-- | @sinkPart_ toParts sink@ creates a process that uses the
-- @toParts@ function to break input into a tuple of @(passAlong,
-- sinkPart)@ for which the second projection is given to the supplied
-- @sink@ 'ProcessT' (that produces no output) while the first
-- projection is passed down the pipeline.
sinkPart_ :: Monad m => (a -> (b,c)) -> ProcessT m c Void -> ProcessT m a b
sinkPart_ split = drive
  where
    -- Step the sink; whenever it asks for input, ask upstream instead.
    drive sink = MachineT $ do
      step <- runMachineT sink
      case step of
        Stop            -> return Stop
        -- The sink's output type is 'Void', so it can never really yield.
        Yield o _       -> absurd o
        Await f Refl ff -> return (Await (forward f) Refl (drive ff))

    -- Split one input: emit the first part downstream and give the second
    -- part to the sink's continuation. The tuple is matched lazily.
    forward f x =
      let (keep, part) = split x
      in encased (Yield keep (drive (f part)))

-- | Apply a monadic function to each element of a 'ProcessT'.
--
-- This can be constructed from a plan with
-- @
-- autoM :: Monad m => (a -> m b) -> ProcessT m a b
-- autoM :: (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b
-- autoM f = repeatedly $ await >>= lift . f >>= yield
-- @
--
-- Examples:
--
-- >>> runT $ autoM Left <~ source [3, 4]
-- Left 3
--
-- >>> runT $ autoM Right <~ source [3, 4]
-- Right [3,4]
--
autoM :: (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b
autoM f = go
  where
    go = encased (Await act id stopped)
    -- Run the action on the input and yield its result.
    act x = MachineT (liftM (\y -> Yield y go) (f x))
{-# INLINABLE autoM #-}

-- |
-- Skip all but the final element of the input
--
-- This can be constructed from a plan with
-- @
-- 'final' :: 'Process' a a
-- final :: Category k => Machine (k a) a
-- final = construct $ await >>= go where
--   go prev = do
--     next <- await <|> yield prev *> stop
--     go next
-- @
--
-- Examples:
--
-- >>> runT $ final <~ source [1..10]
-- [10]
-- >>> runT $ final <~ source []
-- []
--
final :: Category k => Machine (k a) a
final = encased (Await remember id stopped)
  where
    -- Keep only the most recent input; emit it once the input runs dry.
    remember latest =
      encased (Await remember id (encased (Yield latest stopped)))
{-# INLINABLE final #-}

-- |
-- Skip all but the final element of the input.
-- If the input is empty, the default value is emitted
--
-- This can be constructed from a plan with
-- @
-- 'finalOr' :: a -> 'Process' a a
-- finalOr :: Category k => a -> Machine (k a) a
-- finalOr = construct .
--     go where
--   go prev = do
--     next <- await <|> yield prev *> stop
--     go next
-- @
--
-- Examples:
--
-- >>> runT $ finalOr (-1) <~ source [1..10]
-- [10]
-- >>> runT $ finalOr (-1) <~ source []
-- [-1]
--
finalOr :: Category k => a -> Machine (k a) a
-- The default is bound explicitly: the @forall m@ hidden inside 'Machine'
-- sits to the right of the arrow, and returning the local worker unapplied
-- only type checks with deep subsumption (not the default from GHC 9.0 on).
finalOr dflt = step dflt
  where
    -- Remember the latest value; emit it once the input is exhausted.
    step x = encased (Await step id (emit x))
    emit x = encased (Yield x stopped)
{-# INLINABLE finalOr #-}

-- |
-- Intersperse an element between the elements of the input
--
-- The separator is only emitted once a following element has actually
-- arrived, so the output never ends with a separator.
--
-- @
-- 'intersperse' :: a -> 'Process' a a
-- @
intersperse :: Category k => a -> Machine (k a) a
intersperse sep = construct $ await >>= go
  where
    go cur = do
      yield cur
      next <- await
      yield sep
      go next

-- |
-- Return the maximum value from the input
largest :: (Category k, Ord a) => Machine (k a) a
largest = fold1 max
{-# INLINABLE largest #-}

-- |
-- Return the minimum value from the input
smallest :: (Category k, Ord a) => Machine (k a) a
smallest = fold1 min
{-# INLINABLE smallest #-}

-- |
-- Convert a stream of actions to a stream of values
--
-- This can be constructed from a plan with
-- @
-- sequencing :: Monad m => ProcessT m (m a) a
-- sequencing :: (Category k, Monad m) => MachineT m (k (m a)) a
-- sequencing = repeatedly $ do
--   ma <- await
--   a  <- lift ma
--   yield a
-- @
--
-- Examples:
--
-- >>> runT $ sequencing <~ source [Just 3, Nothing]
-- Nothing
--
-- >>> runT $ sequencing <~ source [Just 3, Just 4]
-- Just [3,4]
--
sequencing :: (Category k, Monad m) => MachineT m (k (m a)) a
sequencing = autoM id
{-# INLINABLE sequencing #-}

-- |
-- Apply a function to all values coming from the input
--
-- This can be constructed from a plan with
-- @
-- mapping :: Category k => (a -> b) -> Machine (k a) b
-- mapping f = repeatedly $ await >>= yield .
--     f
-- @
--
-- Examples:
--
-- >>> runT $ mapping (*2) <~ source [1..3]
-- [2,4,6]
--
mapping :: Category k => (a -> b) -> Machine (k a) b
mapping f = go
  where
    go = encased (Await emit id stopped)
    -- Transform the input and pass the result downstream.
    emit x = encased (Yield (f x) go)
{-# INLINABLE mapping #-}

-- |
-- Apply an effectful function to all values coming from the input.
--
-- Alias to 'autoM'.
traversing :: (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b
traversing = autoM

-- |
-- Parse 'Read'able values, only emitting the value if the parse succeeds.
-- This 'Machine' stops at first parsing error
reading :: (Category k, Read a) => Machine (k String) a
reading = repeatedly (await >>= parse)
  where
    -- Only a single parse that consumes the whole string counts as success.
    parse s = case reads s of
      [(a, "")] -> yield a
      _         -> stop

-- |
-- Convert 'Show'able values to 'String's
showing :: (Category k, Show a) => Machine (k a) String
showing = mapping show
{-# INLINABLE showing #-}

-- |
-- 'strippingPrefix' @mp mb@ drops the prefix produced by @mp@ from @mb@. It
-- stops if @mb@ did not start with the prefix given, or continues streaming
-- after the prefix, if @mb@ did.
strippingPrefix :: (Eq b, Monad m) => MachineT m (k a) b -> MachineT m (k a) b -> MachineT m (k a) b
strippingPrefix mp mb = MachineT $ do
  prefixStep <- runMachineT mp
  case prefixStep of
    -- The whole prefix has been matched: the rest of @mb@ is the result.
    Stop               -> runMachineT mb
    -- The prefix offers a value: check it against the next output of @mb@.
    Yield expected rest -> verify expected rest mb
    -- The prefix machine needs input: await on its behalf and retry.
    Await f ki ff      ->
      return (Await (\a -> strippingPrefix (f a) mb) ki (strippingPrefix ff mb))
  where
    -- Step @cur@ until it yields, then compare that value with the pending
    -- prefix element.
    verify expected rest cur = do
      bodyStep <- runMachineT cur
      case bodyStep of
        -- The body ended before the prefix was complete.
        Stop -> return Stop
        Yield actual cur'
          | expected == actual -> runMachineT (strippingPrefix rest cur')
          | otherwise          -> return Stop
        -- The body needs input: await on its behalf and keep verifying.
        Await g ki gg ->
          return
            (Await
              (\a -> MachineT (verify expected rest (g a)))
              ki
              (MachineT (verify expected rest gg)))