module FRP.Reactivity.AlternateEvent (Event(..), eFromML, nonDispatchedIO, runEvent,
EventStream(..), filterMaybes, delay, delays,
fmap', scan') where
import qualified Data.Map as M
import Data.Unique
import Data.IORef
import Data.Typeable
import Data.Time.Clock.POSIX
import Data.Maybe
import Data.Monoid
import Data.Traversable
import Control.Monad hiding (mapM)
import Control.Monad.Reader hiding (mapM)
import Control.Applicative
import Control.Concurrent
import Control.Monad.Catch
import System.IO.Unsafe
import FRP.Reactivity.Measurement
import FRP.Reactivity.RPC
import Prelude hiding (mapM)
-- | An event stream in continuation-passing form.
--
-- 'unEvent' is given a handler that receives each occurrence's value together
-- with a 'POSIXTime'; running it in 'RPC' yields a further 'RPC' action.
-- Elsewhere in this module that returned action is used for teardown (killing
-- a forked delivery thread, removing a registered handler).
newtype Event t = Event { unEvent :: (t -> POSIXTime -> RPC ()) -> RPC (RPC ()) } deriving (Typeable, Functor)
-- | Turn a list of measurements into an 'Event'.
--
-- Subscribing forks (via 'rpcFork') a worker that walks the list in order,
-- obtains each measurement's value/time pair with 'getValue', and passes the
-- pair to the subscriber.  The teardown action returned to the subscriber
-- kills that worker thread.
eFromML :: [Measurement t] -> Event t
eFromML ls = Event $ \f -> do
  -- Deliver one measurement: fetch its (value, time) pair, then hand it on.
  let deliver meas = _nonDispatchedIO (getValue meas) >>= uncurry f
  worker <- rpcFork (mapM_ deliver ls)
  return (_nonDispatchedIO (killThread worker))
-- | Monadic sequencing of events.
--
-- * 'return' delivers its value once, at subscription, stamped with time @0@.
-- * @e >>= k@ subscribes to @e@; for every occurrence @(x, t)@ it subscribes
--   to @k x@ and forwards that stream's occurrences, stamping each with the
--   later of @t@ and the inner occurrence's own time.
-- * 'fail' produces the empty event.
instance Monad Event where
  return x = Event $ \sink -> do
    sink x 0
    return (return ())

  Event subscribe >>= k = Event $ \sink -> do
    -- Accumulates the teardown actions of every inner subscription made so far.
    cancels <- _nonDispatchedIO (newIORef (return ()))
    cancelOuter <- subscribe (onOuter cancels sink)
    -- Teardown: stop the outer stream first, then whatever inner teardowns
    -- have been accumulated by the time the teardown runs.
    return (cancelOuter >> join (_nonDispatchedIO (readIORef cancels)))
    where
      -- Handle one outer occurrence: start the inner stream it selects and
      -- append that stream's teardown to the accumulated ones.
      onOuter cancels sink x t = do
        cancelInner <- unEvent (k x) (\x' t' -> sink x' (max t t'))
        _nonDispatchedIO (atomicModifyIORef cancels (\acc -> (acc >> cancelInner, ())))

  fail _ = mzero
-- | 'mzero' never fires and has a no-op teardown.  'mplus' subscribes the same
-- handler to both events (left first); its teardown runs the left teardown
-- followed by the right one.
instance MonadPlus Event where
  mzero = Event (const (return (return ())))

  mplus (Event left) (Event right) = Event $ \f -> do
    cancelLeft <- left f
    cancelRight <- right f
    return (cancelLeft >> cancelRight)
-- | Events form a monoid under merging: the identity is the empty event
-- ('mzero') and combination is 'mplus'.
instance Monoid (Event t) where
  mempty = mzero
  mappend a b = a `mplus` b
-- | Lift an 'IO' action into an event with a single occurrence.
--
-- On subscription the action is run through 'RPC''s own 'liftIO'; afterwards
-- a no-op is measured and the time of that measurement is used as the
-- occurrence time.  The teardown action does nothing.
instance MonadIO Event where
  liftIO m = Event $ \f -> do
    x <- liftIO m
    -- Time stamp taken after the action has completed.
    meas <- _nonDispatchedIO (measure (return ()))
    f x (time meas)
    return (return ())
-- | Single-occurrence event built from an action that is run with
-- '_nonDispatchedIO' rather than 'RPC''s 'liftIO'.
--
-- The action itself is wrapped in 'measure', and the value/time pair read
-- back with 'getValue' is what the subscriber receives.  The teardown action
-- does nothing.
nonDispatchedIO m = Event $ \f -> do
  sample <- _nonDispatchedIO (measure m >>= getValue)
  uncurry f sample
  return (return ())
