{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}

-- | This module provides the ability to create reapers: dedicated cleanup
-- threads. These threads will automatically spawn and die based on the
-- presence of a workload to process on. Example uses include:
--
-- * Killing long-running jobs
-- * Closing unused connections in a connection pool
-- * Pruning a cache of old items (see example below)
--
-- For real-world usage, search the <https://github.com/yesodweb/wai WAI family of packages>
-- for imports of "Control.Reaper".
module Control.Reaper (
    -- * Example: Regularly cleaning a cache
    -- $example1

    -- * Settings
    ReaperSettings,
    defaultReaperSettings,

    -- * Accessors
    reaperAction,
    reaperDelay,
    reaperCons,
    reaperNull,
    reaperEmpty,
    reaperThreadName,

    -- * Type
    Reaper,
    reaperAdd,
    reaperRead,
    reaperModify,
    reaperStop,
    reaperKill,

    -- * Creation
    mkReaper,

    -- * Helper
    mkListAction,
) where

import Control.AutoUpdate.Util (atomicModifyIORef')
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Exception (mask_)
import Control.Reaper.Internal
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import GHC.Conc.Sync (labelThread)

-- | Settings for creating a reaper. This type has two parameters:
-- @workload@ gives the entire workload, whereas @item@ gives an
-- individual piece of the queue. A common approach is to have @workload@
-- be a list of @item@s. This is encouraged by 'defaultReaperSettings' and
-- 'mkListAction'.
--
-- @since 0.1.1
data ReaperSettings workload item = ReaperSettings
    { forall workload item.
ReaperSettings workload item
-> workload -> IO (workload -> workload)
reaperAction :: workload -> IO (workload -> workload)
    -- ^ The action to perform on a workload. The result of this is a
    -- \"workload modifying\" function. In the common case of using lists,
    -- the result should be a difference list that prepends the remaining
    -- workload to the temporary workload. The temporary workload here
    -- refers to items added to the workload while the reaper action is
    -- running. For help with setting up such an action, see 'mkListAction'.
    --
    -- Default: do nothing with the workload, and then prepend it to the
    -- temporary workload. This is incredibly useless; you should
    -- definitely override this default.
    --
    -- @since 0.1.1
    , forall workload item. ReaperSettings workload item -> Int
reaperDelay :: {-# UNPACK #-} !Int
    -- ^ Number of microseconds to delay between calls of 'reaperAction'.
    --
    -- Default: 30 seconds.
    --
    -- @since 0.1.1
    , forall workload item.
ReaperSettings workload item -> item -> workload -> workload
reaperCons :: item -> workload -> workload
    -- ^ Add an item onto a workload.
    --
    -- Default: list consing.
    --
    -- @since 0.1.1
    , forall workload item.
ReaperSettings workload item -> workload -> Bool
reaperNull :: workload -> Bool
    -- ^ Check if a workload is empty, in which case the worker thread
    -- will shut down.
    --
    -- Default: 'null'.
    --
    -- @since 0.1.1
    , forall workload item. ReaperSettings workload item -> workload
reaperEmpty :: workload
    -- ^ An empty workload.
    --
    -- Default: empty list.
    --
    -- @since 0.1.1
    , forall workload item. ReaperSettings workload item -> String
reaperThreadName :: String
    -- ^ Label of the thread spawned by the reaper.
    --
    -- Default: @"Reaper"@.
    --
    -- @since 0.2.2
    }

-- | Default @ReaperSettings@ value, biased towards having a list of work
-- items.
--
-- @since 0.1.1
defaultReaperSettings :: ReaperSettings [item] item
defaultReaperSettings :: forall item. ReaperSettings [item] item
defaultReaperSettings =
    ReaperSettings
        { reaperAction :: [item] -> IO ([item] -> [item])
reaperAction = \[item]
wl -> ([item] -> [item]) -> IO ([item] -> [item])
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([item]
wl [item] -> [item] -> [item]
forall a. [a] -> [a] -> [a]
++)
        , reaperDelay :: Int
reaperDelay = Int
30000000
        , reaperCons :: item -> [item] -> [item]
reaperCons = (:)
        , reaperNull :: [item] -> Bool
reaperNull = [item] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null
        , reaperEmpty :: [item]
reaperEmpty = []
        , reaperThreadName :: String
reaperThreadName = String
"Reaper"
        }

-- | State of reaper.
data State workload
    = -- | No reaper thread
      NoReaper
    | -- | The current jobs
      Workload !workload

-- | Create a reaper addition function. This function can be used to add
-- new items to the workload. Spawning of reaper threads will be handled
-- for you automatically.
--
-- @since 0.1.1
mkReaper :: ReaperSettings workload item -> IO (Reaper workload item)
mkReaper :: forall workload item.
ReaperSettings workload item -> IO (Reaper workload item)
mkReaper settings :: ReaperSettings workload item
settings@ReaperSettings{workload
Int
String
workload -> Bool
workload -> IO (workload -> workload)
item -> workload -> workload
reaperAction :: forall workload item.
ReaperSettings workload item
-> workload -> IO (workload -> workload)
reaperDelay :: forall workload item. ReaperSettings workload item -> Int
reaperCons :: forall workload item.
ReaperSettings workload item -> item -> workload -> workload
reaperNull :: forall workload item.
ReaperSettings workload item -> workload -> Bool
reaperEmpty :: forall workload item. ReaperSettings workload item -> workload
reaperThreadName :: forall workload item. ReaperSettings workload item -> String
reaperAction :: workload -> IO (workload -> workload)
reaperDelay :: Int
reaperCons :: item -> workload -> workload
reaperNull :: workload -> Bool
reaperEmpty :: workload
reaperThreadName :: String
..} = do
    IORef (State workload)
stateRef <- State workload -> IO (IORef (State workload))
forall a. a -> IO (IORef a)
newIORef State workload
forall workload. State workload
NoReaper
    IORef (Maybe ThreadId)
tidRef <- Maybe ThreadId -> IO (IORef (Maybe ThreadId))
forall a. a -> IO (IORef a)
newIORef Maybe ThreadId
forall a. Maybe a
Nothing
    Reaper workload item -> IO (Reaper workload item)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return
        Reaper
            { reaperAdd :: item -> IO ()
reaperAdd = ReaperSettings workload item
-> IORef (State workload)
-> IORef (Maybe ThreadId)
-> item
-> IO ()
forall workload item.
ReaperSettings workload item
-> IORef (State workload)
-> IORef (Maybe ThreadId)
-> item
-> IO ()
add ReaperSettings workload item
settings IORef (State workload)
stateRef IORef (Maybe ThreadId)
tidRef
            , reaperRead :: IO workload
reaperRead = IORef (State workload) -> IO workload
readRef IORef (State workload)
stateRef
            , reaperModify :: (workload -> workload) -> IO workload
reaperModify = IORef (State workload) -> (workload -> workload) -> IO workload
modifyRef IORef (State workload)
stateRef
            , reaperStop :: IO workload
reaperStop = IORef (State workload) -> IO workload
stop IORef (State workload)
stateRef
            , reaperKill :: IO ()
reaperKill = IORef (Maybe ThreadId) -> IO ()
kill IORef (Maybe ThreadId)
tidRef
            }
  where
    readRef :: IORef (State workload) -> IO workload
readRef IORef (State workload)
stateRef = do
        State workload
mx <- IORef (State workload) -> IO (State workload)
forall a. IORef a -> IO a
readIORef IORef (State workload)
stateRef
        case State workload
mx of
            State workload
NoReaper -> workload -> IO workload
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return workload
reaperEmpty
            Workload workload
wl -> workload -> IO workload
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return workload
wl
    modifyRef :: IORef (State workload) -> (workload -> workload) -> IO workload
modifyRef IORef (State workload)
stateRef workload -> workload
modifier = IORef (State workload)
-> (State workload -> (State workload, workload)) -> IO workload
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (State workload)
stateRef ((State workload -> (State workload, workload)) -> IO workload)
-> (State workload -> (State workload, workload)) -> IO workload
forall a b. (a -> b) -> a -> b
$ \State workload
mx ->
        case State workload
mx of
            State workload
NoReaper ->
                (State workload
forall workload. State workload
NoReaper, workload
reaperEmpty)
            Workload workload
wl ->
                let !wl' :: workload
wl' = workload -> workload
modifier workload
wl
                 in (workload -> State workload
forall workload. workload -> State workload
Workload workload
wl', workload
wl')
    stop :: IORef (State workload) -> IO workload
stop IORef (State workload)
stateRef = IORef (State workload)
-> (State workload -> (State workload, workload)) -> IO workload
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (State workload)
stateRef ((State workload -> (State workload, workload)) -> IO workload)
-> (State workload -> (State workload, workload)) -> IO workload
forall a b. (a -> b) -> a -> b
$ \State workload
mx ->
        case State workload
mx of
            State workload
NoReaper -> (State workload
forall workload. State workload
NoReaper, workload
reaperEmpty)
            Workload workload
x -> (workload -> State workload
forall workload. workload -> State workload
Workload workload
reaperEmpty, workload
x)
    kill :: IORef (Maybe ThreadId) -> IO ()
kill IORef (Maybe ThreadId)
tidRef = do
        Maybe ThreadId
mtid <- IORef (Maybe ThreadId) -> IO (Maybe ThreadId)
forall a. IORef a -> IO a
readIORef IORef (Maybe ThreadId)
tidRef
        case Maybe ThreadId
mtid of
            Maybe ThreadId
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just ThreadId
tid -> ThreadId -> IO ()
killThread ThreadId
tid

add
    :: ReaperSettings workload item
    -> IORef (State workload)
    -> IORef (Maybe ThreadId)
    -> item
    -> IO ()
add :: forall workload item.
ReaperSettings workload item
-> IORef (State workload)
-> IORef (Maybe ThreadId)
-> item
-> IO ()
add settings :: ReaperSettings workload item
settings@ReaperSettings{workload
Int
String
workload -> Bool
workload -> IO (workload -> workload)
item -> workload -> workload
reaperAction :: forall workload item.
ReaperSettings workload item
-> workload -> IO (workload -> workload)
reaperDelay :: forall workload item. ReaperSettings workload item -> Int
reaperCons :: forall workload item.
ReaperSettings workload item -> item -> workload -> workload
reaperNull :: forall workload item.
ReaperSettings workload item -> workload -> Bool
reaperEmpty :: forall workload item. ReaperSettings workload item -> workload
reaperThreadName :: forall workload item. ReaperSettings workload item -> String
reaperAction :: workload -> IO (workload -> workload)
reaperDelay :: Int
reaperCons :: item -> workload -> workload
reaperNull :: workload -> Bool
reaperEmpty :: workload
reaperThreadName :: String
..} IORef (State workload)
stateRef IORef (Maybe ThreadId)
tidRef item
item =
    IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        IO ()
next <- IORef (State workload)
-> (State workload -> (State workload, IO ())) -> IO (IO ())
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (State workload)
stateRef State workload -> (State workload, IO ())
cons
        IO ()
next
  where
    cons :: State workload -> (State workload, IO ())
cons State workload
NoReaper =
        let wl :: workload
wl = item -> workload -> workload
reaperCons item
item workload
reaperEmpty
         in (workload -> State workload
forall workload. workload -> State workload
Workload workload
wl, ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
forall workload item.
ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
spawn ReaperSettings workload item
settings IORef (State workload)
stateRef IORef (Maybe ThreadId)
tidRef)
    cons (Workload workload
wl) =
        let wl' :: workload
wl' = item -> workload -> workload
reaperCons item
item workload
wl
         in (workload -> State workload
forall workload. workload -> State workload
Workload workload
wl', () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())

spawn
    :: ReaperSettings workload item
    -> IORef (State workload)
    -> IORef (Maybe ThreadId)
    -> IO ()
spawn :: forall workload item.
ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
spawn ReaperSettings workload item
settings IORef (State workload)
stateRef IORef (Maybe ThreadId)
tidRef = do
    ThreadId
tid <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
forall workload item.
ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
reaper ReaperSettings workload item
settings IORef (State workload)
stateRef IORef (Maybe ThreadId)
tidRef
    ThreadId -> String -> IO ()
labelThread ThreadId
tid (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ ReaperSettings workload item -> String
forall workload item. ReaperSettings workload item -> String
reaperThreadName ReaperSettings workload item
settings
    IORef (Maybe ThreadId) -> Maybe ThreadId -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe ThreadId)
tidRef (Maybe ThreadId -> IO ()) -> Maybe ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> Maybe ThreadId
forall a. a -> Maybe a
Just ThreadId
tid

reaper
    :: ReaperSettings workload item
    -> IORef (State workload)
    -> IORef (Maybe ThreadId)
    -> IO ()
reaper :: forall workload item.
ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
reaper settings :: ReaperSettings workload item
settings@ReaperSettings{workload
Int
String
workload -> Bool
workload -> IO (workload -> workload)
item -> workload -> workload
reaperAction :: forall workload item.
ReaperSettings workload item
-> workload -> IO (workload -> workload)
reaperDelay :: forall workload item. ReaperSettings workload item -> Int
reaperCons :: forall workload item.
ReaperSettings workload item -> item -> workload -> workload
reaperNull :: forall workload item.
ReaperSettings workload item -> workload -> Bool
reaperEmpty :: forall workload item. ReaperSettings workload item -> workload
reaperThreadName :: forall workload item. ReaperSettings workload item -> String
reaperAction :: workload -> IO (workload -> workload)
reaperDelay :: Int
reaperCons :: item -> workload -> workload
reaperNull :: workload -> Bool
reaperEmpty :: workload
reaperThreadName :: String
..} IORef (State workload)
stateRef IORef (Maybe ThreadId)
tidRef = do
    Int -> IO ()
threadDelay Int
reaperDelay
    -- Getting the current jobs. Push an empty job to the reference.
    workload
wl <- IORef (State workload)
-> (State workload -> (State workload, workload)) -> IO workload
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (State workload)
stateRef State workload -> (State workload, workload)
forall {b}. State b -> (State workload, b)
swapWithEmpty
    -- Do the jobs. A function to merge the left jobs and
    -- new jobs is returned.
    !workload -> workload
merge <- workload -> IO (workload -> workload)
reaperAction workload
wl
    -- Merging the left jobs and new jobs.
    -- If there is no jobs, this thread finishes.
    Bool
cont <- IORef (State workload)
-> (State workload -> (State workload, Bool)) -> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (State workload)
stateRef ((workload -> workload) -> State workload -> (State workload, Bool)
forall {workload}.
(workload -> workload) -> State workload -> (State workload, Bool)
check workload -> workload
merge)
    if Bool
cont
        then
            ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
forall workload item.
ReaperSettings workload item
-> IORef (State workload) -> IORef (Maybe ThreadId) -> IO ()
reaper ReaperSettings workload item
settings IORef (State workload)
stateRef IORef (Maybe ThreadId)
tidRef
        else
            IORef (Maybe ThreadId) -> Maybe ThreadId -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe ThreadId)
tidRef Maybe ThreadId
forall a. Maybe a
Nothing
  where
    swapWithEmpty :: State b -> (State workload, b)
swapWithEmpty State b
NoReaper = String -> (State workload, b)
forall a. HasCallStack => String -> a
error String
"Control.Reaper.reaper: unexpected NoReaper (1)"
    swapWithEmpty (Workload b
wl) = (workload -> State workload
forall workload. workload -> State workload
Workload workload
reaperEmpty, b
wl)

    check :: (workload -> workload) -> State workload -> (State workload, Bool)
check workload -> workload
_ State workload
NoReaper = String -> (State workload, Bool)
forall a. HasCallStack => String -> a
error String
"Control.Reaper.reaper: unexpected NoReaper (2)"
    check workload -> workload
merge (Workload workload
wl)
        -- If there is no job, reaper is terminated.
        | workload -> Bool
reaperNull workload
wl' = (State workload
forall workload. State workload
NoReaper, Bool
False)
        -- If there are jobs, carry them out.
        | Bool
otherwise = (workload -> State workload
forall workload. workload -> State workload
Workload workload
wl', Bool
True)
      where
        wl' :: workload
wl' = workload -> workload
merge workload
wl

-- | A helper function for creating 'reaperAction' functions. You would
-- provide this function with a function to process a single work item and
-- return either a new work item, or @Nothing@ if the work item is
-- expired.
--
-- @since 0.1.1
mkListAction
    :: (item -> IO (Maybe item'))
    -> [item]
    -> IO ([item'] -> [item'])
mkListAction :: forall item item'.
(item -> IO (Maybe item')) -> [item] -> IO ([item'] -> [item'])
mkListAction item -> IO (Maybe item')
f =
    ([item'] -> [item']) -> [item] -> IO ([item'] -> [item'])
forall {c}. ([item'] -> c) -> [item] -> IO ([item'] -> c)
go [item'] -> [item']
forall a. a -> a
id
  where
    go :: ([item'] -> c) -> [item] -> IO ([item'] -> c)
go ![item'] -> c
front [] = ([item'] -> c) -> IO ([item'] -> c)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return [item'] -> c
front
    go ![item'] -> c
front (item
x : [item]
xs) = do
        Maybe item'
my <- item -> IO (Maybe item')
f item
x
        let front' :: [item'] -> c
front' =
                case Maybe item'
my of
                    Maybe item'
Nothing -> [item'] -> c
front
                    Just item'
y -> [item'] -> c
front ([item'] -> c) -> ([item'] -> [item']) -> [item'] -> c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (item'
y item' -> [item'] -> [item']
forall a. a -> [a] -> [a]
:)
        ([item'] -> c) -> [item] -> IO ([item'] -> c)
go [item'] -> c
front' [item]
xs

-- $example1
-- In this example code, we use a 'Data.Map.Strict.Map' to cache fibonacci numbers, and a 'Reaper' to prune the cache.
--
-- NOTE: When using this module as a cache you should keep in mind that while
-- the reaper thread is active running your "reaperAction", the cache will
-- appear empty to concurrently running threads.  Any newly created cache
-- entries will be on the temporary worklist, and will merged back into the the
-- main cache only once the "reaperAction" completes (together with the portion
-- of the extant worklist that the @cleaner@ callback decided to retain).
--
-- If you're looking for a cache that supports concurrent purging of stale
-- items, but without exposing a transient empty cache during cleanup, this is
-- not the cache implementation you need.  This module was primarily designed
-- for cleaning up /stuck/ processes, or idle threads in a thread pool.  The cache
-- use-case was not a primary design focus.
--
-- The @main@ function first creates a 'Reaper', with fields to initialize the
-- cache ('reaperEmpty'), add items to it ('reaperCons'), and prune it ('reaperAction').
-- The reaper will run every two seconds ('reaperDelay'), but will stop running while
-- 'reaperNull' is true.
--
-- @main@ then loops infinitely ('Control.Monad.forever'). Each second it calculates the fibonacci number
-- for a value between 30 and 34, first trying the cache ('reaperRead' and 'Data.Map.Strict.lookup'),
-- then falling back to manually calculating it (@fib@)
-- and updating the cache with the result ('reaperAdd')
--
-- @clean@ simply removes items cached for more than 10 seconds.
-- This function is where you would perform IO-related cleanup,
-- like killing threads or closing connections, if that was the purpose of your reaper.
--
-- @
-- module Main where
--
-- import "Data.Time" (UTCTime, getCurrentTime, diffUTCTime)
-- import "Control.Reaper"
-- import "Control.Concurrent" (threadDelay)
-- import "Data.Map.Strict" (Map)
-- import qualified "Data.Map.Strict" as Map
-- import "Control.Monad" (forever)
-- import "System.Random" (getStdRandom, randomR)
--
-- fib :: 'Int' -> 'Int'
-- fib 0 = 0
-- fib 1 = 1
-- fib n = fib (n-1) + fib (n-2)
--
-- type Cache = 'Data.Map.Strict.Map' 'Int' ('Int', 'Data.Time.Clock.UTCTime')
--
-- main :: IO ()
-- main = do
--   reaper <- 'mkReaper' 'defaultReaperSettings'
--     { 'reaperEmpty' = Map.'Data.Map.Strict.empty'
--     , 'reaperCons' = \\(k, v, time) workload -> Map.'Data.Map.Strict.insert' k (v, time) workload
--     , 'reaperAction' = clean
--     , 'reaperDelay' = 1000000 * 2 -- Clean every 2 seconds
--     , 'reaperNull' = Map.'Data.Map.Strict.null'
--     }
--   forever $ do
--     fibArg <- 'System.Random.getStdRandom' ('System.Random.randomR' (30,34))
--     cache <- 'reaperRead' reaper
--     let cachedResult = Map.'Data.Map.Strict.lookup' fibArg cache
--     case cachedResult of
--       'Just' (fibResult, _createdAt) -> 'putStrLn' $ "Found in cache: `fib " ++ 'show' fibArg ++ "` " ++ 'show' fibResult
--       'Nothing' -> do
--         let fibResult = fib fibArg
--         'putStrLn' $ "Calculating `fib " ++ 'show' fibArg ++ "` " ++ 'show' fibResult
--         time <- 'Data.Time.Clock.getCurrentTime'
--         ('reaperAdd' reaper) (fibArg, fibResult, time)
--     'threadDelay' 1000000 -- 1 second
--
-- -- Remove items > 10 seconds old
-- clean :: Cache -> IO (Cache -> Cache)
-- clean oldMap = do
--   currentTime <- 'Data.Time.Clock.getCurrentTime'
--   let pruned = Map.'Data.Map.Strict.filter' (\\(_, createdAt) -> currentTime \`diffUTCTime\` createdAt < 10.0) oldMap
--   return (\\newData -> Map.'Data.Map.Strict.union' pruned newData)
-- @