{-# LANGUAGE Trustworthy, DeriveDataTypeable, DeriveFunctor, ScopedTypeVariables, RecursiveDo #-}

-- | Push-based (publisher/subscriber) event streams, plus a small class of
-- combinators ('EventStream') and rewrite rules that turn list operations
-- feeding 'eFromML' into operations on 'Event'.
module FRP.Reactivity.AlternateEvent
  ( Event(..)
  , eFromML
  , nonDispatchedIO
  , runEvent
    -- ** A minimal set of combinators
  , EventStream(..)
  , filterMaybes
  , delay
  , delays
    -- ** Optimizer rules
  , fmap'
  , scan'
  ) where

import qualified Data.Map as M
import Data.Unique
import Data.IORef
import Data.Typeable
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Applicative
import Control.Concurrent
import Control.Monad.Catch
import System.IO.Unsafe
import Data.Time.Clock.POSIX
import Data.Maybe
import FRP.Reactivity.Measurement
import FRP.Reactivity.RPC

-- | Event streams are here presented using the publisher-subscriber model (push-based handling
-- in contrast to the pull-based handling of 'MeasurementWrapper'). Such an event
-- is represented by a subscription function and a callback. The subscription function finishes fast allowing
-- the caller to continue.
--
-- The motivation for introducing this data type is that, while the 'Measurement'/'MeasurementWrapper'
-- system is fast, its intensive use of memory cells that need to be garbage collected means
-- that it may not be fast enough for some purposes.
--
-- The wrapped function takes the subscriber's callback (value and timestamp)
-- and returns, in 'RPC', an action that is used throughout this module as the
-- "unsubscribe" action for that subscription.
newtype Event t = Event { unEvent :: (t -> POSIXTime -> RPC ()) -> RPC (RPC ()) }
  deriving (Typeable, Functor)

-- avoid inlining until rules have fired
{-# INLINE[0] eFromML #-}
-- | Extracts an 'Event' from a list of measurements. No attempt has been made to sync up
-- the time order of different calls to 'eFromML'. For this reason, it is not true that
-- "eFromML ml `mplus` eFromML ml2" equals "eFromML (ml `merge` ml2)"; please do any necessary
-- syncing before calling 'eFromML'.
--
-- Subscribing forks (via 'rpcFork') a worker that walks the list, obtains each
-- measurement's value with 'getValue' and feeds it to the callback. The
-- returned unsubscribe action kills that worker thread.
eFromML :: [Measurement t] -> Event t
eFromML ls = Event (\f ->
  liftM (_nonDispatchedIO . killThread) $
    rpcFork (mapM_ (\meas -> _nonDispatchedIO (getValue meas) >>= uncurry f) ls))

-- | 'return' fires exactly once, immediately on subscription, with timestamp 0.
--
-- '>>=' subscribes to the outer event and, for every outer occurrence,
-- subscribes to the event produced by the continuation. Inner occurrences are
-- reported with a timestamp no earlier than the outer occurrence that spawned
-- them.
instance Monad Event where
  return x = Event (\f -> f x 0 >> return (return ()))
  Event f >>= g = Event (\h -> do
      -- Lots of machinery to track all subscriptions...
      -- 'ref' accumulates the unsubscribe actions of every inner subscription.
      ref <- _nonDispatchedIO (newIORef (return ()))
      -- Unsubscribing runs the outer unsubscribe first, then everything
      -- accumulated in 'ref' at that moment.
      liftM (\m -> m >> join (_nonDispatchedIO (readIORef ref))) (f (get2 ref h)))
    where
      -- Clamp the inner timestamp so it never precedes the outer one.
      get h t x' t' = h x' (max t t')
      -- Subscribe to the inner event and append its unsubscribe action to 'ref'.
      get2 ref h x t =
        unEvent (g x) (get h t) >>= \m ->
          _nonDispatchedIO (atomicModifyIORef ref (\m2 -> (m2 >> m, ())))
  fail _ = mzero

-- All the properties (associativity, unitality etc.) fall out of the monad axioms.
-- | 'mzero' never fires; 'mplus' subscribes the same callback to both events
-- and unsubscribes from both, in order.
instance MonadPlus Event where
  mzero = Event (\_ -> return (return ())) -- Return immediately
  mplus e e2 = Event (\f -> liftM2 (>>) (unEvent e f) (unEvent e2 f))

-- | The monoid structure is the 'MonadPlus' one.
instance Monoid (Event t) where
  mempty = mzero
  mappend = mplus

-- | 'liftIO' runs the action through the 'MonadIO' instance of 'RPC' on
-- subscription and fires once with its result. The timestamp is taken from a
-- fresh measurement made after the action has run.
instance MonadIO Event where
  liftIO m = Event (\f ->
    liftIO m >>= \x ->
    _nonDispatchedIO (measure (return ())) >>= \meas ->
    f x (time meas) >> return (return ()))

-- I/O can be useful...
-- | Like 'liftIO', but the action is measured directly and run through
-- '_nonDispatchedIO'; the single occurrence carries the value and timestamp
-- reported by 'getValue' for that measurement.
-- NOTE(review): the precise difference between RPC's 'liftIO' and
-- '_nonDispatchedIO' lives in "FRP.Reactivity.RPC" and is not visible here.
nonDispatchedIO m = Event (\f ->
  _nonDispatchedIO (measure m >>= getValue) >>= uncurry f >> return (return ()))

-- | Run an event -- see .Basic module for a way to run with a Windows event loop.
--
-- A fresh 'MVar' is created, the subscription is performed on a forked thread
-- with environment @(False, mv)@, and the calling thread then runs
-- 'rpcServer' on that 'MVar'. The unsubscribe action is discarded.
runEvent :: Event t -> (t -> POSIXTime -> IO ()) -> IO ()
runEvent e f = void $ do
  mv <- newEmptyMVar
  forkIO $ void $ runReaderT (unRPC $ unEvent e (\x t -> liftIO (f x t))) (False, mv)
  rpcServer mv

instance Applicative Event where
  pure = return
  (<*>) = ap

instance Alternative Event where
  empty = mzero
  (<|>) = mplus

-- | The minimal combinator set that an event-stream implementation provides.
class (MonadPlus e) => EventStream e where
  -- | Pretty self-explanatory.
  eventFromList :: [(t, POSIXTime)] -> e t
  -- | The "scan" primitive is analogous to "scanl" for lists.
  scan :: (t -> u -> (t, v)) -> t -> e u -> e v
  -- | A primitive like "switch" is the main way of implementing behaviors that can be switched
  -- in and out as required.
  switch :: e (e t) -> e t
  -- | The main use of "withRemainder" is to implement manual aging of inputs. It helps prevent space
  -- and time leaks.
  withRemainder :: e t -> e (t, e t)
  -- | Construct a channel in order to receive external events.
  channel :: IO (t -> IO (), e t)
  -- | Gets the current time along with every occurrence.
  adjoinTime :: e t -> e (t, POSIXTime)

-- | Spin until the reference holds a 'Right' action, then run that action.
-- A 'Left' means a switch is in progress (see 'eventSwitch'); the loop yields
-- and retries.
retryLoop ref =
  -- It's a comin'
  _nonDispatchedIO (readIORef ref) >>= either (\_ -> _nonDispatchedIO yield >> retryLoop ref) id

-- | Implementation of 'switch' for 'Event': only the most recently delivered
-- inner event stays subscribed.
--
-- The reference holds @Right unsub@ for the current inner subscription, or
-- @Left ()@ while a handler is in the middle of switching; it thus doubles as
-- a spin lock.
eventSwitch e = Event (\f -> do
    -- Just keep track of the most recent subscription...
    ref <- _nonDispatchedIO (newIORef (Right (return ())))
    -- Unsubscribe from the outer event, then from whatever inner one is current.
    liftM (\m -> m >> retryLoop ref) (unEvent e (handler f ref)))
  where
    handler f ref e' t = do
      -- Take the lock: swap in 'Left ()' and look at what was there before.
      ei <- _nonDispatchedIO (atomicModifyIORef ref (\m -> (Left (), m)))
      either
        -- Somebody else is switching; yield and try again.
        (\_ -> _nonDispatchedIO yield >> handler f ref e' t)
        (\m -> do
          -- Unsubscribe from it...
          m
          -- ... and switch in the new one
          -- The finalizer stores a no-op unsubscribe before the real one is
          -- written below. NOTE(review): presumably so the lock is released
          -- even if subscribing throws; confirm against 'rpcFinally'.
          m' <- rpcFinally (unEvent e' f) (_nonDispatchedIO (writeIORef ref (Right (return ()))))
          _nonDispatchedIO (writeIORef ref (Right m')))
        ei

instance EventStream Event where
  eventFromList = eFromML . fromList

  -- The accumulator lives in an 'IORef' created per subscription; each
  -- occurrence updates it atomically and forwards the emitted value with the
  -- occurrence's own timestamp.
  scan f x e = Event (\g -> do
      ref <- _nonDispatchedIO (newIORef x)
      unEvent e (\y t ->
        _nonDispatchedIO (atomicModifyIORef ref (\x -> let (x', z) = f x y in (x', z))) >>= \z ->
          g z t))

  switch = eventSwitch

  -- Every occurrence is paired with the event 'e' itself.
  withRemainder e = Event (\f -> unEvent e (\x t -> f (x, e) t))

  channel = do
      ref <- newIORef M.empty
      return (add ref, e ref)
    where
      -- Machinery to keep track of subscriptions. This will maintain a set of callbacks
      -- that want to receive messages. When a subscriber unsubscribes, it will be
      -- removed from the set.

      -- Deliver 'x' to every currently registered callback, stamped with the
      -- time of a fresh measurement.
      add ref x =
        measure (return ()) >>= \meas ->
        readIORef ref >>= \mp ->
        mapM_ (\f -> f x (time meas)) mp
      -- Remove the callback registered under key 'un'.
      unsub ref un = _nonDispatchedIO $ atomicModifyIORef ref (\mp -> (M.delete un mp, ()))
      -- Register the subscriber under a fresh 'Unique' key. The stored
      -- callback runs the subscriber's 'RPC' action with environment
      -- @(False, mv)@, where 'mv' is taken from the subscribing context.
      e ref = Event (\f ->
        _nonDispatchedIO newUnique >>= \un ->
        RPC ask >>= \(_, mv) ->
        _nonDispatchedIO (atomicModifyIORef ref (\mp ->
          (M.insert un (\x t -> runReaderT (unRPC (f x t)) (False, mv)) mp, ())))
          >> return (unsub ref un))

  adjoinTime e = Event (\f -> unEvent e (\x t -> f (x, t) t))

-- | Keep the 'Just' occurrences and drop the 'Nothing' ones.
--
-- Written with 'maybe' so that no partial function ('fromJust') is involved.
filterMaybes :: (MonadPlus m) => m (Maybe t) -> m t
filterMaybes e = e >>= maybe mzero return

-- | Re-emit every occurrence of @e@ through 'eventFromList' with its
-- timestamp shifted by @t@.
--
-- A leading 'Nothing' occurrence is merged in before timestamps are attached
-- and is filtered out again at the end.
delay :: (EventStream e) => POSIXTime -> e t -> e t
delay t e = filterMaybes $
  adjoinTime (return Nothing `mplus` fmap Just e) >>= \(x, t') -> eventFromList [(x,t+t')]

-- | Re-emit every occurrence through 'eventFromList' at the timestamp it
-- carries.
delays :: (EventStream e, MonadIO e) => e (t, POSIXTime) -> e t
delays e = e >>= \(x, t) -> eventFromList [(x, t)]

----------------------------------------
-- Optimization

{-# INLINE fmap' #-}
-- | Target of the @eFromML/map@ rule: map over measurements on the 'Event'
-- side instead of on the list.
--
-- The seed handed to 'scan'' is a dummy: the step function ignores its first
-- argument, so the @undefined@ payload (and the partial 'head') is not
-- inspected by the step function itself.
fmap' :: (Measurement t -> Measurement u) -> [Measurement t] -> Event u
fmap' f e = scan' (\_ x -> let y = f x in (y, y)) (fmap (const undefined) (head e)) e

{-# INLINE scan' #-}
-- | Target of the @eFromML/scanl@ rule: run a scan whose step works on
-- 'Measurement's over the event built from the list, then re-emit each
-- resulting measurement's value at that measurement's own time.
--
-- NOTE(review): each (value, time) pair is turned back into a 'Measurement'
-- with 'unsafePerformIO' around 'assertMeasurement'; this presumes
-- 'assertMeasurement' is safe to run at an arbitrary time -- confirm in
-- "FRP.Reactivity.Measurement".
scan' f x e =
  delays $ fmap (\x -> (copoint x, time x)) $
    scan (\y z -> f y (unsafePerformIO $ assertMeasurement (return z))) x (adjoinTime (eFromML e))

-- NOTE(review): 'scanl' yields its seed as the first list element, whereas
-- 'scan'' emits one output per input occurrence only; confirm that the
-- "eFromML/scanl" rule preserves the intended meaning.
{-# RULES
"eFromML/map" forall f e. eFromML (map f e) = fmap' f e
"eFromML/scanl" forall f x e. eFromML (scanl f x e) = scan' (\x y -> let z = f x y in (z, z)) x e
"eFromML/mzero" eFromML [] = mzero
 #-}