module Pipes.RealTime (
timeCat,
timeCatDelayedBy,
relativeTimeCat,
relativeTimeCatDelayedBy,
steadyCat,
poissonCat,
poissonCatConst,
genPoissonCat,
catAtTimes,
catAtRelativeTimes,
) where
import Prelude hiding (dropWhile)
import Control.Monad
import Pipes
import Pipes.Prelude (chain, dropWhile)
import Control.Concurrent (threadDelay)
import Data.Time.Clock
import Data.Time.Calendar
import System.Random.MWC
import qualified System.Random.MWC.Distributions as MWCDists
-- | Forward values downstream, timing each one by a relative offset in
-- seconds.
--
-- The wall-clock time is sampled once, when the pipe starts running. Leading
-- values whose offset (as computed by @toRelTime@) is negative are dropped.
-- Every value after that is held back until the start time plus its offset
-- has been reached, and is then passed on unchanged. A value whose due time
-- has already passed is forwarded without waiting.
relativeTimeCat :: (a -> Double) -> Pipe a a IO r
relativeTimeCat toRelTime =
  lift getCurrentTime >>= \start ->
    let isNegative v = toRelTime v < 0
        -- Absolute instant at which the value is due.
        dueTime v = addUTCTime (doubleToNomDiffTime (toRelTime v)) start
     in dropWhile isNegative >-> chain (pauseUntil . dueTime)
-- | Like 'relativeTimeCat', but with a constant @delay@ (in seconds) added to
-- the relative time of every value before it is scheduled.
relativeTimeCatDelayedBy :: (a -> Double) -> Double -> Pipe a a IO r
relativeTimeCatDelayedBy toTime delay =
  relativeTimeCat (\v -> toTime v + delay)
-- | Forward values downstream at the absolute times given by @toTime@.
--
-- The current time is read once when the pipe starts. Leading values whose
-- timestamp lies before that instant are dropped; each remaining value is
-- held until its timestamp has been reached and is then passed on unchanged.
timeCat :: (a -> UTCTime) -> Pipe a a IO r
timeCat toTime = do
  start <- lift getCurrentTime
  let alreadyPast v = toTime v < start
      waitFor v = pauseUntil (toTime v)
  dropWhile alreadyPast >-> chain waitFor
-- | Like 'timeCat', but with every timestamp shifted by @delay@ seconds
-- before scheduling.
timeCatDelayedBy :: (a -> UTCTime) -> Double -> Pipe a a IO r
timeCatDelayedBy toTime delay = timeCat shifted
  where
    offset = doubleToNomDiffTime delay
    -- Timestamp of a value after applying the delay.
    shifted v = addUTCTime offset (toTime v)
-- | Forward values on a fixed schedule of @rate@ ticks per second.
--
-- Ticks are spaced @1 / rate@ seconds apart, counted from the moment the pipe
-- starts. Each tick time is derived from the previous /scheduled/ tick rather
-- than from the current clock. For every tick the pipe first waits until the
-- tick time, and only then awaits one value and yields it.
steadyCat :: Double -> Pipe a a IO r
steadyCat rate = lift getCurrentTime >>= go
  where
    period = doubleToNomDiffTime (1 / rate)
    go previousTick = do
      let nextTick = addUTCTime period previousTick
      lift (pauseUntil nextTick)
      await >>= yield
      go nextTick
-- | Run 'genPoissonCat' at the given @rate@ with a fresh generator obtained
-- from 'createSystemRandom'.
poissonCat :: Double -> Pipe a a IO r
poissonCat rate = do
  gen <- lift createSystemRandom
  genPoissonCat gen rate
-- | Run 'genPoissonCat' at the given @rate@ with a generator obtained from
-- 'create'.
--
-- NOTE(review): per the mwc-random documentation 'create' uses a fixed seed,
-- so the sequence of delays is presumably the same on every run; confirm.
poissonCatConst :: Double -> Pipe a a IO r
poissonCatConst rate = do
  gen <- lift create
  genPoissonCat gen rate
-- | Forward values with random gaps drawn from the supplied generator.
--
-- For each value awaited, a delay is sampled with 'MWCDists.exponential'
-- (parameter @rate@) and added to the previous /scheduled/ emission time,
-- which starts out as the time the pipe began. The value is yielded once that
-- new time has been reached.
genPoissonCat :: GenIO -> Double -> Pipe a a IO r
genPoissonCat gen rate = lift getCurrentTime >>= go
  where
    go previous = do
      v <- await
      gap <- lift (MWCDists.exponential rate gen)
      let due = doubleToNomDiffTime gap `addUTCTime` previous
      lift (pauseUntil due)
      yield v
      go due
-- | Forward one value at each of the given absolute times, in list order.
--
-- For every time in the list the pipe waits until that instant, then awaits a
-- single value and yields it. Once the list is exhausted the pipe behaves
-- like 'cat' and forwards everything else without further waiting.
catAtTimes :: [UTCTime] -> Pipe a a IO r
catAtTimes = foldr emitAt cat
  where
    -- Wait for the due time, pass one value through, then continue.
    emitAt due rest = do
      lift (pauseUntil due)
      await >>= yield
      rest
-- | Like 'catAtTimes', but the schedule is given as offsets in seconds from
-- the moment the pipe starts.
--
-- An empty schedule is just 'cat' and does not read the clock.
catAtRelativeTimes :: [Double] -> Pipe a a IO r
catAtRelativeTimes [] = cat
catAtRelativeTimes offsets = do
  start <- lift getCurrentTime
  catAtTimes [doubleToNomDiffTime off `addUTCTime` start | off <- offsets]
-- | Block the calling thread until the given time.
--
-- If the current time is already at or past @t@, this returns immediately.
-- Otherwise it sleeps once, via 'threadDelay', for the remaining interval
-- truncated to whole microseconds.
pauseUntil :: UTCTime -> IO ()
pauseUntil t = do
  now <- getCurrentTime
  let remainingMicros = truncate (diffUTCTime t now * 1000000)
  when (now < t) $ threadDelay remainingMicros
-- | Convert a duration in seconds to a 'NominalDiffTime'.
--
-- The conversion goes straight through 'realToFrac' instead of constructing
-- two 'UTCTime' values and subtracting them, so the result does not depend on
-- calendar or time-of-day arithmetic and durations longer than one day keep
-- their full length. Negative durations are preserved.
doubleToNomDiffTime :: Double -> NominalDiffTime
doubleToNomDiffTime = realToFrac