{-# LANGUAGE Trustworthy, DeriveFunctor, DeriveDataTypeable, ImplicitParams #-} -- | A different presentation of functional reactive programming, based on the Reactive -- library on Hackage. The functionals in Combinators are based on those from Reactive. module FRP.Reactivity (module Data.Time.Clock.POSIX, Time, Event, firstRestE, firstRestE', -- * Primitive event combinators (see also Monad and MonadPlus instances) cons, corec, withTime, withRest, once, over, displace, list, firstE, Stream(Stream), addToEvent, addToEventWithTime, getEvent, chanSource, -- * Executing events FrameOfReference, startT, setupFrame, makeFrame, runFrame, diagnostic, ChanEnd, chanEnd, Channel(Channel), chanWrite) where import GHC.Prim (Any) import Control.Concurrent import qualified Control.CUtils.Conc as CC import Control.Parallel import Control.Monad import Control.Applicative import Control.Comonad import Data.Maybe import Data.List hiding (union) import Data.Monoid hiding (Any) import Data.Function import Data.Typeable import Data.Traversable (mapM) import qualified Data.Map as M import Data.List.Extras.Argmax import Data.Time.Clock.POSIX import Data.IORef import Data.Array import System.IO.Unsafe import Unsafe.Coerce import System.IO import System.Mem.Weak import Prelude hiding (mapM) {- Desirable properties of functional reactive systems, and how this system addresses - them: - - * Temporal monotonicity: This is accomplished using a global lock. By forcing - channel submissions to be ordered, one can ensure a consistent ordering on - external inputs. Monotonicity is further preserved by the primitive combinators. - * Glitch-freedom: Once external inputs are acquired, they are preserved for - later examination.-} type Time = Double data Handler t u = Handler !(Channel (Maybe (t, Time))) !(t -> Time -> Event u) deriving Functor -- | A type of event streams. data Event t = Event (Maybe t, Time, Event t) -- Wait for the first external event in the dictionary. The Time parameter provides a time -- limit, after which we will continue with the event parameter. (M.Map Integer (Handler Any t)) deriving (Typeable, Functor) pureOccurrences (Event (_, t, _) _) | t == 1/0 = [] pureOccurrences (Event (Nothing, _, e) _) = pureOccurrences e pureOccurrences (Event (Just x, t, e) _) = (x, t) : pureOccurrences e instance (Show t) => Show (Event t) where showsPrec _ e = ("One possible sequence is:"++) . showsPrec 11 (pureOccurrences e) {-# NOINLINE lock #-} lock = unsafePerformIO (newMVar ()) -- | Find out if any of the channels contain occurrences. We have to be careful about how -- we dispatch the so-called pure events; if the current time is t0, and t <= t0, we -- cannot automatically conclude that t has occurred first, because we may be awaiting -- an occurrence from before 't' to get into the variable. However, if something has -- already occurred at a time 't2', we may conclude that t2 <= t0. If t <= t2, we can -- conclude that nothing t3 is waiting in the wings prior to t, because it must be t3 >= t2 -- due to interlocks and therefore t3 >= t. firstRestE (Event (x, t, rest) mp) = do when (M.size mp >= 1000) (modifyMVar_ lock $ \_ -> hPutStrLn stderr "Reactivity: Number of event sources getting large (>=1000)") let ?seq = True available <- liftM (catMaybes . elems) $ CC.conc $ listArray (0, M.size mp - 1) $ fmap (\(Handler (Channel ref) f) -> do my <- readIORef ref return (my >>= \(my', _) -> fmap (\(x, t) -> (f x t, t)) my')) (M.elems mp) let (e, t2) = argmin snd available if null available then return Nothing else if t <= t2 then if t == 1/0 then do when (M.null mp) (modifyMVar_ lock $ \_ -> hPutStrLn stderr "Reactivity: Event stream is empty!") return Nothing else if isJust x then return (Just (cons (fromJust x) t rest)) else return (Just rest) else return (Just e) -- | This function is beefier because it additionally accepts (a lower bound of) the current time, -- which upper bounds all prior occurrences. firstRestE' t e@(Event (x, t2, rest) _) = if t2 <= t then if isJust x then return (Just (fromJust x, t2, rest)) else firstRestE' t rest else firstRestE e >>= maybe (return Nothing) (firstRestE' t) {-# INLINE cons #-} cons x t e = Event (Just x, t, displace t e) M.empty -- | Carry some kind of value which gets updated from occurrence to occurrence, and collect -- the results in an event. {-# INLINE[0] corec #-} corec :: (t -> u -> Time -> (t, v, Time)) -> t -> Event u -> Event v corec f x e = over e (\y t rest -> case f x y t of (y, z, t2) -> cons z t2 (corec f y rest)) {-# INLINE withTime #-} -- | Get the time of event occurrences along with their values. withTime e = corec (\_ x t -> ((), (x, t), t)) () e -- | This functional lets you get a snapshot of the remaining elements of the event starting from a certain point -- in time. It is similar to the 'tails' function for lists. withRest :: Event t -> Event (t, Event t) withRest e = over e (\x t rest -> cons (x, rest) t (withRest rest)) instance Comonad Event where duplicate e = over e (\x t rest -> cons (cons x t rest) t (duplicate rest)) extract e = unsafePerformIO $ do -- Sticky counts atomicModifyIORef waiting (\x -> (if x == maxBound then x else succ x, ())) res <- loop e atomicModifyIORef waiting (\x -> (if x == maxBound then x else pred x, ())) return res where loop e = do my <- firstRestE' 0 e case my of Just (x, _, _) -> return x Nothing -> -- There is a semaphore that gets pulsed any time an occurrence -- enters the system. waitQSem sem >> loop e {-# INLINE once #-} once e = over e (\x t _ -> cons x t mzero) -- | A case analysis on events. {-# INLINE over #-} over :: Event t -> (t -> Time -> Event t -> Event u) -> Event u over e@(Event (x, t, rest) mp) f = if isNothing x then Event (Nothing, t, over rest f) mappedMp else case f (fromJust x) t rest of -- Get hold of the first limiting occurrence Event (y, t2, rest2) mp2 -> let mappedMp2 = fmap (\(Handler ref g) -> Handler ref (\x t2 -> displace t (g x t2))) mp2 in -- Get the external alternatives and limit them at 't', -- Switch in the external alternatives from 'f'; they have to be displaced -- to beyond 't' for monotonicity. Event (Nothing, t, Event (y, t`max`t2, rest2) mappedMp2) mappedMp where mappedMp = fmap (\(Handler ref g) -> Handler ref (\x t -> over (g x t) f)) mp displace' :: Time -> Event t -> Event t displace' t e@(Event (x, t2, rest) mp) = Event (if t <= t2 then (x, t2, rest) else (x, t`max`t2, displace' t rest)) (fmap (\(Handler ref f) -> Handler ref (\x t2 -> if t <= t2 then f x t2 else displace' t (f x t2))) mp) -- | Displace occurrences to at least 't'. -- -- Starting with a lower bound at 't' helps recursion be productive. {-# INLINE displace #-} displace t e = Event (Nothing, t, displace' t e) M.empty -- | Turns a plain list of occurrences (and times) into an Event. list ((x, t):xs) = cons x t (list xs) list [] = mzero firstE e e2 = join $ once $ duplicate e <> duplicate e2 -- The goal with this code: to determine, promptly and accurately, which of two external -- event sources ticks first. I am aided in this by the presence of locks on the addition -- of external events; these locks induce strong memory barriers; this means in turn -- that certain bad views of memory cannot occur. I do not need to contend with channel -- elements being read as filled in the opposite order they are filled, because of the -- memory barriers. On the whole, it does not matter either if a channel cell is read as -- empty when it is actually filled; in that case the whole question of which comes first -- can be put off. Lastly, I have to contend with two cells from the same channel both -- appearing empty; this is solved by retrying as described below. merge (Handler (Channel ref) f) (Handler (Channel ref2) g) = -- When this test is false for full channel cells, we will chew through those -- channel cells to reach an empty cell. -- -- For empty channel cells, strictly speaking this test should always be true; -- there is only one empty channel cell at a given time for a particular channel. -- However, it may be observed to be false; this is a consequence of the weak -- memory model and can be fixed by retrying. The practice of retrying works -- because an inconsistent view of memory must eventually be replaced by the -- correct view if we wait long enough (this is called eventual consistency). if ref == ref2 then Just (Handler (Channel ref) (\x t -> f x t `mplus` g x t)) else Nothing instance MonadPlus Event where mzero = Event (Nothing, 1/0, mzero) M.empty mplus (Event (_, t, _) mp) e2 | t == 1/0 && M.null mp = e2 mplus e (Event (_, t, _) mp) | t == 1/0 && M.null mp = e mplus ~e@(Event (x, t, rest) mp) ~e2@(Event (x2, t2, rest2) mp2) = unsafePerformIO loop where loop = do -- We will chew through the channel cells and make sure that the comparison -- between two channels' times is made against the latest cell. my <- firstRestE e my2 <- firstRestE e2 case (my, my2) of (Just e', Just e2') -> return (e' `mplus` e2') (Just e', Nothing) -> return (e' `mplus` e2) (Nothing, Just e2') -> return (e `mplus` e2') _ -> if M.foldl' (\b -> (b ||) . isNothing) False unioned then yield >> loop else return (Event (if t <= t2 then (x, t, mplus rest e2) else (x2, t2, mplus e rest2)) (fmap fromJust unioned)) unioned = fmap (\ei -> case ei of Left ei2 -> case ei2 of Left (Handler ref f) -> Just $ Handler ref (\x t -> f x t `mplus` e2) Right (Handler ref f) -> Just $ Handler ref (\x t -> e `mplus` f x t) Right h -> h) $ M.unionWith (\(Left (Left hdl)) (Left (Right hdl2)) -> Right (merge hdl hdl2)) (fmap (Left . Left) mp) (fmap (Left . Right) mp2) data Stream t = Stream !(t -> IO ()) !(t -> Time -> IO ()) !(Event t) deriving Typeable addToEvent ~(Stream f _ _) = f addToEventWithTime ~(Stream _ f _) = f getEvent ~(Stream _ _ e) = e {-# NOINLINE counter #-} counter :: IORef Integer counter = unsafePerformIO (newIORef 0) unsafeCast :: Handler t u -> Handler v u unsafeCast = unsafeCoerce -- Not used {-immediates t e = firstRestE' t e >>= maybe (return e) (\(x, t1, rest) -> if t1 <= t then do x immediates t rest else return e)-} chanSource :: FrameOfReference -> IO (Stream t) chanSource frame = do n <- atomicModifyIORef counter (\x -> (succ x, x)) chn <- liftM Channel $ newIORef Nothing end <- liftM chanEnd $ newIORef chn let chanLoop chn@(Channel ref) = do e <- unsafeInterleaveIO (do Just (_, chn') <- readIORef ref chanLoop chn') return (Event (Nothing, 1/0, mzero) (M.singleton n (unsafeCast (Handler chn (\x t -> cons x t e))))) event <- chanLoop chn let strm = Stream (\x -> modifyMVar_ (remainder frame) (\e@(Event tup mp) -> do t1 <- getPOSIXTime let t = fromRational (toRational (t1 - startT frame)) -- Write into the channel chanWrite end (Just (x, t)) wakeup return e)) (\x t -> modifyMVar_ (remainder frame) (\e -> chanWrite end (Just (x, t)) >> wakeup >> return e)) event -- Channels have to be terminated when their channel ends are garbage collected; otherwise there is a space leak. -- This termination requires a finalizer, because I have elected not to embed event streams in a larger -- structure such as an arrow; which manages resources. mkWeak end end (Just (void $ forkIO $ modifyMVar_ (remainder frame) (\e -> putStrLn "Finalize" >> chanWrite end Nothing >> return e))) return strm instance Alternative Event where empty = mzero (<|>) = mplus instance Monoid (Event t) where mempty = mzero mappend = mplus -- The monad for Event is much like the list monad: -- * 'join' - instead of concatenating, it interleaves result events according to their times. -- * 'unit' - yields a "boring" one-point event at t = 0. instance Monad Event where return x = cons x 0 mzero e >>= f = over e (\x _ rest -> f x <> (rest >>= f)) fail _ = mzero instance Applicative Event where pure = return (<*>) = ap data ChanEnd t = ChanEnd {-# NOUNPACK #-} !(IORef (Channel t)) {-# NOINLINE chanEnd #-} chanEnd x = ChanEnd x data Channel t = Channel !(IORef (Maybe (t, Channel t))) {-# NOINLINE chanWrite #-} chanWrite (ChanEnd chnEnd) x = do chn' <- newIORef Nothing Channel chn <- readIORef chnEnd writeIORef chnEnd (Channel chn') writeIORef chn (Just (x, Channel chn')) ------------------------------------------- -- Executing events data FrameOfReference = FrameOfReference !(MVar (Event (IO ()))) -- The remainder as of the present time !POSIXTime -- A start time deriving Typeable remainder ~(FrameOfReference mv _) = mv startT ~(FrameOfReference _ start) = start {-# NOINLINE waiting #-} waiting :: IORef Int waiting = unsafePerformIO (newIORef 0) -- This is a condition variable that wakes up all waiting threads. -- -- The semaphore is paired with a counter that indicates how many threads are waiting -- at a given time. Only waking up a certain number of threads works because I do -- not care about spurious wakeups -- i.e. it doesn't matter if a thread is woken up -- when there is no work for it to do. Care must be taken however, that a resource -- is checked in an interlocked manner prior to engaging with the wakeup mechanism, -- otherwise the thread may miss its opportunity to be woken up. {-# NOINLINE sem #-} sem = unsafePerformIO (newQSem 0) {-# INLINE wakeup #-} wakeup = do n <- readIORef waiting modifyMVar_ lock (\_ -> hPutStrLn stderr (show n ++ " bumps")) replicateM_ n (signalQSem sem) -- | Create a frame of reference from an event and handler. {-# INLINE makeFrame #-} makeFrame e = do mv <- newMVar e startT <- getPOSIXTime return (FrameOfReference mv startT) {-# INLINE setupFrame #-} setupFrame frame e = do tryTakeMVar (remainder frame) putMVar (remainder frame) e -- | Run a frame of reference in the current thread -- but N.B. that some executions of the -- frame's 'sink' may occur in other threads. -- -- See FRP.Reactivity.Basic for a more elaborate scheme that gets the results -- from I/O as event occurrences. runFrame :: FrameOfReference -> IO a runFrame frame = do e <- takeMVar (remainder frame) t1 <- getPOSIXTime let t2 = fromRational (toRational (t1 - startT frame)) my <- firstRestE' t2 e let (x, t, rest) = case my of Just tup -> tup Nothing -> (undefined, 1/0, mzero) my' <- firstRestE e let Event (_, lower, _) _ = maybe e id my' let time = min lower t - t2 if t - t2 <= 0 then do putMVar (remainder frame) rest x else do atomicModifyIORef waiting (\x -> (if x == maxBound then x else succ x, ())) putMVar (remainder frame) e CC.oneOf $ listArray (0, 1) [threadDelay (round (1000000 * time)), waitQSem sem] atomicModifyIORef waiting (\x -> (if x == maxBound then x else pred x, ())) runFrame frame diagnostic (Event (my, t, _) mp) = do putStr (if isJust my then "occurrence" else "bound") putStr (" at " ++ show t ++ ", map:") ls <- mapM (\(n, Handler (Channel ref) _) -> liftM ((,) n . isJust) (readIORef ref)) (M.assocs mp) print ls {-# RULES "corec/corec" forall f x g y e. corec f x (corec g y e) = corec (\(x, y) z t -> case g y z t of (y', a, t') -> case f x a t' of (x', b, t'') -> ((x', y'), b, t'')) (x, y) e #-}