module FRP.NetWire.Concurrent
(
(~*~),
(~&~),
(~+~)
)
where
import Control.Applicative
import Control.Arrow
import Control.Concurrent
import Control.Concurrent.STM
import Control.DeepSeq
import Control.Exception (finally, onException)
import FRP.NetWire.Tools
import FRP.NetWire.Wire
-- | Concurrent product of two wires.
--
-- Each argument wire is stepped in its own thread (see 'forkWire') with
-- its half of the input pair.  Both results are awaited, the two outputs
-- are combined with @liftA2 (,)@, and the resulting wire continues as the
-- product of the two updated wires.
--
-- The worker threads are killed when this step ends, whether it ends
-- normally or because waiting for the results was interrupted by an
-- exception, so no worker outlives the step that started it.
(~*~) :: Wire IO a c -> Wire IO b d -> Wire IO (a, b) (c, d)
w1' ~*~ w2' =
    mkGen $ \ws (x', y') -> do
        (xVar, thr1) <- forkWire w1' ws x'
        -- If starting the second worker fails, do not leave the first one
        -- running unattended.
        (yVar, thr2) <- forkWire w2' ws y' `onException` killThread thr1
        let collect = do
                (mx, w1) <- takeMVar xVar
                (my, w2) <- takeMVar yVar
                return (liftA2 (,) mx my, w1 ~*~ w2)
        -- 'finally' (rather than a trailing statement) so that the cleanup
        -- also runs when 'collect' is aborted by an exception.  'bracket'
        -- is deliberately not used for the forks: 'forkIO' inherits the
        -- masking state, which would make the workers uninterruptible.
        collect `finally` mapM_ killThread [thr1, thr2]

infixr 3 ~*~
-- | Concurrent fan-out: both wires receive the same input.
--
-- The input is duplicated into a pair with @arr dup@ and then handed to
-- '~*~', so the two wires are run exactly as '~*~' runs them and their
-- outputs are paired in the same way.
(~&~) :: Wire IO a b -> Wire IO a c -> Wire IO a (b, c)
left ~&~ right = (left ~*~ right) <<< arr dup

infixr 3 ~&~
-- | Concurrent choice between two wires.
--
-- Both wires are stepped with the same input, each in its own thread.
-- The result is selected in a single STM transaction:
--
-- * if the left wire has delivered a 'Right' output, that output is
--   returned and only the left wire is advanced;
--
-- * otherwise, if the right wire has delivered a 'Right' output, that
--   output is returned and only the right wire is advanced;
--
-- * otherwise, once both wires have delivered 'Left' outputs, the right
--   wire's output is returned and both wires are advanced.
--
-- A wire that is not advanced keeps its previous state for the next step.
-- Both worker threads are killed when the step ends, including when the
-- wait is interrupted by an exception.
--
-- NOTE(review): the 'NFData' constraint is not used by the code in this
-- definition; it is kept because it is part of the public signature.
-- Presumably it was meant for forcing results inside the worker threads;
-- confirm before relying on that.
(~+~) :: NFData b => Wire IO a b -> Wire IO a b -> Wire IO a b
w1' ~+~ w2' =
    mkGen $ \ws x' -> do
        x1Var <- newEmptyTMVarIO
        x2Var <- newEmptyTMVarIO
        thr1 <- forkIO (toGen w1' ws x' >>= atomically . putTMVar x1Var)
        -- If starting the second worker fails, stop the first one instead
        -- of leaking it.
        thr2 <- forkIO (toGen w2' ws x' >>= atomically . putTMVar x2Var)
                    `onException` killThread thr1
        let -- Left wire produced a usable result: advance only the left wire.
            res1 = do
                (mx, w1) <- takeTMVar x1Var
                check (isRight mx)
                return (mx, w1 ~+~ w2')
            -- Right wire produced a usable result: advance only the right wire.
            res2 = do
                (mx, w2) <- takeTMVar x2Var
                check (isRight mx)
                return (mx, w1' ~+~ w2)
            -- Neither wire produced a usable result: wait for both, then
            -- report the right wire's output and advance both wires.
            noRes = do
                (mx1, w1) <- takeTMVar x1Var
                (mx2, w2) <- takeTMVar x2Var
                check (isLeft mx1 && isLeft mx2)
                return (mx2, w1 ~+~ w2)
        -- 'finally' ensures the workers are killed even when the
        -- transaction is aborted by an exception, not only on success.
        atomically (res1 <|> res2 <|> noRes)
            `finally` mapM_ killThread [thr1, thr2]
-- | Step a wire once in a freshly forked thread.
--
-- Returns an initially empty 'MVar' together with the worker's 'ThreadId'.
-- The worker runs @toGen@ on the given wire, state and input, and puts the
-- result (output and updated wire) into the 'MVar' when it is done.
forkWire :: Wire IO a b -> WireState IO -> a -> IO (MVar (Output b, Wire IO a b), ThreadId)
forkWire w' ws x' = do
    box <- newEmptyMVar
    worker <- forkIO $ do
        result <- toGen w' ws x'
        putMVar box result
    return (box, worker)
-- | 'True' exactly when the argument is a 'Left' value.
isLeft :: Either e a -> Bool
isLeft (Left _) = True
isLeft (Right _) = False
-- | 'True' exactly when the argument is a 'Right' value.
isRight :: Either e a -> Bool
isRight (Right _) = True
isRight (Left _) = False