module Data.Machine.Concurrent.Buffer (
bufferConnect,
rollingConnect,
mediatedConnect, BufferRoom(..)
) where
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Control.Applicative ((<$>), (<*>))
#endif
import Control.Concurrent.Async.Lifted (wait, waitEither)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad (join, (>=>))
import Data.Machine.Concurrent.AsyncStep
import Data.Machine
import Data.Sequence (ViewL(..), (|>))
import qualified Data.Sequence as S
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Data.Traversable (traverse)
#endif
drain :: Monad m
=> MachineStep m k a
-> (MachineStep m k a -> m (MachineStep m k' a))
-> m (MachineStep m k' a)
drain z k = go z
where go Stop = return Stop
go (Yield o kd) = Yield o . MachineT . go <$> runMachineT kd
go aStep = k aStep
feedToBursting :: Monad m
=> MachineStep m k a
-> (Maybe (a, MachineT m k a) -> m (MachineStep m k b))
-> m (MachineStep m k b)
feedToBursting z k = go z
where go Stop = k Nothing
go (Await f kf ff) = return $
Await (\a -> go' (f a)) kf (go' ff)
go (Yield o kk) = k $ Just (o, kk)
go' step = MachineT $ runMachineT step >>= go
bufferConnect :: MonadBaseControl IO m
=> Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
bufferConnect n = mediatedConnect S.empty snoc view
where snoc acc x = (if S.length acc < n 1 then Vacancy else NoVacancy) $
acc |> x
view acc = case S.viewl acc of
EmptyL -> Nothing
x :< acc' -> Just (x, acc')
rollingConnect :: MonadBaseControl IO m
=> Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
rollingConnect n = mediatedConnect S.empty snoc view
where snoc acc x = Vacancy $ S.take (n1) acc |> x
view acc = case S.viewl acc of
EmptyL -> Nothing
x :< acc' -> Just (x, acc')
data BufferRoom a = NoVacancy a | Vacancy a deriving (Eq, Ord, Show)
mediatedConnect :: forall m t b k c. MonadBaseControl IO m
=> t -> (t -> b -> BufferRoom t) -> (t -> Maybe (b,t))
-> MachineT m k b -> ProcessT m b c -> MachineT m k c
mediatedConnect z snoc view src0 snk0 =
MachineT $ do srcFuture <- asyncRun src0
snkFuture <- asyncRun snk0
go z (Just srcFuture) snkFuture
where
go :: t
-> Maybe (AsyncStep m k b)
-> AsyncStep m (Is b) c
-> m (MachineStep m k c)
go acc src snk = maybe (Left <$> wait snk) (waitEither snk) src >>=
goStep acc . either (Right . (,src)) (Left . (,snk))
goAsync :: t
-> Maybe (MachineT m k b)
-> ProcessT m b c
-> m (MachineStep m k c)
goAsync acc src snk =
join $ go acc <$> traverse asyncRun src <*> asyncRun snk
goStep :: t -> Either (MachineStep m k b, AsyncStep m (Is b) c)
(MachineStep m (Is b) c, Maybe (AsyncStep m k b))
-> m (MachineStep m k c)
goStep acc step = case step of
Left (Stop, snk) -> go acc Nothing snk
Left (Await g kg fg, snk) ->
asyncAwait g kg fg (MachineT . flip (go acc) snk . Just)
Left (Yield o k, snk) -> case snoc acc o of
Vacancy acc' -> asyncRun k >>= flip (go acc') snk . Just
NoVacancy acc' ->
let go' snk' = do src' <- asyncRun k
goStep acc' (Right (snk', Just src'))
in wait snk >>= flip drain go'
Right (Stop, _) -> return Stop
Right (Yield o k, src) -> do
return $ Yield o (MachineT $ asyncRun k >>= go acc src)
Right (Await f Refl ff, src) ->
case view acc of
Nothing -> maybe (goAsync acc Nothing ff) (wait >=> demandSrc) src
Just (x, acc') -> asyncRun (f x) >>= go acc' src
where demandSrc = flip feedToBursting go'
go' Nothing = goAsync acc Nothing ff
go' (Just (o, k)) = goAsync acc (Just k) (f o)