-- | Helpers that feed lists of inputs through a dataflow and collect
-- everything it sends to its output edge into plain lists.
module Test.Dataflow (
runDataflow,
runDataflowMany
) where
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO,
readTVarIO)
import Control.Monad (foldM_)
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Class (lift)
import Dataflow (Edge, compile, execute,
modifyState, readState,
statefulVertex, writeState)
import Dataflow.Primitives (Dataflow (Dataflow))
import Prelude
-- | Run a dataflow over a single batch of inputs and return the outputs
-- collected for that batch.
--
-- Delegates to 'runDataflowMany' with a one-element list of batches and
-- returns the first collected output batch. If no output batch was collected
-- at all, this fails with a descriptive 'error' naming this function rather
-- than relying on a partial list accessor.
runDataflow :: MonadIO io => (Edge o -> Dataflow (Edge i)) -> [i] -> io [o]
runDataflow dataflow inputs = do
  batches <- runDataflowMany dataflow [inputs]
  case batches of
    (outputs : _) -> pure outputs
    []            -> error "Test.Dataflow.runDataflow: dataflow produced no output batch"
-- | Run a dataflow over several input batches, in order, and return the
-- output batches that were collected.
--
-- The dataflow is compiled once with a collecting vertex as its output edge,
-- then each element of @inputs@ is passed to 'execute', threading the
-- resulting program into the next call. Output batches are returned in the
-- order in which they were recorded.
runDataflowMany :: MonadIO io => (Edge o -> Dataflow (Edge i)) -> [[i]] -> io [[o]]
runDataflowMany dataflow inputs = liftIO $ do
  collected <- newTVarIO []
  program   <- compile (sinkInto collected >>= dataflow)
  foldM_ (\prog batch -> execute batch prog) program inputs
  -- Batches are prepended as they are recorded, so undo that ordering here.
  fmap reverse (readTVarIO collected)
  where
    -- Output vertex that buffers items in its own state and moves the
    -- buffered items into @register@ as one batch.
    sinkInto register = statefulVertex [] buffer flush
      where
        -- Prepend each incoming item to the vertex-local buffer.
        buffer pending _ item = modifyState pending (item :)

        -- NOTE(review): presumably invoked once per 'execute' call — confirm
        -- against the 'statefulVertex' documentation.
        flush pending _ = do
          items <- readState pending
          -- The buffer was built by prepending, so reverse it to restore
          -- arrival order before recording it as a batch.
          Dataflow (lift (atomically (modifyTVar' register (reverse items :))))
          writeState pending []