-- |
-- Module      : Streamly.Internal.Data.Time.Clock
-- Copyright   : (c) 2021 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : pre-release
-- Portability : GHC

module Streamly.Internal.Data.Time.Clock
    (
    -- * System clock
      Clock(..)
    , getTime

    -- * Async clock
    , asyncClock
    , readClock

    -- * Adjustable Timer
    , Timer
    , timer
    , resetTimer
    , extendTimer
    , shortenTimer
    , readTimer
    , waitTimer
    )
where

import Control.Concurrent (threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (forever, when, void)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
    (MicroSecond64(..), fromAbsTime, addToAbsTime, toRelTime)
import Streamly.Internal.Control.ForkIO (forkIOManaged)

import qualified Streamly.Internal.Data.IORef.Unboxed as Unboxed

------------------------------------------------------------------------------
-- Async clock
------------------------------------------------------------------------------

{-# INLINE updateTimeVar #-}
updateTimeVar :: Clock -> Unboxed.IORef MicroSecond64 -> IO ()
updateTimeVar :: Clock -> IORef MicroSecond64 -> IO ()
updateTimeVar Clock
clock IORef MicroSecond64
timeVar = do
    MicroSecond64
t <- AbsTime -> MicroSecond64
forall a. TimeUnit a => AbsTime -> a
fromAbsTime (AbsTime -> MicroSecond64) -> IO AbsTime -> IO MicroSecond64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Clock -> IO AbsTime
getTime Clock
clock
    IORef MicroSecond64 -> (MicroSecond64 -> MicroSecond64) -> IO ()
forall a. Unbox a => IORef a -> (a -> a) -> IO ()
Unboxed.modifyIORef' IORef MicroSecond64
timeVar (MicroSecond64 -> MicroSecond64 -> MicroSecond64
forall a b. a -> b -> a
const MicroSecond64
t)

{-# INLINE updateWithDelay #-}
updateWithDelay :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> IO ()
updateWithDelay :: Clock -> a -> IORef MicroSecond64 -> IO ()
updateWithDelay Clock
clock a
precision IORef MicroSecond64
timeVar = do
    Int -> IO ()
threadDelay (a -> Int
forall p a. (Bounded p, RealFrac a, Integral p) => a -> p
delayTime a
precision)
    Clock -> IORef MicroSecond64 -> IO ()
updateTimeVar Clock
clock IORef MicroSecond64
timeVar

    where

    -- Keep the minimum at least a millisecond to avoid high CPU usage
    {-# INLINE delayTime #-}
    delayTime :: a -> p
delayTime a
g
        | a
g' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
>= Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int
forall a. Bounded a => a
maxBound :: Int) = p
forall a. Bounded a => a
maxBound
        | a
g' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
< a
1000 = p
1000
        | Bool
otherwise = a -> p
forall a b. (RealFrac a, Integral b) => a -> b
round a
g'

        where

        g' :: a
g' = a
g a -> a -> a
forall a. Num a => a -> a -> a
* a
10 a -> Int -> a
forall a b. (Num a, Integral b) => a -> b -> a
^ (Int
6 :: Int)

-- | @asyncClock g@ starts a clock thread that updates an IORef with current
-- time as a 64-bit value in microseconds, every 'g' seconds. The IORef can be
-- read asynchronously.  The thread exits automatically when the reference to
-- the returned 'ThreadId' is lost.
--
-- Minimum granularity of clock update is 1 ms. Higher is better for
-- performance.
--
-- CAUTION! This is safe only on a 64-bit machine. On a 32-bit machine a 64-bit
-- 'Var' cannot be read consistently without a lock while another thread is
-- writing to it.
asyncClock :: Clock -> Double -> IO (ThreadId, Unboxed.IORef MicroSecond64)
asyncClock :: Clock -> Double -> IO (ThreadId, IORef MicroSecond64)
asyncClock Clock
clock Double
g = do
    IORef MicroSecond64
timeVar <- MicroSecond64 -> IO (IORef MicroSecond64)
forall a. Unbox a => a -> IO (IORef a)
Unboxed.newIORef MicroSecond64
0
    Clock -> IORef MicroSecond64 -> IO ()
updateTimeVar Clock
clock IORef MicroSecond64
timeVar
    ThreadId
tid <- IO () -> IO ThreadId
forkIOManaged (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Clock -> Double -> IORef MicroSecond64 -> IO ()
forall a. RealFrac a => Clock -> a -> IORef MicroSecond64 -> IO ()
updateWithDelay Clock
clock Double
g IORef MicroSecond64
timeVar)
    (ThreadId, IORef MicroSecond64)
-> IO (ThreadId, IORef MicroSecond64)
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId
tid, IORef MicroSecond64
timeVar)

{-# INLINE readClock #-}
readClock :: (ThreadId, Unboxed.IORef MicroSecond64) -> IO MicroSecond64
readClock :: (ThreadId, IORef MicroSecond64) -> IO MicroSecond64
readClock (ThreadId
_, IORef MicroSecond64
timeVar) = IORef MicroSecond64 -> IO MicroSecond64
forall a. Unbox a => IORef a -> IO a
Unboxed.readIORef IORef MicroSecond64
timeVar

------------------------------------------------------------------------------
-- Adjustable Timer
------------------------------------------------------------------------------

-- | Adjustable periodic timer.
data Timer = Timer ThreadId (MVar ()) (IO ())

-- Set the expiry to current time + timer period
{-# INLINE resetTimerExpiry #-}
resetTimerExpiry :: Clock -> MicroSecond64 -> Unboxed.IORef MicroSecond64 -> IO ()
resetTimerExpiry :: Clock -> MicroSecond64 -> IORef MicroSecond64 -> IO ()
resetTimerExpiry Clock
clock MicroSecond64
period IORef MicroSecond64
timeVar = do
    AbsTime
t <- Clock -> IO AbsTime
getTime Clock
clock
    let t1 :: AbsTime
t1 = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
t (MicroSecond64 -> RelTime
forall a. TimeUnit a => a -> RelTime
toRelTime MicroSecond64
period)
    IORef MicroSecond64 -> (MicroSecond64 -> MicroSecond64) -> IO ()
forall a. Unbox a => IORef a -> (a -> a) -> IO ()
Unboxed.modifyIORef' IORef MicroSecond64
timeVar (MicroSecond64 -> MicroSecond64 -> MicroSecond64
forall a b. a -> b -> a
const (AbsTime -> MicroSecond64
forall a. TimeUnit a => AbsTime -> a
fromAbsTime AbsTime
t1))

{-# INLINE processTimerTick #-}
processTimerTick :: RealFrac a =>
    Clock -> a -> Unboxed.IORef MicroSecond64 -> MVar () -> IO () -> IO ()
processTimerTick :: Clock -> a -> IORef MicroSecond64 -> MVar () -> IO () -> IO ()
processTimerTick Clock
clock a
precision IORef MicroSecond64
timeVar MVar ()
mvar IO ()
reset = do
    Int -> IO ()
threadDelay (a -> Int
forall p a. (Bounded p, RealFrac a, Integral p) => a -> p
delayTime a
precision)
    MicroSecond64
t <- AbsTime -> MicroSecond64
forall a. TimeUnit a => AbsTime -> a
fromAbsTime (AbsTime -> MicroSecond64) -> IO AbsTime -> IO MicroSecond64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Clock -> IO AbsTime
getTime Clock
clock
    MicroSecond64
expiry <- IORef MicroSecond64 -> IO MicroSecond64
forall a. Unbox a => IORef a -> IO a
Unboxed.readIORef IORef MicroSecond64
timeVar
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (MicroSecond64
t MicroSecond64 -> MicroSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
>= MicroSecond64
expiry) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        -- non-blocking put so that we can process multiple timers in a
        -- non-blocking manner in future.
        IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
mvar ()
        IO ()
reset

    where

    -- Keep the minimum at least a millisecond to avoid high CPU usage
    {-# INLINE delayTime #-}
    delayTime :: a -> p
delayTime a
g
        | a
g' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
>= Int -> a
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int
forall a. Bounded a => a
maxBound :: Int) = p
forall a. Bounded a => a
maxBound
        | a
g' a -> a -> Bool
forall a. Ord a => a -> a -> Bool
< a
1000 = p
1000
        | Bool
otherwise = a -> p
forall a b. (RealFrac a, Integral b) => a -> b
round a
g'

        where

        g' :: a
g' = a
g a -> a -> a
forall a. Num a => a -> a -> a
* a
10 a -> Int -> a
forall a b. (Num a, Integral b) => a -> b -> a
^ (Int
6 :: Int)

-- XXX In future we can add a timer in a heap of timers.
--
-- | @timer clockType granularity period@ creates a timer.  The timer produces
-- timer ticks at specified time intervals that can be waited upon using
-- 'waitTimer'.  If the previous tick is not yet processed, the new tick is
-- lost.
timer :: Clock -> Double -> Double -> IO Timer
timer :: Clock -> Double -> Double -> IO Timer
timer Clock
clock Double
g Double
period = do
    MVar ()
mvar <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef MicroSecond64
timeVar <- MicroSecond64 -> IO (IORef MicroSecond64)
forall a. Unbox a => a -> IO (IORef a)
Unboxed.newIORef MicroSecond64
0
    let p :: Int
p = Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
period Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1e6) :: Int
        p1 :: MicroSecond64
p1 = Int -> MicroSecond64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
p :: MicroSecond64
        reset :: IO ()
reset = Clock -> MicroSecond64 -> IORef MicroSecond64 -> IO ()
resetTimerExpiry Clock
clock MicroSecond64
p1 IORef MicroSecond64
timeVar
        process :: IO ()
process = Clock -> Double -> IORef MicroSecond64 -> MVar () -> IO () -> IO ()
forall a.
RealFrac a =>
Clock -> a -> IORef MicroSecond64 -> MVar () -> IO () -> IO ()
processTimerTick Clock
clock Double
g IORef MicroSecond64
timeVar MVar ()
mvar IO ()
reset
    IO ()
reset
    ThreadId
tid <- IO () -> IO ThreadId
forkIOManaged (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever IO ()
process
    Timer -> IO Timer
forall (m :: * -> *) a. Monad m => a -> m a
return (Timer -> IO Timer) -> Timer -> IO Timer
forall a b. (a -> b) -> a -> b
$ ThreadId -> MVar () -> IO () -> Timer
Timer ThreadId
tid MVar ()
mvar IO ()
reset

-- | Blocking wait for a timer tick.
{-# INLINE waitTimer #-}
waitTimer :: Timer -> IO ()
waitTimer :: Timer -> IO ()
waitTimer (Timer ThreadId
_ MVar ()
mvar IO ()
_) = MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
mvar

-- | Resets the current period.
{-# INLINE resetTimer #-}
resetTimer :: Timer -> IO ()
resetTimer :: Timer -> IO ()
resetTimer (Timer ThreadId
_ MVar ()
_ IO ()
reset) = IO ()
reset

-- | Elongates the current period by specified amount.
--
-- /Unimplemented/
{-# INLINE extendTimer #-}
extendTimer :: Timer -> Double -> IO ()
extendTimer :: Timer -> Double -> IO ()
extendTimer = Timer -> Double -> IO ()
forall a. HasCallStack => a
undefined

-- | Shortens the current period by specified amount.
--
-- /Unimplemented/
{-# INLINE shortenTimer #-}
shortenTimer :: Timer -> Double -> IO ()
shortenTimer :: Timer -> Double -> IO ()
shortenTimer = Timer -> Double -> IO ()
forall a. HasCallStack => a
undefined

-- | Show the remaining time in the current time period.
--
-- /Unimplemented/
{-# INLINE readTimer #-}
readTimer :: Timer -> IO Double
readTimer :: Timer -> IO Double
readTimer = Timer -> IO Double
forall a. HasCallStack => a
undefined