{-# LANGUAGE Trustworthy, DeriveFunctor, DeriveDataTypeable #-}

-- | A different presentation of functional reactive programming, based on the Reactive
-- library on Hackage. The functionals in Combinators are based on those from Reactive.
module FRP.Reactivity (module Data.Time.Clock.POSIX, Time, Event, firstRestE,
    -- * Primitive event combinators (see also Monad and MonadPlus instances)
    cons, corec, withTime, withRest, once, over, displace, list,
    Stream(Stream), addToEvent, getEvent, chanSource,
    -- * Executing events
    FrameOfReference, startT, setupFrame, makeFrame, runFrame, diagnostic) where

import GHC.Prim (Any)
import Control.Concurrent
import qualified Control.CUtils.FChan as C
import qualified Control.CUtils.Conc as CC
import Control.Monad
import Control.Applicative
import Control.Comonad
import Data.Maybe
import Data.List hiding (union)
import Data.Monoid hiding (Any)
import Data.Function
import Data.Typeable
import qualified Data.Map as M
import Data.List.Extras.Argmax
import Data.Time.Clock.POSIX
import Data.IORef
import System.IO.Unsafe
import Unsafe.Coerce
import System.IO
import System.Mem.Weak

{- Desirable properties of functional reactive systems, and how this system addresses
 - them:
 -
 - * Temporal monotonicity: This is accomplished using a global lock. By forcing
 - channel submissions to be ordered, one can ensure a consistent ordering on
 - external inputs. Monotonicity is further preserved by the primitive combinators.
 - * Glitch-freedom: Once external inputs are acquired, they are preserved in
 - channels.
 - * Recursion-friendly: Consider the term 'let e = return 1 <> delayE 4 e in e'.
 - Without lower bounds this is bottom, but an a priori lower bound makes this
 - a productive recursion.
 - * Leak-freedom: running a program should take constant space in situations where it
 - could in principle take constant space.-}

-- | Times are plain 'Double's. A time of @1/0@ (positive infinity) is used
-- throughout this module to mean "no further pure occurrence".
type Time = Double

-- A source of external occurrences: the channel the occurrences are read from,
-- paired with the continuation that turns a value and its time into the event
-- that follows.
data Handler t u = Handler !(Channel (t, Time)) !(t -> Time -> Event u) deriving Functor

-- | A type of event streams.
data Event t = Event (Maybe t, Time, Event t)
    -- Wait for the first external event in the dictionary. The Time parameter provides a time
    -- limit, after which we will continue with the event parameter.
    !(M.Map Integer (Handler Any t)) deriving (Typeable, Functor)

-- The occurrences reachable without consulting any channel: walk the pure
-- spine, skipping bare bounds ('Nothing'), until an infinite time is reached.
pureOccurrences :: Event t -> [(t, Time)]
pureOccurrences (Event (occ, t, e) _)
    | t == 1/0 = []
    | otherwise = case occ of
        Nothing -> pureOccurrences e
        Just x -> (x, t) : pureOccurrences e

instance (Show t) => Show (Event t) where
    showsPrec _ e = showString "One possible sequence is:" . showsPrec 11 (pureOccurrences e)

-- The global lock. In this part of the file it serialises diagnostic writes
-- to stderr.
{-# NOINLINE lock #-}
lock :: MVar ()
lock = unsafePerformIO (newMVar ())

-- | Find out if any of the channels contain occurrences.
--
-- The head cell of every registered channel is read. If some channel holds an
-- entry and the pure bound of this event is not @<=@ the time of the entry
-- selected by @argmin snd@, the search continues in the event produced by that
-- entry's handler. Otherwise the pure part is used: an infinite bound gives
-- 'Nothing', a pure occurrence is returned together with its time and the rest
-- of the event, and a bare bound is skipped.
firstRestE :: Event t -> IO (Maybe (t, Time, Event t))
firstRestE (Event (occ, bound, rest) sources) = do
    when (M.size sources >= 1000) $
        logLocked "Reactivity: Number of event sources getting large (>=1000)"
    pending <- liftM catMaybes (mapM peekSource (M.elems sources))
    -- Lazy binding: only demanded below when 'pending' is non-empty.
    let (next, tNext) = argmin snd pending
    if null pending || bound <= tNext
        then pureStep
        else firstRestE next
  where
    -- Look at the head cell of one channel without consuming it.
    peekSource (Handler (Channel ref) k) = do
        cell <- readIORef ref
        return (fmap (\((v, tv), _) -> (k v tv, tv)) cell)

    -- No external occurrence takes precedence: consult the pure head.
    pureStep
        | bound == 1/0 = do
            when (M.null sources) $
                logLocked "Reactivity: Event stream is empty!"
            return Nothing
        | otherwise = case occ of
            Just v -> return (Just (v, bound, rest))
            Nothing -> firstRestE rest

    -- Write a line to stderr while holding the global lock.
    logLocked msg = modifyMVar_ lock $ \_ -> hPutStrLn stderr msg

-- | Prepend an occurrence of @x@ at time @t@ to @e@; @e@ is wrapped in
-- 'displace' at @t@.
{-# INLINE cons #-}
cons :: t -> Time -> Event t -> Event t
cons x t e = Event (Just x, t, displace t e) M.empty

-- | Carry some kind of value which gets updated from occurrence to occurrence, and collect
-- the results in an event.
corec :: (t -> u -> Time -> (t, v, Time)) -> t -> Event u -> Event v
corec f x e = over e (\y t rest -> case f x y t of
    (y, z, t2) -> cons z t2 (corec f y rest))

{-# INLINE withTime #-}
-- | Get the time of event occurrences along with their values.
withTime e = corec (\_ x t -> ((), (x, t), t)) () e

-- | This functional lets you get an idea of the future starting from a certain point
-- in time. It is similar to the 'tails' function for lists.
withRest :: Event t -> Event (t, Event t)
withRest e = over e (\x t rest -> cons (x, rest) t (withRest rest))

-- 'duplicate' pairs every occurrence with the event that starts at it.
-- 'extract' blocks (through 'unsafePerformIO') until 'firstRestE' reports an
-- occurrence, and returns that occurrence's value.
instance Comonad Event where
    duplicate e = over e (\x t rest -> cons (cons x t rest) t (duplicate rest))
    extract e = unsafePerformIO $ do
        -- Sticky counts
        atomicModifyIORef waiting (\x -> (if x == maxBound then x else succ x, ()))
        res <- loop e
        atomicModifyIORef waiting (\x -> (if x == maxBound then x else pred x, ()))
        return res
      where
        loop ev = do
            my <- firstRestE ev
            case my of
                Just (x, _, _) -> return x
                Nothing ->
                    -- There is a semaphore that gets pulsed any time an occurrence
                    -- enters the system.
                    waitQSem sem >> loop ev

-- | Keep only the first occurrence of an event.
{-# INLINE once #-}
once e = over e (\x t _ -> cons x t mzero)

-- | A case analysis on events.
--
-- @over e f@ applies @f@ to the value, time and remainder of the first
-- occurrence of @e@. Bare bounds in @e@ are passed through unchanged, and every
-- external handler of @e@ is rewritten so that the analysis is applied to
-- whatever event that handler eventually produces.
over :: Event t -> (t -> Time -> Event t -> Event u) -> Event u
over (Event (x, t, rest) mp) f = case x of
    Nothing -> Event (Nothing, t, over rest f) mappedMp
    Just v -> case f v t rest of
        -- Get hold of the first limiting occurrence
        Event (y, t2, rest2) mp2 ->
            let mappedMp2 = fmap (\(Handler ref g) -> Handler ref (\v' tv -> displace t (g v' tv))) mp2
            in
                -- Get the external alternatives and limit them at 't',
                -- Switch in the external alternatives from 'f'; they have to be displaced
                -- to beyond 't' for monotonicity.
                Event (Nothing, t, Event (y, t `max` t2, rest2) mappedMp2) mappedMp
  where
    mappedMp = fmap (\(Handler ref g) -> Handler ref (\v' tv -> over (g v' tv) f)) mp

-- Worker for 'displace': raise every time below 't' up to 't', both along the
-- pure spine and in the events produced by external handlers. It stops
-- descending as soon as a time at or beyond 't' is seen.
displace' :: Time -> Event t -> Event t
displace' t e@(Event (x, t2, rest) mp) =
    Event
        (if t <= t2 then (x, t2, rest) else (x, t `max` t2, displace' t rest))
        (fmap (\(Handler ref f) -> Handler ref (\x t2 -> if t <= t2 then f x t2 else displace' t (f x t2))) mp)

-- | Displace occurrences to at least 't'.
displace t e = Event (Nothing, t, displace' t e) M.empty
-- Starting with a lower bound at 't' helps recursion be productive.

-- | Turns a plain list of occurrences (and times) into an Event.
list ((x, t):xs) = cons x t (list xs)
list [] = mzero

-- 'mzero' is the event whose bound is infinite and which has no external
-- sources. 'mplus' merges two events: the pure head with the smaller time comes
-- first (the left one on ties), and the handler dictionaries are united so that
-- a handler from either side continues merged with the other side.
instance MonadPlus Event where
    mzero = Event (Nothing, 1/0, mzero) M.empty
    mplus (Event (_, t, _) mp) e2 | t == 1/0 && M.null mp = e2
    mplus e (Event (_, t, _) mp) | t == 1/0 && M.null mp = e
    mplus e@(Event (x, t, rest) mp) e2@(Event (x2, t2, rest2) mp2) =
        Event
            (if t <= t2 then (x, t, mplus rest e2) else (x2, t2, mplus e rest2))
            (fmap (\ei -> case ei of
                    -- Key present on one side only: merge its result with the other event.
                    Left ei2 -> case ei2 of
                        Left (Handler ref f) -> Handler ref (\x t -> mplus (f x t) e2)
                        Right (Handler ref f) -> Handler ref (\x t -> mplus e (f x t))
                    -- Key present on both sides: already combined by 'unionWith' below.
                    Right h -> h)
                $ M.unionWith
                    -- Only ever called with a left-tagged and a right-tagged
                    -- handler, by construction of the two maps below.
                    (\(Left (Left (Handler ref f))) (Left (Right (Handler _ g))) ->
                        Right (Handler ref (\x t -> f x t `mplus` g x t)))
                    (fmap (Left . Left) mp)
                    (fmap (Left . Right) mp2))

-- | An external event source: an action that submits a new occurrence, paired
-- with the event made of the submitted occurrences.
data Stream t = Stream !(t -> IO ()) !(Event t) deriving Typeable

-- | The submission action of a 'Stream'.
addToEvent ~(Stream f _) = f

-- | The event of a 'Stream'.
getEvent ~(Stream _ e) = e

-- Supply of unique keys for the handler dictionaries.
{-# NOINLINE counter #-}
counter :: IORef Integer
counter = unsafePerformIO (newIORef 0)

-- Handlers are stored in the dictionary with their input type erased to 'Any'.
-- This cast is only sound when the handler is used at the input type it was
-- created with; 'chanSource' relies on the dictionary key being unique to one
-- source for that.
unsafeCast :: Handler t u -> Handler v u
unsafeCast = unsafeCoerce

-- | Create a new external event source attached to the given frame.
--
-- The submission action of the resulting 'Stream' timestamps the value
-- relative to the frame's start time, appends it to the source's channel and,
-- if the frame's current remainder has a handler registered for this source,
-- replaces the remainder with the handler's result after running every
-- occurrence that is due at or before that timestamp. It then wakes waiting
-- threads.
chanSource :: FrameOfReference -> IO (Stream t)
chanSource frame = do
    n <- atomicModifyIORef counter (\x -> (succ x, x))
    chn <- liftM Channel $ newIORef Nothing
    end <- newIORef chn
    let chanLoop chn@(Channel ref) = do
            -- The continuation is only read once the handler has been given an
            -- entry of this channel cell, hence the lazy I/O.
            e <- unsafeInterleaveIO (do
                Just (_, chn') <- readIORef ref
                chanLoop chn')
            return (Event (Nothing, 1/0, mzero) (M.singleton n (unsafeCast (Handler chn (\x t -> cons x t e)))))
    event <- chanLoop chn
    let strm = Stream (\x -> modifyMVar_ (remainder frame) (\e@(Event tup mp) -> do
            -- Lock is for monotonicity
            t1 <- getPOSIXTime
            let t = fromRational (toRational (t1 - startT frame))
            -- Write into the channel
            chanWrite end (x, t)
            -- ...and deliver directly to the waiting handler
            let my = M.lookup n mp
            e' <- maybe (return e) (\hnd -> do
                let Handler _ f = unsafeCast hnd
                immediates t (f x t)) my
            return e') >> wakeup) event
    -- When the write end of the channel becomes unreachable, drop this source's
    -- handler from the frame. The finalizer is attached with 'mkWeakIORef'
    -- rather than 'mkWeak': a finalizer keyed on the 'IORef' box itself may run
    -- early if the compiler optimises the box away, which would unregister the
    -- handler while the stream is still in use.
    _ <- mkWeakIORef end (void $ forkIO $ modifyMVar_ (remainder frame) (\(Event tup mp) -> return (Event tup (M.delete n mp))))
    return strm
  where
    -- Run every occurrence of 'e' that is due at or before 't', returning what
    -- is left of the event.
    immediates t e = firstRestE e >>= maybe (return e) (\(x, t1, rest) ->
        if t1 <= t
            then do
                x
                immediates t rest
            else return e)

instance Alternative Event where
    empty = mzero
    (<|>) = mplus

instance Monoid (Event t) where
    mempty = mzero
    mappend = mplus

-- The monad for Event is much like the list monad:
-- * 'join' - instead of concatenating, it interleaves result events according to their times.
-- * 'unit' - yields a "boring" one-point event at t = 0.
instance Monad Event where
    return x = cons x 0 mzero
    e >>= f = over e bindRest
      where
        -- Merge the event produced for this occurrence with the bind of the rest.
        bindRest x _ rest = f x <> (rest >>= f)
    fail _ = mzero

instance Applicative Event where
    pure = return
    (<*>) = ap

-- A linked list of mutable cells. An empty cell ('Nothing') marks the end;
-- a filled cell holds a value and the next channel.
data Channel t = Channel !(IORef (Maybe (t, Channel t)))

-- Append a value: allocate a fresh empty cell, make it the new end, and fill
-- the old end cell with the value and a link to the fresh cell.
{-# INLINE chanWrite #-}
chanWrite :: IORef (Channel t) -> t -> IO ()
chanWrite chnEnd x = do
    fresh <- newIORef Nothing
    Channel lastCell <- readIORef chnEnd
    let next = Channel fresh
    writeIORef chnEnd next
    writeIORef lastCell (Just (x, next))

-------------------------------------------
-- Executing events

data FrameOfReference = FrameOfReference
    !(MVar (Event (IO ()))) -- The remainder as of the present time
    !POSIXTime -- A start time
    deriving Typeable

-- The variable holding what is left of the frame's event.
remainder :: FrameOfReference -> MVar (Event (IO ()))
remainder ~(FrameOfReference mv _) = mv

-- | The start time of a frame of reference.
startT :: FrameOfReference -> POSIXTime
startT ~(FrameOfReference _ start) = start

-- Count of threads currently blocked on 'sem'.
{-# NOINLINE waiting #-}
waiting :: IORef Int
waiting = unsafePerformIO (newIORef 0)

-- This is a condition variable that wakes up all waiting threads.
{-# NOINLINE sem #-}
sem :: QSem
sem = unsafePerformIO (newQSem 0)

-- Report the current number of waiters on stderr (under the global lock) and
-- signal the semaphore once per waiter.
{-# INLINE wakeup #-}
wakeup :: IO ()
wakeup = do
    waiters <- readIORef waiting
    modifyMVar_ lock $ \_ -> hPutStrLn stderr (show waiters ++ " bumps")
    replicateM_ waiters (signalQSem sem)

-- | Create a frame of reference holding the given event; its start time is the
-- POSIX time at which the frame is created.
{-# INLINE makeFrame #-}
makeFrame :: Event (IO ()) -> IO FrameOfReference
makeFrame e = liftM2 FrameOfReference (newMVar e) getPOSIXTime

-- | Replace the event held by a frame: any current contents are discarded and
-- the given event is stored instead.
{-# INLINE setupFrame #-}
setupFrame :: FrameOfReference -> Event (IO ()) -> IO ()
setupFrame frame e = tryTakeMVar slot >> putMVar slot e
  where
    slot = remainder frame

-- | Run a frame of reference in the current thread -- but N.B. that some executions of the
-- frame's 'sink' may occur in other threads.
--
-- See FRP.Reactivity.Basic for a more elaborate scheme that gets the results
-- from I/O as event occurrences.
runFrame :: FrameOfReference -> IO a
runFrame frame = do
    e <- takeMVar (remainder frame)
    my <- firstRestE e
    case my of
        -- Nothing is scheduled: park until an occurrence enters the system.
        Nothing -> awaitOccurrence e
        Just (x, t, rest) -> do
            t1 <- getPOSIXTime
            -- Time still to go before the occurrence is due, in seconds.
            let time = t - fromRational (toRational (t1 - startT frame))
            if time <= 0
                then do
                    -- Due now: run the action and continue with the rest.
                    x
                    putMVar (remainder frame) rest
                else if time == 1/0
                    then awaitOccurrence e
                    else do
                        -- Due later: put the event back untouched and sleep.
                        putMVar (remainder frame) e
                        modifyMVar_ lock (\_ -> hPutStrLn stderr ("Wait for " ++ show time))
                        threadDelay (delayMicros time)
    runFrame frame
  where
    -- Register as a waiter, release the event, and block until woken.
    awaitOccurrence e = do
        -- Sticky counts
        atomicModifyIORef waiting (\n -> (if n == maxBound then n else succ n, ()))
        putMVar (remainder frame) e
        waitQSem sem
        atomicModifyIORef waiting (\n -> (if n == maxBound then n else pred n, ()))

    -- Seconds to microseconds for 'threadDelay'. Rounding goes through
    -- 'Integer' and is clamped to the range of 'Int': rounding a large 'Double'
    -- directly to 'Int' wraps around and can yield a bogus or negative delay.
    -- Waking early is harmless because the loop recomputes the remaining time.
    delayMicros :: Time -> Int
    delayMicros secs =
        fromInteger (max 0 (min (toInteger (maxBound :: Int)) (round (1000000 * secs))))

-- | Print a one-line description of the head of an event to stdout: whether the
-- head is an occurrence or a bare bound, its time, and for every external
-- source its key together with whether its channel currently holds an entry.
diagnostic :: Event t -> IO ()
diagnostic (Event (my, t, _) mp) = do
    putStr (if isJust my then "occurrence" else "bound")
    putStr (" at " ++ show t ++ ", map:")
    ls <- mapM (\(n, Handler (Channel ref) _) -> liftM ((,) n . isJust) (readIORef ref)) (M.assocs mp)
    print ls