module Data.Machine.Concurrent.Fanout (fanout, fanoutSteps) where
import Control.Arrow (first, second)
import Control.Concurrent.Async.Lifted (Async, async, wait)
import Control.Monad (foldM)
import Control.Monad.Trans.Control (MonadBaseControl, StM)
import Data.Machine (Step(..), MachineT(..), encased, stopped, ProcessT, Is(..))
import Data.Machine.Concurrent.AsyncStep (MachineStep)
import Data.Maybe (catMaybes)
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Data.Monoid (Monoid, mempty, mconcat)
#endif
import Data.Semigroup (Semigroup(sconcat))
import Data.List.NonEmpty (NonEmpty((:|)))
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Data.Coerce (coerce)
#endif
-- | Asynchronously offer one input to a process.
--
-- Inside an 'async' action the process is run to its next step. If
-- that step is an 'Await', its continuation is applied to the input and
-- the step this produces is the result. A 'Yield' or 'Stop' step is
-- returned as is, in which case the input is not consumed.
feed :: forall m a b. MonadBaseControl IO m
     => a -> ProcessT m a b
     -> m (Async (StM m (MachineStep m (Is a) b)))
feed x m = async advance
  where
    -- The explicit signature fixes the step type passed to 'async'.
    advance :: m (MachineStep m (Is a) b)
    advance = do
      step <- runMachineT m
      case step of
        Await k Refl _ -> runMachineT (k x)
        other          -> return other
-- | Like 'Data.List.mapAccumL', but over a list of pending 'Async'
-- computations and with a monadic accumulating function.
--
-- Each 'Async' is waited on in list order. Its result and the current
-- accumulator are passed to @f@, which returns the updated accumulator
-- and one output element. The outputs are returned in the same order
-- as the inputs that produced them.
mapAccumLM :: MonadBaseControl IO m
           => (acc -> x -> m (acc, y)) -> acc -> [Async (StM m x)]
           -> m (acc, [y])
mapAccumLM f z = fmap (second ($ [])) . foldM aux (z, id)
  where
    -- @ys@ is a difference list. Composing the new element on the right
    -- appends it, so the final list keeps the input order (composing on
    -- the left would reverse it).
    aux (acc, ys) x = do
      (acc', y) <- wait x >>= f acc
      return (acc', ys . (y :))
-- | Run a machine through every consecutive 'Yield' step, starting
-- from the given step.
--
-- Returns the yielded values in the order they were produced, together
-- with the machine to resume from: 'Nothing' if a 'Stop' step was
-- reached, otherwise the first non-'Yield' step re-wrapped with
-- 'encased'.
flushYields :: Monad m
            => Step k o (MachineT m k o) -> m ([o], Maybe (MachineT m k o))
flushYields = go id
  where
    -- @acc@ is a difference list of the values seen so far; each new
    -- value is appended on the right so emission order is preserved.
    go acc (Yield o next) = runMachineT next >>= go (acc . (o :))
    go acc Stop           = return (acc [], Nothing)
    go acc step           = return (acc [], Just (encased step))
-- | Share each input with every process in the list, in lockstep.
--
-- For every input, each process is handed the value through 'feed'
-- (one 'async' action per process) and the results are then collected.
-- All values yielded in response are combined with 'sconcat' into a
-- single yield of the composite process. If no process yields, the
-- composite moves on to the next input without yielding. Processes
-- that stop are dropped, and the composite stops once none remain.
--
-- When the input is exhausted the composite stops. The fallbacks of the
-- individual processes are not run.
fanout :: (MonadBaseControl IO m, Semigroup r)
       => [ProcessT m a r] -> ProcessT m a r
fanout [] = stopped
-- The fallback must not be @fanout xs@: that would only await again, so
-- a driver that follows the fallback once its input has run out would
-- never terminate.
fanout xs = encased $ Await (MachineT . aux) Refl stopped
  where
    aux y = do
      (rs, xs') <- mapM (feed y) xs >>= mapAccumLM yields []
      let nxt = fanout $ catMaybes xs'
      case rs of
        []        -> runMachineT nxt
        (r : rs') -> return $ Yield (sconcat $ r :| rs') nxt

    -- Fold one process's step into the accumulated yields. New values
    -- are appended, so they stay in the order the processes are visited.
    yields rs Stop      = return (rs, Nothing)
    yields rs y@Yield{} = first (rs ++) <$> flushYields y
    yields rs a@Await{} = return (rs, Just $ encased a)
-- | Share each input with every process in the list, in lockstep,
-- yielding exactly once per input.
--
-- For every input, each process is handed the value through 'feed'
-- (one 'async' action per process) and the results are then collected.
-- All values yielded in response are combined with 'mconcat'. If no
-- process yields, the composite yields 'mempty', so every consumed
-- input is visible downstream as one step. Processes that stop are
-- dropped, and the composite stops once none remain.
--
-- When the input is exhausted the composite stops. The fallbacks of the
-- individual processes are not run.
fanoutSteps :: (MonadBaseControl IO m, Monoid r)
            => [ProcessT m a r] -> ProcessT m a r
fanoutSteps [] = stopped
-- The fallback must not be @fanoutSteps xs@: that would only await
-- again, so a driver that follows the fallback once its input has run
-- out would never terminate.
fanoutSteps xs = encased $ Await (MachineT . aux) Refl stopped
  where
    aux y = do
      (rs, xs') <- mapM (feed y) xs >>= mapAccumLM yields []
      let nxt = fanoutSteps $ catMaybes xs'
      -- 'mconcat' of an empty list is 'mempty', so the no-yield case
      -- needs no separate branch.
      return $ Yield (mconcat rs) nxt

    -- Fold one process's step into the accumulated yields. New values
    -- are appended, so they stay in the order the processes are visited.
    yields rs Stop      = return (rs, Nothing)
    yields rs y@Yield{} = first (rs ++) <$> flushYields y
    yields rs a@Await{} = return (rs, Just $ encased a)