module FRP.Reactivity.Measurement (Measurement(Empty), wait, stmAction, assertMeasurement, measure, await, blindAwait, fromList, assertChan, chan, first, leMeas, mergeStreams, getValue, copoint, time) where
import GHC.Conc hiding (Chan, newChan)
import Control.Concurrent.MVar
import Control.CUtils.Conc
import Control.CUtils.FChan
import System.IO.Unsafe
import Data.Time.Clock.POSIX
import Data.Typeable
import Control.Monad
import Control.Applicative
import Data.Maybe
import Data.Monoid
eitherOr :: IO t -> IO u -> IO ()
eitherOr m m2 = oneOfF 2 (\n -> if n == 0 then void m else void m2)
data Measurement t = Measurement
!(IO ())
!(STM (Maybe (t, POSIXTime))) | Empty deriving (Typeable, Functor)
instance (Show t) => Show (Measurement t) where
showsPrec prec meas = showsPrec prec (copoint meas, time meas)
instance (Eq t) => Eq (Measurement t) where
meas == meas2 = copoint meas == copoint meas2 && time meas == time meas2
instance Monad Measurement where
return x = Measurement (return ()) (return (Just (x, 0)))
meas >>= f = Measurement
(getValue meas >>= wait . f . fst)
(stmAction meas >>= maybe
(return Nothing)
(\(x, t) -> liftM (fmap (\(y, t') -> (y, max t t'))) (stmAction (f x))))
fail _ = mzero
instance Applicative Measurement where
pure = return
(<*>) = ap
wait ~(Measurement co _) = co
stmAction ~(Measurement _ stm) = stm
_assertMeasurement :: IO (t, Maybe POSIXTime) -> IO (Measurement t)
_assertMeasurement m = do
mv <- newEmptyMVar
tv <- newTVarIO Nothing
let writeMeas = do
(x, my) <- m
atomically (maybe (unsafeIOToSTM getPOSIXTime) return my >>= \t -> readTVar tv >>= maybe (writeTVar tv (Just (x, t))) (\_ -> return ()))
tryPutMVar mv ()
return ()
let meas = Measurement
(readMVar mv)
(readTVar tv)
forkIO writeMeas
return meas
assertMeasurement :: IO (t, POSIXTime) -> IO (Measurement t)
assertMeasurement m = _assertMeasurement (liftM (\(x, t) -> (x, Just t)) m)
measure :: IO t -> IO (Measurement t)
measure m = _assertMeasurement (liftM (\x -> (x, Nothing)) m)
delayUntil :: POSIXTime -> IO ()
delayUntil t = getPOSIXTime >>= \time -> threadDelay (round (1000000 * fromRational (toRational (t time))))
await :: POSIXTime -> IO (Measurement ())
await t = measure (delayUntil t)
blindAwait :: POSIXTime -> Measurement ()
blindAwait t = unsafePerformIO (do
Measurement wait stm <- measure (delayUntil t)
return (Measurement wait (return (Just ((), t)))))
fromList :: [(t, POSIXTime)] -> [Measurement t]
fromList ((x, t):xs) = fmap fst meas : snd (copoint meas) where
meas = fmap (const (x, fromList xs)) (blindAwait t)
fromList [] = []
assertChan :: forall t. IO (t -> POSIXTime -> IO (), [Measurement t])
assertChan = do
(f, chn) <- newChan
let loop (chn :: Chan (t, POSIXTime)) = do
meas <- assertMeasurement (do
((x, t), chn') <- takeChan chn
ls <- unsafeInterleaveIO (loop chn')
return ((x, ls), t))
return (fmap fst meas : snd (copoint meas))
ls <- loop chn
return (curry f, ls)
chan :: IO (t -> IO (), [Measurement t])
chan = liftM (\(f, ls) -> (\x -> getPOSIXTime >>= f x, ls)) assertChan
first :: Measurement t -> Measurement t -> Measurement t
first Empty meas2 = meas2
first meas Empty = meas
first meas meas2 = Measurement wt stm
where
stm = do
pr <- liftM2 (,) (stmAction meas) (stmAction meas2)
return $ case pr of
(Just (x, t), Just (x2, t2)) -> Just (if t <= t2 then (x, t) else (x2, t2))
(Just pr, Nothing) -> Just pr
(Nothing, Just pr) -> Just pr
_ -> Nothing
wt = wait meas `eitherOr` wait meas2
instance MonadPlus Measurement where
mzero = Empty
mplus = first
instance Monoid (Measurement t) where
mempty = mzero
mappend = mplus
instance Alternative Measurement where
empty = mzero
(<|>) = mplus
leMeas :: Measurement t -> Measurement u -> Bool
leMeas x y = copoint (fmap (const True) x `first` fmap (const False) y)
mergeStreams (x:xs) (y:ys) = fmap fst x' : snd (copoint x') where
x' = fmap (\x' -> (x', mergeStreams xs (y:ys))) x
`first` fmap (\x' -> (x', mergeStreams (x:xs) ys)) y
mergeStreams [] xs = xs
mergeStreams xs [] = xs
getValue :: Measurement t -> IO (t, POSIXTime)
getValue meas = do
wait meas
liftM fromJust (atomically (stmAction meas))
copoint :: Measurement t -> t
copoint = fst . unsafePerformIO . getValue
time :: Measurement t -> POSIXTime
time = snd . unsafePerformIO . getValue