{-# LANGUAGE Arrows #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TypeFamilies #-}
module FRP.Rhine.Schedule.Concurrently where
import Control.Concurrent
import Control.Monad (void)
import Data.IORef
import Control.Monad.Trans.Class
import Control.Monad.Trans.MSF.Except
import Control.Monad.Trans.MSF.Maybe
import Control.Monad.Trans.MSF.Writer
import FRP.Rhine.Clock
import FRP.Rhine.Schedule
-- | Schedule two 'IO' clocks by running each of them in its own thread.
--
--   Both clocks are initialised and stepped inside threads created with
--   'forkIO'. Every tick is tagged with 'Left' (first clock) or 'Right'
--   (second clock) and handed over through a shared 'MVar'; the resulting
--   running clock simply blocks on that 'MVar' until a tick arrives.
--
--   The initial time of the schedule is the initial time reported by
--   whichever clock finishes initialising first. The schedule still waits
--   for the other clock to finish initialising before it returns.
concurrently
  :: ( Clock IO cl1, Clock IO cl2
     , Time cl1 ~ Time cl2
     )
  => Schedule IO cl1 cl2
concurrently = Schedule $ \cl1 cl2 -> do
  initTimes <- newEmptyMVar
  ticks     <- newEmptyMVar
  void $ forkClock initTimes ticks Left  cl1
  void $ forkClock initTimes ticks Right cl2
  -- The first value to arrive becomes the initial time of the schedule.
  firstInitTime <- takeMVar initTimes
  -- The second value is only awaited, its time stamp is discarded.
  void $ takeMVar initTimes
  return (constM (takeMVar ticks), firstInitTime)
  where
    -- Initialise a clock in a fresh thread, report its initial time,
    -- then forward every tagged tick to the shared variable forever.
    forkClock initTimes ticks inject cl = forkIO $ do
      (running, startTime) <- initClock cl
      putMVar initTimes startTime
      reactimate $ running >>> proc (time, tag) ->
        arrM (putMVar ticks) -< (time, inject tag)
-- | Schedule two @WriterT w IO@ clocks by running each in its own thread.
--
--   Each clock is initialised and stepped in a thread created with
--   'forkIO', where its 'WriterT' layer is run explicitly. The writer output
--   produced during initialisation is sent back together with the initial
--   time and re-emitted with 'tell', in the order in which the clocks
--   finished initialising. The writer output of every tick travels with that
--   tick through a shared 'MVar' and is re-emitted by the running schedule.
--
--   The initial time of the schedule is the one reported by whichever clock
--   finishes initialising first.
concurrentlyWriter
  :: ( Monoid w
     , Clock (WriterT w IO) cl1
     , Clock (WriterT w IO) cl2
     , Time cl1 ~ Time cl2
     )
  => Schedule (WriterT w IO) cl1 cl2
concurrentlyWriter = Schedule $ \cl1 cl2 -> do
  initResults <- lift newEmptyMVar
  ticks       <- lift newEmptyMVar
  void $ lift $ forkClock initResults ticks Left  cl1
  void $ lift $ forkClock initResults ticks Right cl2
  -- The first clock to report back determines the initial time.
  (firstInitTime, firstLog) <- lift $ takeMVar initResults
  -- The second clock only contributes its writer output.
  (_, secondLog)            <- lift $ takeMVar initResults
  mapM_ tell [firstLog, secondLog]
  return (constM (WriterT (takeMVar ticks)), firstInitTime)
  where
    -- Initialise a clock in a fresh thread (running its writer layer there),
    -- report initial time and writer output, then forward every tagged tick
    -- paired with the writer output of that step.
    forkClock initResults ticks inject cl = forkIO $ do
      ((running, startTime), initLog) <- runWriterT $ initClock cl
      putMVar initResults (startTime, initLog)
      let deliver (tickLog, (time, tag)) =
            putMVar ticks ((time, inject tag), tickLog)
      reactimate $ runWriterS running >>> arrM deliver
-- | Schedule two @ExceptT e IO@ clocks by running each in its own thread.
--
--   Each clock is initialised and stepped in a thread created with 'forkIO'.
--   Three shared variables connect the threads with the caller:
--
--   * @iMVar@ receives exactly one initialisation result per clock
--     (@Right initTime@ or @Left e@).
--   * @mvar@ receives the tagged ticks (@Right@) of running clocks, and a
--     final @Left e@ when a running clock stops.
--   * @errorref@ broadcasts an exception to the clock threads: a running
--     clock reads it after every tick it delivered and stops once it is set.
--
--   When the running schedule receives a @Left e@ on @mvar@, it stores @e@
--   in @errorref@ and throws @e@ in the caller's 'ExceptT' layer.
--
--   Initialisation failures: both initialisation results are always
--   collected. If both clocks failed, the first reported exception is thrown.
--   If exactly one clock failed, its exception is first stored in @errorref@
--   so that the running clock stops, then @mvar@ is drained until that clock
--   acknowledges with a @Left@, and finally the exception is thrown.
--   Without the broadcast the running clock would never be told to stop and
--   the drain would not terminate.
concurrentlyExcept
  :: ( Clock (ExceptT e IO) cl1
     , Clock (ExceptT e IO) cl2
     , Time cl1 ~ Time cl2
     )
  => Schedule (ExceptT e IO) cl1 cl2
concurrentlyExcept = Schedule $ \cl1 cl2 -> do
  (iMVar, mvar, errorref) <- lift $ do
    iMVar    <- newEmptyMVar
    mvar     <- newEmptyMVar
    errorref <- newIORef Nothing
    _ <- launchSubThread cl1 Left  iMVar mvar errorref
    _ <- launchSubThread cl2 Right iMVar mvar errorref
    return (iMVar, mvar, errorref)
  -- Every clock thread writes to iMVar exactly once, so both takes return.
  firstInit  <- lift $ takeMVar iMVar
  secondInit <- lift $ takeMVar iMVar
  case (firstInit, secondInit) of
    -- The first clock to be initialised sets the initial time stamp.
    (Right initTime, Right _) -> return (runningSchedule mvar errorref, initTime)
    -- No clock is running, so there is nothing to shut down.
    (Left e, Left _)          -> throwE e
    -- Exactly one clock is running and has to be stopped before rethrowing.
    (Left e, Right _)         -> stopRunningClock mvar errorref e
    (Right _, Left e)         -> stopRunningClock mvar errorref e
  where
    -- Forward ticks from the clock threads; on the first exception,
    -- broadcast it to the clock threads and rethrow it to the caller.
    runningSchedule mvar errorref = constM $ do
      eTick <- lift $ takeMVar mvar
      case eTick of
        Right tick -> return tick
        Left e     -> do
          lift $ writeIORef errorref $ Just e
          throwE e

    -- Initialise and run one clock in a fresh thread.
    launchSubThread cl leftright iMVar mvar errorref = forkIO $ do
      initialised <- runExceptT $ initClock cl
      case initialised of
        Right (runningClock, initTime) -> do
          putMVar iMVar $ Right initTime
          -- 'reactimate' only returns by throwing, hence the 'Left' pattern.
          Left e <- runExceptT $ reactimate $ runningClock >>> proc (td, tag2) -> do
            arrM (lift . putMVar mvar)               -< Right (td, leftright tag2)
            -- Stop as soon as an exception has been broadcast.
            me <- constM (lift $ readIORef errorref) -< ()
            _  <- throwMaybe                         -< me
            returnA -< ()
          -- Report the own exception, or acknowledge the broadcast one.
          putMVar mvar $ Left e
        Left e -> void $ putMVar iMVar $ Left e

    -- Broadcast the exception to the one running clock, wait until that
    -- clock has acknowledged it on mvar, then rethrow.
    stopRunningClock mvar errorref e = do
      lift $ do
        writeIORef errorref $ Just e
        drain mvar
      throwE e

    -- Discard pending ticks until the first 'Left' arrives.
    drain mvar = do
      eTick <- takeMVar mvar
      case eTick of
        Right _ -> drain mvar
        Left _  -> return ()
-- | Schedule two @MaybeT IO@ clocks concurrently.
--
--   Implemented on top of 'concurrentlyExcept' with the unit type as the
--   only exception value: both clocks are hoisted into @ExceptT () IO@,
--   scheduled there, and the resulting schedule is hoisted back into
--   @MaybeT IO@.
concurrentlyMaybe
  :: ( Clock (MaybeT IO) cl1
     , Clock (MaybeT IO) cl2
     , Time cl1 ~ Time cl2
     )
  => Schedule (MaybeT IO) cl1 cl2
concurrentlyMaybe = Schedule $ \cl1 cl2 ->
  initSchedule
    (hoistSchedule forgetUnit concurrentlyExcept)
    (viaExcept cl1)
    (viaExcept cl2)
  where
    -- Reinterpret a clock in @MaybeT IO@ as a clock in @ExceptT () IO@.
    viaExcept :: cl -> HoistClock (MaybeT IO) (ExceptT () IO) cl
    viaExcept cl = HoistClock cl withUnit

    -- A unit exception carries no information, so it becomes 'Nothing'.
    forgetUnit :: ExceptT () IO a -> MaybeT IO a
    forgetUnit = exceptToMaybeT

    -- 'Nothing' becomes the unit exception.
    withUnit :: MaybeT IO a -> ExceptT () IO a
    withUnit = maybeToExceptT ()