module Streamly.Internal.Data.Time.Clock
(
Clock(..)
, getTime
, asyncClock
, readClock
, Timer
, timer
, resetTimer
, extendTimer
, shortenTimer
, readTimer
, waitTimer
)
where
import Control.Concurrent (threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (forever, when, void)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
(MicroSecond64(..), fromAbsTime, addToAbsTime, toRelTime)
import Streamly.Internal.Control.ForkIO (forkIOManaged)
import qualified Streamly.Internal.Data.IORef.Unboxed as Unboxed
-- | Sample the current time from @clock@ and store it in @timeVar@,
-- converted to 'MicroSecond64'. Whatever value @timeVar@ held before is
-- discarded.
{-# INLINE updateTimeVar #-}
updateTimeVar :: Clock -> Unboxed.IORef MicroSecond64 -> IO ()
updateTimeVar clock timeVar = do
    t <- fromAbsTime <$> getTime clock
    -- 'const' ignores the old contents, so this is a plain overwrite.
    Unboxed.modifyIORef' timeVar (const t)
-- | Sleep for about @precision@ seconds, then refresh @timeVar@ with the
-- current reading of @clock@ (see 'updateTimeVar').
--
-- @precision@ is given in seconds; it is scaled by @10^6@ to obtain the
-- microsecond count passed to 'threadDelay'. The resulting delay is clamped
-- to the range @[1000, maxBound :: Int]@ microseconds.
{-# INLINE updateWithDelay #-}
updateWithDelay :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> IO ()
updateWithDelay clock precision timeVar = do
    threadDelay (delayTime precision)
    updateTimeVar clock timeVar

    where

    -- Convert a duration in seconds to a microsecond delay. The lower bound
    -- of 1000 microseconds (one millisecond) keeps the caller's loop from
    -- spinning; the upper bound avoids overflowing the 'Int' result.
    {-# INLINE delayTime #-}
    delayTime g
        | g' >= fromIntegral (maxBound :: Int) = maxBound
        | g' < 1000 = 1000
        | otherwise = round g'

        where

        -- The requested duration expressed in microseconds.
        g' = g * 10 ^ (6 :: Int)
-- | @asyncClock clock g@ starts a background thread that keeps a mutable
-- time cell up to date with readings of @clock@.
--
-- The cell is created holding @0@, filled once synchronously before the
-- thread is forked, and afterwards refreshed in an endless loop by
-- 'updateWithDelay', sleeping roughly @g@ seconds between refreshes.
--
-- Returns the id of the updater thread together with the time cell; pass
-- the pair to 'readClock' to obtain the most recently stored time.
--
-- NOTE(review): the thread is created with 'forkIOManaged'; its lifetime
-- semantics are defined in "Streamly.Internal.Control.ForkIO" and are not
-- visible here - confirm how and when the thread is stopped.
asyncClock :: Clock -> Double -> IO (ThreadId, Unboxed.IORef MicroSecond64)
asyncClock clock g = do
    timeVar <- Unboxed.newIORef 0
    -- Make sure a reader never observes the placeholder 0 after we return.
    updateTimeVar clock timeVar
    tid <- forkIOManaged $ forever (updateWithDelay clock g timeVar)
    return (tid, timeVar)
-- | Read the time most recently stored by the updater thread of a clock
-- created with 'asyncClock'. The thread id component is ignored.
{-# INLINE readClock #-}
readClock :: (ThreadId, Unboxed.IORef MicroSecond64) -> IO MicroSecond64
readClock (_, timeVar) = Unboxed.readIORef timeVar
-- | Handle to a timer created by 'timer'. The fields are, in order:
--
-- * the 'ThreadId' of the thread forked by 'timer' to drive the timer,
-- * an 'MVar' that the timer thread fills when the timer expires and that
--   'waitTimer' takes from,
-- * the action run by 'resetTimer' to restart the timer period.
data Timer = Timer ThreadId (MVar ()) (IO ())
-- | Set the expiry time stored in @timeVar@ to the current time of @clock@
-- plus @period@. The previous expiry value is discarded.
{-# INLINE resetTimerExpiry #-}
resetTimerExpiry ::
    Clock -> MicroSecond64 -> Unboxed.IORef MicroSecond64 -> IO ()
resetTimerExpiry clock period timeVar = do
    t <- getTime clock
    let t1 = addToAbsTime t (toRelTime period)
    -- 'const' ignores the old contents, so this is a plain overwrite.
    Unboxed.modifyIORef' timeVar (const (fromAbsTime t1))
-- | Run one polling step of a timer.
--
-- Sleeps for about @precision@ seconds (clamped to the range
-- @[1000, maxBound :: Int]@ microseconds), then compares the current time
-- of @clock@ with the expiry time stored in @timeVar@. If the expiry time
-- has been reached, @mvar@ is filled to signal the expiry and the @reset@
-- action is run.
{-# INLINE processTimerTick #-}
processTimerTick :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> MVar () -> IO () -> IO ()
processTimerTick clock precision timeVar mvar reset = do
    threadDelay (delayTime precision)
    t <- fromAbsTime <$> getTime clock
    expiry <- Unboxed.readIORef timeVar
    when (t >= expiry) $ do
        -- Non-blocking put: if an earlier expiry has not been consumed yet
        -- the 'MVar' is already full and this signal is simply dropped.
        void $ tryPutMVar mvar ()
        reset

    where

    -- Convert a duration in seconds to a microsecond delay. The lower bound
    -- of 1000 microseconds (one millisecond) keeps the polling loop from
    -- spinning; the upper bound avoids overflowing the 'Int' result.
    {-# INLINE delayTime #-}
    delayTime g
        | g' >= fromIntegral (maxBound :: Int) = maxBound
        | g' < 1000 = 1000
        | otherwise = round g'

        where

        -- The requested duration expressed in microseconds.
        g' = g * 10 ^ (6 :: Int)
-- | @timer clock g period@ creates a timer driven by @clock@.
--
-- @period@ is given in seconds and is rounded to a whole number of
-- microseconds. The expiry time is first set to "now + period"; a
-- background thread then repeatedly runs 'processTimerTick', polling about
-- every @g@ seconds. Each time the expiry time is reached the timer's
-- 'MVar' is filled (observable through 'waitTimer') and the expiry is
-- re-armed for another @period@.
--
-- NOTE(review): the thread is created with 'forkIOManaged'; its lifetime
-- semantics are defined in "Streamly.Internal.Control.ForkIO" and are not
-- visible here - confirm how and when the thread is stopped.
timer :: Clock -> Double -> Double -> IO Timer
timer clock g period = do
    mvar <- newEmptyMVar
    timeVar <- Unboxed.newIORef 0
    let p = round (period * 1e6) :: Int
        p1 = fromIntegral p :: MicroSecond64
        -- Re-arms the expiry to "now + period"; also stored in the 'Timer'
        -- so that 'resetTimer' can run it.
        reset = resetTimerExpiry clock p1 timeVar
        process = processTimerTick clock g timeVar mvar reset
    -- Arm the timer before the polling thread starts, so it never compares
    -- against the placeholder expiry of 0.
    reset
    tid <- forkIOManaged $ forever process
    return $ Timer tid mvar reset
-- | Block until the timer signals an expiry, then consume that signal by
-- emptying the timer's 'MVar'.
{-# INLINE waitTimer #-}
waitTimer :: Timer -> IO ()
waitTimer (Timer _ mvar _) = takeMVar mvar
-- | Run the reset action stored in the 'Timer'. For timers built with
-- 'timer' this sets the expiry time to the current time plus the timer's
-- period.
{-# INLINE resetTimer #-}
resetTimer :: Timer -> IO ()
resetTimer (Timer _ _ reset) = reset
-- | Placeholder for extending a timer. Not implemented: evaluating this
-- function throws an 'ErrorCall' naming the missing implementation.
{-# INLINE extendTimer #-}
extendTimer :: Timer -> Double -> IO ()
extendTimer =
    error "Streamly.Internal.Data.Time.Clock.extendTimer: not implemented"
-- | Placeholder for shortening a timer. Not implemented: evaluating this
-- function throws an 'ErrorCall' naming the missing implementation.
{-# INLINE shortenTimer #-}
shortenTimer :: Timer -> Double -> IO ()
shortenTimer =
    error "Streamly.Internal.Data.Time.Clock.shortenTimer: not implemented"
-- | Placeholder for reading a timer. Not implemented: evaluating this
-- function throws an 'ErrorCall' naming the missing implementation.
{-# INLINE readTimer #-}
readTimer :: Timer -> IO Double
readTimer =
    error "Streamly.Internal.Data.Time.Clock.readTimer: not implemented"