-- | Run an event, invoking the given 'IO' callback for every occurrence.
--
-- A fresh 'MVar' is created; the subscription is started on a forked thread
-- with the 'RPC' environment @(False, mv)@, and 'rpcServer' is then run on
-- the calling thread against the same 'MVar'.  The teardown action produced
-- by subscribing is discarded.
runEvent :: Event t -> (t -> POSIXTime -> IO ()) -> IO ()
runEvent e f = do
  mailbox <- newEmptyMVar
  let subscription = unEvent e (\x t -> liftIO (f x t))
  _ <- forkIO (void (runReaderT (unRPC subscription) (False, mailbox)))
  void (rpcServer mailbox)
-- | Applicative structure derived from the 'Monad' instance.
instance Applicative Event where
  pure x = return x
  ef <*> ex = ap ef ex
-- | Alternative structure derived from the 'MonadPlus' instance: 'empty' is
-- the event that never fires and '<|>' merges two events.
instance Alternative Event where
  empty = mzero
  a <|> b = mplus a b
-- | Operations an event-stream type must provide on top of 'Functor' and
-- 'MonadPlus'.  The descriptions below are based on the method types and on
-- the 'Event' instance in this module.
class (Functor e, MonadPlus e) => EventStream e where
-- | Build a stream from a list of (value, time) pairs.
eventFromList :: [(t, POSIXTime)] -> e t
-- | Stateful map: the step function takes the current state and an input
-- occurrence and yields the next state together with the output occurrence.
-- The second argument is the initial state.
scan :: (t -> u -> (t, v)) -> t -> e u -> e v
-- | Flatten a stream of streams.  In the 'Event' instance, a new inner stream
-- replaces the previous one (the previous one is torn down first).
switch :: e (e t) -> e t
-- | Pair each occurrence with a stream.  NOTE(review): the 'Event' instance
-- pairs every value with the /whole/ original event, not a suffix of it --
-- confirm that this is the intended meaning of "remainder".
withRemainder :: e t -> e (t, e t)
-- | Create a push function together with the stream of values pushed
-- through it.
channel :: IO (t -> IO (), e t)
-- | Pair each occurrence's value with its occurrence time.
adjoinTime :: e t -> e (t, POSIXTime)
-- | Poll the reference until it holds a 'Right', then run the action stored
-- there.  While it holds a 'Left' the loop yields the thread and tries again.
retryLoop ref = do
  slot <- _nonDispatchedIO (readIORef ref)
  case slot of
    Left _ -> _nonDispatchedIO yield >> retryLoop ref
    Right action -> action
