{-# LANGUAGE ExplicitForAll #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Dataflow.Vertices (
statefulVertex,
statelessVertex,
outputTVar
) where
import Control.Concurrent.STM.TVar (TVar, modifyTVar')
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Class (lift)
import Data.Typeable (Typeable)
import Dataflow.Primitives (Dataflow (..), Edge, StateRef,
Timestamp (..), Vertex (..),
newState, registerFinalizer,
registerVertex)
import Prelude
-- | Register a vertex that is paired with its own piece of state.
--
-- A fresh 'StateRef' is obtained from the initial state via 'newState'. The
-- finalizer, partially applied to that reference, is passed to
-- 'registerFinalizer'; afterwards a 'StatefulVertex' holding the reference
-- and the callback is passed to 'registerVertex', and the 'Edge' it yields
-- is the result.
--
-- NOTE(review): when the callback and the finalizer are actually invoked is
-- decided by "Dataflow.Primitives" and is not visible from this module.
statefulVertex :: Typeable i =>
  state
  -- ^ Initial state used to create the 'StateRef'.
  -> (StateRef state -> Timestamp -> i -> Dataflow ())
  -- ^ Callback stored inside the 'StatefulVertex'.
  -> (StateRef state -> Timestamp -> Dataflow ())
  -- ^ Finalizer; registered already applied to the new 'StateRef'.
  -> Dataflow (Edge i)
statefulVertex initState callback finalizer =
  newState initState >>= \ref ->
    -- The finalizer is registered before the vertex itself.
    registerFinalizer (finalizer ref)
      >> registerVertex (StatefulVertex ref callback)
-- | Register a vertex that carries no state of its own.
--
-- The callback is wrapped in the 'StatelessVertex' constructor and handed to
-- 'registerVertex'; the 'Edge' produced by that registration is returned.
statelessVertex :: Typeable i => (Timestamp -> i -> Dataflow ()) -> Dataflow (Edge i)
statelessVertex callback = registerVertex vertex
  where
    vertex = StatelessVertex callback
-- NOTE(review): the reason for NOINLINE is not evident from this module;
-- it is kept exactly as found.
{-# NOINLINE outputTVar #-}
-- | Build a stateless vertex that folds every input it receives into a 'TVar'.
--
-- For each input @x@ the vertex runs a single STM transaction that replaces
-- the contents of the 'TVar' with @op x@ applied to the current contents,
-- using the strict 'modifyTVar''. The timestamp passed to the vertex is
-- ignored.
outputTVar :: Typeable o => (o -> w -> w) -> TVar w -> Dataflow (Edge o)
outputTVar op register = statelessVertex accumulate
  where
    -- Lift the atomic update into the monad wrapped by the 'Dataflow'
    -- constructor.
    accumulate _ x =
      Dataflow (lift (atomically (modifyTVar' register (op x))))