module FRP.Reactivity.AlternateEvent (Event(..), eFromML, nonDispatchedIO, runEvent,
EventStream(..), filterMaybes, delay, delays,
fmap', scan') where
import qualified Data.Map as M
import Data.Unique
import Data.IORef
import Data.Typeable
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Applicative
import Control.Concurrent
import Control.Monad.Catch
import System.IO.Unsafe
import Data.Time.Clock.POSIX
import Data.Maybe
import FRP.Reactivity.Measurement
import FRP.Reactivity.RPC
newtype Event t = Event { unEvent :: (t -> POSIXTime -> RPC ()) -> RPC (RPC ()) } deriving (Typeable, Functor)
eFromML :: [Measurement t] -> Event t
eFromML ls = Event (\f -> liftM (_nonDispatchedIO . killThread) $ rpcFork (mapM_ (\meas -> _nonDispatchedIO (getValue meas) >>= uncurry f) ls))
instance Monad Event where
return x = Event (\f -> f x 0 >> return (return ()))
Event f >>= g = Event (\h -> do
ref <- _nonDispatchedIO (newIORef (return ()))
liftM (\m -> m >> join (_nonDispatchedIO (readIORef ref))) (f (get2 ref h))) where
get h t x' t' = h x' (max t t')
get2 ref h x t = unEvent (g x) (get h t) >>= \m -> _nonDispatchedIO (atomicModifyIORef ref (\m2 -> (m2 >> m, ())))
fail _ = mzero
instance MonadPlus Event where
mzero = Event (\_ -> return (return ()))
mplus e e2 = Event (\f -> liftM2 (>>) (unEvent e f) (unEvent e2 f))
instance Monoid (Event t) where
mempty = mzero
mappend = mplus
instance MonadIO Event where
liftIO m = Event (\f -> liftIO m >>= \x ->
_nonDispatchedIO (measure (return ())) >>= \meas ->
f x (time meas) >> return (return ()))
nonDispatchedIO m = Event (\f -> _nonDispatchedIO (measure m >>= getValue) >>= uncurry f >> return (return ()))
runEvent :: Event t -> (t -> POSIXTime -> IO ()) -> IO ()
runEvent e f = void $ do
mv <- newEmptyMVar
forkIO $ void $ runReaderT (unRPC $ unEvent e (\x t -> liftIO (f x t))) (False, mv)
rpcServer mv
instance Applicative Event where
pure = return
(<*>) = ap
instance Alternative Event where
empty = mzero
(<|>) = mplus
class (MonadPlus e) => EventStream e where
eventFromList :: [(t, POSIXTime)] -> e t
scan :: (t -> u -> (t, v)) -> t -> e u -> e v
switch :: e (e t) -> e t
withRemainder :: e t -> e (t, e t)
channel :: IO (t -> IO (), e t)
adjoinTime :: e t -> e (t, POSIXTime)
retryLoop ref =
_nonDispatchedIO (readIORef ref) >>= either (\_ -> _nonDispatchedIO yield >> retryLoop ref) id
eventSwitch e = Event (\f -> do
ref <- _nonDispatchedIO (newIORef (Right (return ())))
liftM (\m -> m >> retryLoop ref) (unEvent e (handler f ref))) where
handler f ref e' t = do
ei <- _nonDispatchedIO (atomicModifyIORef ref (\m -> (Left (), m)))
either
(\_ -> _nonDispatchedIO yield >> handler f ref e' t)
(\m -> do
m
m' <- rpcFinally
(unEvent e' f)
(_nonDispatchedIO (writeIORef ref (Right (return ()))))
_nonDispatchedIO (writeIORef ref (Right m')))
ei
instance EventStream Event where
eventFromList = eFromML . fromList
scan f x e = Event (\g -> do
ref <- _nonDispatchedIO (newIORef x)
unEvent e (\y t -> _nonDispatchedIO (atomicModifyIORef ref (\x -> let (x', z) = f x y in (x', z))) >>= \z -> g z t))
switch = eventSwitch
withRemainder e = Event (\f -> unEvent e (\x t -> f (x, e) t))
channel = do
ref <- newIORef M.empty
return (add ref, e ref)
where
add ref x = measure (return ()) >>= \meas -> readIORef ref >>= \mp -> mapM_ (\f -> f x (time meas)) mp
unsub ref un = _nonDispatchedIO $ atomicModifyIORef ref (\mp -> (M.delete un mp, ()))
e ref = Event (\f -> _nonDispatchedIO newUnique >>= \un ->
RPC ask >>= \(_, mv) ->
_nonDispatchedIO (atomicModifyIORef ref (\mp -> (M.insert un (\x t -> runReaderT (unRPC (f x t)) (False, mv)) mp, ())))
>> return (unsub ref un))
adjoinTime e = Event (\f -> unEvent e (\x t -> f (x, t) t))
filterMaybes :: (MonadPlus m) => m (Maybe t) -> m t
filterMaybes e = e >>= \x -> guard (isJust x) >> return (fromJust x)
delay t e = filterMaybes $ adjoinTime (return Nothing `mplus` fmap Just e) >>= \(x, t') -> eventFromList [(x,t+t')]
delays :: (EventStream e, MonadIO e) => e (t, POSIXTime) -> e t
delays e = e >>= \(x, t) -> eventFromList [(x, t)]
fmap' :: (Measurement t -> Measurement u) -> [Measurement t] -> Event u
fmap' f e = scan' (\_ x -> let y = f x in (y, y)) (fmap (const undefined) (head e)) e
scan' f x e = delays $ fmap (\x -> (copoint x, time x)) $ scan (\y z -> f y (unsafePerformIO $ assertMeasurement (return z)))
x
(adjoinTime (eFromML e))