-- | Flatten an event of events, keeping only the most recent inner event
-- subscribed.
--
-- A single 'IORef' acts both as a spin lock and as storage for the current
-- inner teardown action: @Left ()@ means a handler currently owns the slot,
-- @Right m@ means the slot is free and @m@ tears down the current inner
-- subscription.
eventSwitch e = Event (\f -> do
-- Initially free, with nothing to tear down.
ref <- _nonDispatchedIO (newIORef (Right (return ())))
-- The teardown returned to the subscriber runs the outer teardown and then
-- waits (via retryLoop) for the slot to be free before running the stored
-- inner teardown.
liftM (\m -> m >> retryLoop ref) (unEvent e (handler f ref))) where
-- Called for every outer occurrence e' (an inner event).  The occurrence
-- time t is only passed along when retrying; it is not otherwise used.
handler f ref e' t = do
-- Atomically take the slot: leave Left () behind and inspect what was there.
ei <- _nonDispatchedIO (atomicModifyIORef ref (\m -> (Left (), m)))
either
-- Slot already taken by another handler: yield and try again.
(\_ -> _nonDispatchedIO yield >> handler f ref e' t)
(\m -> do
-- Tear down the previously active inner subscription.
-- NOTE(review): this runs outside rpcFinally, so if it throws the slot
-- appears to stay Left () -- confirm whether that is acceptable.
m
-- Subscribe the downstream handler to the new inner event.  Presumably
-- rpcFinally runs its second argument whether or not the first one throws
-- (TODO confirm in FRP.Reactivity.RPC); that argument frees the slot with a
-- no-op teardown.
m' <- rpcFinally
(unEvent e' f)
(_nonDispatchedIO (writeIORef ref (Right (return ()))))
-- Store the new inner teardown.
-- NOTE(review): if rpcFinally also runs its finaliser on normal completion,
-- the slot is briefly Right (return ()) before this write; a concurrent
-- handler or canceller could observe that window -- confirm whether
-- occurrences can be delivered concurrently.
_nonDispatchedIO (writeIORef ref (Right m')))
ei
-- | 'EventStream' operations for the callback-based 'Event' type.
instance EventStream Event where
  -- Convert the (value, time) pairs with 'fromList' and replay them.
  eventFromList pairs = eFromML (fromList pairs)

  -- The running state lives in an 'IORef' created at subscription time; each
  -- input occurrence atomically replaces the state and emits the step's
  -- output with the input's time.  The update is kept lazy: the step result
  -- is only taken apart through 'fst'/'snd' thunks.
  scan step seed e = Event $ \emit -> do
    state <- _nonDispatchedIO (newIORef seed)
    unEvent e $ \y t -> do
      out <- _nonDispatchedIO
        (atomicModifyIORef state (\acc -> let r = step acc y in (fst r, snd r)))
      emit out t

  switch = eventSwitch

  -- Every occurrence is paired with the original event itself.
  withRemainder e = Event $ \f -> unEvent e $ \x t -> f (x, e) t

  -- A channel is a map of registered handlers keyed by 'Unique'.
  channel = do
    handlers <- newIORef M.empty
    return (publish handlers, listen handlers)
    where
      -- Push a value: stamp it by measuring a no-op, then call every handler
      -- registered at the moment the map is read.
      publish handlers x = do
        meas <- measure (return ())
        subscribers <- readIORef handlers
        void (mapM (\h -> h x (time meas)) subscribers)

      -- Teardown for one subscription: drop its handler from the map.
      unsub handlers key = _nonDispatchedIO $
        atomicModifyIORef handlers (\mp -> (M.delete key mp, ()))

      -- Subscribe: register the handler under a fresh key.  The handler is
      -- run in 'IO' with the environment @(False, mv)@, where @mv@ comes from
      -- the subscriber's own 'RPC' environment.
      listen handlers = Event $ \f -> do
        key <- _nonDispatchedIO newUnique
        (_, mv) <- RPC ask
        _nonDispatchedIO
          (atomicModifyIORef handlers
            (\mp -> (M.insert key (\x t -> runReaderT (unRPC (f x t)) (False, mv)) mp, ())))
        return (unsub handlers key)

  -- Expose each occurrence's time alongside its value.
  adjoinTime (Event subscribe) = Event $ \f -> subscribe (\x t -> f (x, t) t)
-- | Keep only the 'Just' payloads of a 'MonadPlus' computation.
--
-- Each 'Nothing' becomes 'mzero' and each @Just x@ becomes @return x@, so no
-- partial function ('fromJust') is needed.
filterMaybes :: (MonadPlus m) => m (Maybe t) -> m t
filterMaybes e = e >>= maybe mzero return
-- | Re-emit every occurrence of @e@ with @t@ added to its occurrence time.
--
-- The input is merged with a leading @Nothing@ occurrence, each occurrence is
-- rescheduled through 'eventFromList' at its shifted time, and the
-- @Nothing@ placeholder is finally dropped by 'filterMaybes'.
delay t e = filterMaybes (stamped >>= reschedule)
  where
    -- Input occurrences wrapped in 'Just', preceded by a 'Nothing', each
    -- paired with its time.
    stamped = adjoinTime (return Nothing `mplus` fmap Just e)
    -- Emit one occurrence at its original time plus the delay.
    reschedule (x, t') = eventFromList [(x, t + t')]
-- | Given occurrences that each carry a target time, re-emit every value
-- through 'eventFromList' as a one-element list stamped with that time.
delays :: (EventStream e, MonadIO e) => e (t, POSIXTime) -> e t
delays e = do
  (x, t) <- e
  eventFromList [(x, t)]
-- | Map a function over whole measurements (not just their values) and turn
-- the result into an 'Event', by way of 'scan''.
--
-- The scan state is ignored by the step function, so the seed is a
-- placeholder built from the first list element with an undefined payload.
-- NOTE(review): the seed uses 'head' and 'undefined'; it is harmless only as
-- long as nothing downstream forces it -- confirm.
fmap' :: (Measurement t -> Measurement u) -> [Measurement t] -> Event u
fmap' f e = scan' step seed e
  where
    -- Ignore the previous state; the mapped measurement is both the new
    -- state and the output.
    step _ x = (y, y)
      where
        y = f x
    seed = fmap (const undefined) (head e)
-- | Scan over a list of measurements with a step function that works on
-- measurements and produces a measurement as output.
--
-- The list is replayed with 'eFromML' and each occurrence is paired with its
-- time.  Inside the step, the (value, time) pair is turned back into a
-- measurement with 'assertMeasurement'.  Each output measurement is then
-- split into its value ('copoint') and time, and rescheduled by 'delays'.
scan' f x e = delays (fmap untag folded)
  where
    -- Split an output measurement into (value, time) for 'delays'.
    untag m = (copoint m, time m)
    folded = scan step x (adjoinTime (eFromML e))
    -- NOTE(review): 'unsafePerformIO' is used to rebuild a measurement from
    -- the pair inside a pure step function; this presumes
    -- 'assertMeasurement' has no observable side effects -- confirm.
    step y z = f y (unsafePerformIO (assertMeasurement (return z)))