{-# LANGUAGE Trustworthy, DeriveDataTypeable, DeriveFunctor, ScopedTypeVariables, RecursiveDo #-} module FRP.Reactivity.AlternateEvent (Event(..), eFromML, nonDispatchedIO, runEvent, -- ** A minimal set of combinators EventStream(..), filterMaybes, delay, delays, -- ** Optimizer rules fmap', scan') where import qualified Data.Map as M import Data.Unique import Data.IORef import Data.Typeable import Data.Time.Clock.POSIX import Data.Maybe import Data.Monoid import Data.Traversable import Control.Monad hiding (mapM) import Control.Monad.Reader hiding (mapM) import Control.Applicative import Control.Concurrent import Control.Monad.Catch import System.IO.Unsafe import FRP.Reactivity.Measurement import FRP.Reactivity.RPC import Prelude hiding (mapM) -- | Event streams are here presented using the publisher-subscriber model (push-based handling -- in contrast to the pull-based handling of 'MeasurementWrapper'). Such an event -- is represented by a subscription function and a callback. The subscription function finishes fast allowing -- the caller to continue. -- -- The motivation for introducing this data type is that, while the 'Measurement'/'MeasurementWrapper' -- system is fast, its intensive use of memory cells that need to be garbage collected means -- that it may not be fast enough for some purposes. newtype Event t = Event { unEvent :: (t -> POSIXTime -> RPC ()) -> RPC (RPC ()) } deriving (Typeable, Functor) -- avoid inlining until rules have fired {-# INLINE[0] eFromML #-} -- | Extracts an 'Event' from a list of measurements. No attempt has been made to sync up -- the time order of different calls to 'eFromML'. For this reason, it is not true that -- "eFromML ml `mplus` eFromML ml2" equals "eFromML (ml `merge` ml2)"; please do any necessary -- syncing before calling 'eFromML'. eFromML :: [Measurement t] -> Event t eFromML ls = Event (\f -> liftM (_nonDispatchedIO . killThread) $ rpcFork (mapM_ (\meas -> _nonDispatchedIO (getValue meas) >>= uncurry f) ls)) instance Monad Event where return x = Event (\f -> f x 0 >> return (return ())) Event f >>= g = Event (\h -> do -- Lots of machinery to track all subscriptions... ref <- _nonDispatchedIO (newIORef (return ())) liftM (\m -> m >> join (_nonDispatchedIO (readIORef ref))) (f (get2 ref h))) where get h t x' t' = h x' (max t t') get2 ref h x t = unEvent (g x) (get h t) >>= \m -> _nonDispatchedIO (atomicModifyIORef ref (\m2 -> (m2 >> m, ()))) fail _ = mzero -- All the properties (associativity, unitality etc.) fall out of the monad axioms. instance MonadPlus Event where mzero = Event (\_ -> return (return ())) -- Return immediately mplus e e2 = Event (\f -> liftM2 (>>) (unEvent e f) (unEvent e2 f)) instance Monoid (Event t) where mempty = mzero mappend = mplus instance MonadIO Event where liftIO m = Event (\f -> liftIO m >>= \x -> _nonDispatchedIO (measure (return ())) >>= \meas -> f x (time meas) >> return (return ())) -- I/O can be useful... nonDispatchedIO m = Event (\f -> _nonDispatchedIO (measure m >>= getValue) >>= uncurry f >> return (return ())) -- | Run an event -- see .Basic module for a way to run with a Windows event loop. runEvent :: Event t -> (t -> POSIXTime -> IO ()) -> IO () runEvent e f = void $ do mv <- newEmptyMVar forkIO $ void $ runReaderT (unRPC $ unEvent e (\x t -> liftIO (f x t))) (False, mv) rpcServer mv instance Applicative Event where pure = return (<*>) = ap instance Alternative Event where empty = mzero (<|>) = mplus class (Functor e, MonadPlus e) => EventStream e where -- | Prooty self-explanatory. eventFromList :: [(t, POSIXTime)] -> e t -- | The "scan" primitive is analogous to "scanl" for lists. scan :: (t -> u -> (t, v)) -> t -> e u -> e v -- | A primitive like "switch" is the main way of implementing behaviors that can be switched -- in and out as required. switch :: e (e t) -> e t -- | The main use of "withRemainder" is to implement manual aging of inputs. It helps prevent space -- and time leaks. withRemainder :: e t -> e (t, e t) -- | Construct a channel in order to receive external events. channel :: IO (t -> IO (), e t) -- | Gets the current time along with every occurrence. adjoinTime :: e t -> e (t, POSIXTime) retryLoop ref = -- It's a comin' _nonDispatchedIO (readIORef ref) >>= either (\_ -> _nonDispatchedIO yield >> retryLoop ref) id eventSwitch e = Event (\f -> do -- Just keep track of the most recent subscription... ref <- _nonDispatchedIO (newIORef (Right (return ()))) liftM (\m -> m >> retryLoop ref) (unEvent e (handler f ref))) where handler f ref e' t = do ei <- _nonDispatchedIO (atomicModifyIORef ref (\m -> (Left (), m))) either (\_ -> _nonDispatchedIO yield >> handler f ref e' t) (\m -> do -- Unsubscribe from it... m -- ... and switch in the new one m' <- rpcFinally (unEvent e' f) (_nonDispatchedIO (writeIORef ref (Right (return ())))) _nonDispatchedIO (writeIORef ref (Right m'))) ei instance EventStream Event where eventFromList = eFromML . fromList scan f x e = Event (\g -> do ref <- _nonDispatchedIO (newIORef x) unEvent e (\y t -> _nonDispatchedIO (atomicModifyIORef ref (\x -> let (x', z) = f x y in (x', z))) >>= \z -> g z t)) switch = eventSwitch withRemainder e = Event (\f -> unEvent e (\x t -> f (x, e) t)) channel = do ref <- newIORef M.empty return (add ref, e ref) where -- Machinery to keep track of subscriptions. This will maintain a set of callbacks -- that want to receive messages. When a subscriber unsubscribes, it will be -- removed from the set. add ref x = measure (return ()) >>= \meas -> readIORef ref >>= \mp -> void $ mapM (\f -> f x (time meas)) mp unsub ref un = _nonDispatchedIO $ atomicModifyIORef ref (\mp -> (M.delete un mp, ())) e ref = Event (\f -> _nonDispatchedIO newUnique >>= \un -> RPC ask >>= \(_, mv) -> _nonDispatchedIO (atomicModifyIORef ref (\mp -> (M.insert un (\x t -> runReaderT (unRPC (f x t)) (False, mv)) mp, ()))) >> return (unsub ref un)) adjoinTime e = Event (\f -> unEvent e (\x t -> f (x, t) t)) filterMaybes :: (MonadPlus m) => m (Maybe t) -> m t filterMaybes e = e >>= \x -> guard (isJust x) >> return (fromJust x) delay t e = filterMaybes $ adjoinTime (return Nothing `mplus` fmap Just e) >>= \(x, t') -> eventFromList [(x,t+t')] delays :: (EventStream e, MonadIO e) => e (t, POSIXTime) -> e t delays e = e >>= \(x, t) -> eventFromList [(x, t)] ---------------------------------------- -- Optimization {-# INLINE fmap' #-} fmap' :: (Measurement t -> Measurement u) -> [Measurement t] -> Event u fmap' f e = scan' (\_ x -> let y = f x in (y, y)) (fmap (const undefined) (head e)) e {-# INLINE scan' #-} scan' f x e = delays $ fmap (\x -> (copoint x, time x)) $ scan (\y z -> f y (unsafePerformIO $ assertMeasurement (return z))) x (adjoinTime (eFromML e)) {-# RULES "eFromML/map" forall f e. eFromML (map f e) = fmap' f e "eFromML/scanl" forall f x e. eFromML (scanl f x e) = scan' (\x y -> let z = f x y in (z, z)) x e "eFromML/mzero" eFromML [] = mzero #-}