-- Concurrent composition of machines.
--
-- Everything imported from "Data.Machine" is re-exported. That import hides
-- 'tee' and 'wye', so the 'tee' and 'wye' exported below are not the
-- "Data.Machine" ones; they presumably come from the
-- Data.Machine.Concurrent.Tee / Data.Machine.Concurrent.Wye imports
-- (TODO confirm against those modules).
module Data.Machine.Concurrent (module Data.Machine,
-- Concurrent connection operators (the ones defined in this module).
(>~>), (<~<),
-- The remaining names are not defined in the visible part of this module;
-- presumably they are re-exported from the Data.Machine.Concurrent.* imports.
bufferConnect, rollingConnect,
fanout, fanoutSteps,
wye, tee, scatter, splitSum, mergeSum,
splitProd) where
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Control.Applicative
#endif
import Control.Concurrent.Async.Lifted
import Control.Monad (join)
import Control.Monad.Trans.Control
import Data.Machine hiding (tee, wye)
import Data.Machine.Concurrent.AsyncStep
import Data.Machine.Concurrent.Buffer
import Data.Machine.Concurrent.Fanout
import Data.Machine.Concurrent.Scatter
import Data.Machine.Concurrent.Wye
import Data.Machine.Concurrent.Tee
-- | Attach a process to the output of a machine, sink on the left and source
-- on the right. This is 'racers' with its two arguments swapped, so the
-- stepping of both machines is coordinated by 'racers'.
(<~<) :: MonadBaseControl IO m
      => ProcessT m b c -> MachineT m k b -> MachineT m k c
(<~<) = flip racers
-- | Flipped '<~<': the source machine is written on the left and the process
-- that consumes its output on the right.
(>~>) :: MonadBaseControl IO m
      => MachineT m k b -> ProcessT m b c -> MachineT m k c
(>~>) = flip (<~<)

-- Chains of '>~>' associate to the left, at precedence 7.
infixl 7 >~>
-- | Wait for whichever of two asynchronous results is available first, where
-- the first one is optional. With no first 'Async' this simply waits on the
-- second and tags its result with 'Right'; otherwise it defers to
-- 'waitEither'.
waitEither' :: MonadBaseControl IO m
            => Maybe (Async (StM m a)) -> Async (StM m b)
            -> m (Either a b)
waitEither' pending y = maybe (Right <$> wait y) (`waitEither` y) pending
-- | Connect a source machine to a sink process, stepping the two side by side.
--
-- The next step of each machine is started with 'asyncRun' (an imported
-- helper; presumably it evaluates the step in another thread -- confirm in
-- its defining module). The local loop 'go' then reacts to whichever step
-- result 'waitEither'' reports first.
--
-- A stopped source never ends the composite machine on its own: the sink is
-- handed its await fallback and keeps running with no source. The composite
-- stops only when the sink stops.
racers :: forall m k a b. MonadBaseControl IO m
       => MachineT m k a -> ProcessT m a b -> MachineT m k b
racers src snk = MachineT (restart src snk)
  where
    -- Start the next step of the source, then of the sink, and resume the
    -- race between them.
    restart :: MachineT m k a -> ProcessT m a b -> m (MachineStep m k b)
    restart src' snk' =
      join $ go <$> (Just <$> asyncRun src') <*> asyncRun snk'

    -- The source is finished: continue the sink from the given machine
    -- (normally its await fallback) with no source attached.
    drain :: ProcessT m a b -> m (MachineStep m k b)
    drain snk' = asyncRun snk' >>= go Nothing

    -- The race itself. The first argument is the pending source step, or
    -- 'Nothing' once the source has stopped; the second is the pending sink
    -- step.
    go :: Maybe (AsyncStep m k a)
       -> AsyncStep m (Is a) b
       -> m (MachineStep m k b)
    go srcA snkA =
      waitEither' srcA snkA >>= \n -> case n of
        -- Source finished first: keep waiting on the sink alone.
        Left (Stop :: MachineStep m k a) -> go Nothing snkA
        -- Source yielded first: the value @o@ is held until the sink is
        -- ready to consume it.
        Left (Yield o k) -> wait snkA >>= \m -> case m of
          (Stop :: MachineStep m (Is a) b) -> return Stop
          -- The sink has output of its own; emit it, then keep running the
          -- sink until it awaits and feed it @o@ there.
          Yield o' k' -> return . Yield o' . MachineT . flushDown k' $
            \f -> restart k (f o)
          Await f Refl _ -> restart k (f o)
        -- Source needs input: surface an await for the composite machine
        -- and continue the race, with the same pending sink step, from
        -- whichever source continuation is taken.
        Left (Await g kg fg) -> asyncAwait g kg fg $
          MachineT . flip go snkA . Just
        Right (Stop :: MachineStep m (Is a) b) -> return Stop
        -- Sink yielded first: start its next step before handing the value
        -- downstream; the pending source step is left untouched.
        Right (Yield o k) -> asyncRun k >>=
          return . Yield o . MachineT . go srcA
        -- Sink wants input first: it has to wait for the source.
        Right (Await f Refl ff) -> case srcA of
          Nothing -> drain ff
          Just src' -> wait src' >>= \m -> case m of
            -- Same outcome as when the source's Stop wins the race:
            -- the sink continues from its fallback.
            Stop -> drain ff
            Yield o k -> restart k (f o)
            a -> feedUp (encased a) (drain ff) $ \o k -> restart k (f o)

    -- Run a sink synchronously, passing along everything it yields, until
    -- it awaits; its await continuation is then given to the handler. A
    -- sink that stops ends the whole machine.
    flushDown :: ProcessT m a b
              -> ((a -> ProcessT m a b) -> m (MachineStep m k b))
              -> m (MachineStep m k b)
    flushDown p handler = runMachineT p >>= \s -> case s of
      Stop -> return Stop
      Yield o p' -> return . Yield o . MachineT $ flushDown p' handler
      Await f Refl _ -> handler f

    -- Run a source synchronously until it yields, exposing any awaits it
    -- performs as awaits of the composite machine. The first yielded value
    -- and the rest of the source go to the handler; if the source stops
    -- instead, the supplied stop action runs.
    feedUp :: MachineT m k a
           -> m (MachineStep m k b)
           -> (a -> MachineT m k a -> m (MachineStep m k b))
           -> m (MachineStep m k b)
    feedUp up onStop handler = runMachineT up >>= \s -> case s of
      Stop -> onStop
      Yield o up' -> handler o up'
      Await g kg fg -> return $
        awaitStep g kg fg (\up' -> MachineT (feedUp up' onStop handler))