{-| Pipes that pass values through unchanged but control /when/ each value is
    yielded, either according to a timestamp carried by the value itself or
    according to a schedule supplied by the caller. -}
module Pipes.RealTime
  ( -- * Pipes throttled by their own timestamps
    timeCat
  , timeCatDelayedBy
  , relativeTimeCat
  , relativeTimeCatDelayedBy
    -- * Pipes throttled by you
  , steadyCat
  , poissonCat
  , poissonCatConst
  , genPoissonCat
  , catAtTimes
  , catAtRelativeTimes
  ) where

import Prelude hiding (dropWhile)
import Pipes
import Pipes.Prelude (chain, dropWhile)
import Control.Concurrent (threadDelay)
import Data.Time.Clock
import Data.Time.Calendar
import System.Random.MWC
import qualified System.Random.MWC.Distributions as MWCDists

{-| Yield values some time after the effect is run, according to their
    relative timestamps (in seconds, measured from the moment the pipe starts
    running). Assumes that values arrive in ascending time order: the leading
    run of values with negative relative timestamps is discarded. -}
relativeTimeCat :: (MonadIO m) => (a -> Double) -> Pipe a a m r
relativeTimeCat toRelTime = do
  t0 <- liftIO getCurrentTime
  -- Absolute wall-clock time at which a value is due.
  let dueTime v = doubleToNomDiffTime (toRelTime v) `addUTCTime` t0
  dropWhile ((< 0) . toRelTime) >-> chain (liftIO . pauseUntil . dueTime)

{-| Like 'relativeTimeCat', but with every relative timestamp shifted by
    @delay@ seconds. A negative delay advances the stream, so leading values
    whose shifted timestamp is negative are discarded. -}
relativeTimeCatDelayedBy :: (MonadIO m)
                         => (a -> Double) -> Double -> Pipe a a m r
relativeTimeCatDelayedBy toTime delay = relativeTimeCat shifted
  where
    shifted v = toTime v + delay

{-| Yield values at the absolute times given by their timestamps. Assumes
    that they arrive in ascending time order: the leading run of values with
    timestamps earlier than the starting time of the effect is discarded. -}
timeCat :: (MonadIO m) => (a -> UTCTime) -> Pipe a a m r
timeCat toTime = do
  t0 <- liftIO getCurrentTime
  dropWhile ((< t0) . toTime) >-> chain (liftIO . pauseUntil . toTime)

{-| Yield values at their absolute timestamps, but delay (positive @delay@)
    or advance (negative @delay@) their production by some time, given in
    seconds. Leading values whose adjusted timestamp falls before the
    starting time of the effect are discarded, exactly as in 'timeCat'. -}
timeCatDelayedBy :: (MonadIO m) => (a -> UTCTime) -> Double -> Pipe a a m r
timeCatDelayedBy toTime delay = timeCat shifted
  where
    shifted v = doubleToNomDiffTime delay `addUTCTime` toTime v

{-| Yield values at a steady rate (Hz). Each target time is computed from the
    previous target rather than from the current clock, so the schedule is
    @start + k / rate@ and does not accumulate drift. The pipe waits for the
    next target time first and only then awaits a value. -}
steadyCat :: (MonadIO m) => Double -> Pipe a a m r
steadyCat rate = liftIO getCurrentTime >>= go
  where
    period = doubleToNomDiffTime (1 / rate)
    go prev = do
      let next = period `addUTCTime` prev
      liftIO (pauseUntil next)
      await >>= yield
      go next

{-| Constant-rate Poisson process yielding values, with a generator seeded
    from the system's randomness source. -}
poissonCat :: (MonadIO m) => Double -> Pipe a a m r
poissonCat rate = do
  gen <- liftIO createSystemRandom
  genPoissonCat gen rate

{-| Constant-rate Poisson process with a fixed seed - the same random
    sequence every time. -}
poissonCatConst :: (MonadIO m) => Double -> Pipe a a m r
poissonCatConst rate = do
  gen <- liftIO create
  genPoissonCat gen rate

{-| Constant-rate Poisson process yielding values, using a generator supplied
    by the caller. For each awaited value a waiting time is drawn with
    @MWCDists.exponential rate@ and added to the previous scheduled time; the
    value is yielded once that time has been reached. -}
genPoissonCat :: (MonadIO m) => GenIO -> Double -> Pipe a a m r
genPoissonCat gen rate = liftIO getCurrentTime >>= go
  where
    go prev = do
      v <- await
      gap <- liftIO (MWCDists.exponential rate gen)
      let next = doubleToNomDiffTime gap `addUTCTime` prev
      liftIO (pauseUntil next)
      yield v
      go next

{-| Yield values at a set of absolute times. For each time in the list the
    pipe first waits until that time, then awaits one value and yields it.
    Remaining values are passed through immediately once the time list
    becomes empty. -}
catAtTimes :: (MonadIO m) => [UTCTime] -> Pipe a a m r
catAtTimes [] = cat
catAtTimes (t : ts) = do
  liftIO (pauseUntil t)
  await >>= yield
  catAtTimes ts

{-| Yield values at a set of times (in seconds) relative to the moment the
    pipe starts running; the reference time is read before any value is
    awaited. Remaining values are passed through immediately once the time
    list becomes empty. -}
catAtRelativeTimes :: (MonadIO m) => [Double] -> Pipe a a m r
catAtRelativeTimes [] = cat
catAtRelativeTimes ts = do
  t0 <- liftIO getCurrentTime
  catAtTimes (map (\d -> doubleToNomDiffTime d `addUTCTime` t0) ts)

-- | Block the calling thread until the given time; return immediately if
-- that time is not in the future.
--
-- The remaining time is computed as an 'Integer' number of microseconds and
-- slept in bounded chunks, re-reading the clock after each chunk, so that a
-- long wait cannot overflow the 'Int' argument of 'threadDelay'. Waits no
-- longer than one chunk are a single 'threadDelay' call.
pauseUntil :: UTCTime -> IO ()
pauseUntil target = do
  now <- getCurrentTime
  let remainingMicros =
        truncate (diffUTCTime target now * 1000000) :: Integer
  case compare now target of
    LT
      | remainingMicros > maxChunkMicros ->
          threadDelay (fromInteger maxChunkMicros) >> pauseUntil target
      | otherwise -> threadDelay (fromInteger remainingMicros)
    _ -> return ()
  where
    -- 500 seconds: small enough to fit in any 'Int' the Haskell Report
    -- permits (at least 30 bits).
    maxChunkMicros :: Integer
    maxChunkMicros = 500000000

-- | Convert a number of seconds to a 'NominalDiffTime', rounded down to a
-- whole number of picoseconds. The value is obtained as the difference
-- between two 'UTCTime's on the same day whose day offsets differ by the
-- requested amount.
doubleToNomDiffTime :: Double -> NominalDiffTime
doubleToNomDiffTime x = diffUTCTime shifted origin
  where
    day0 = ModifiedJulianDay 0
    origin = UTCTime day0 (picosecondsToDiffTime 0)
    shifted = UTCTime day0 (picosecondsToDiffTime (floor (x / 1e-12)))