{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeApplications #-}

-- | A small resource pool: create, reuse, return and destroy resources, with
-- optional idle-timeout reaping, an optional live-resource limit and basic
-- usage metrics.
module Simpoole
  ( Pool
  , mapPool
  , newPool
  , withResource
  , acquireResource
  , returnResource
  , destroyResource
  , poolMetrics
  , Settings (..)
  , defaultSettings
  , ReturnPolicy (..)
  , Metrics (..)
  )
where

import qualified Control.Concurrent.Classy as Concurrent
import qualified Control.Concurrent.Classy.Async as Async
import Control.Monad (forever, unless, void)
import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Coerce (coerce)
import Data.Foldable (for_)
import qualified Data.Sequence as Seq
import qualified Data.Time as Time
import Numeric.Natural (Natural)
import Simpoole.Internal (FailToIO (..), failToIO)

-- | Strategy to use when returning resources to the pool
--
-- @since 0.1.0
data ReturnPolicy
  = ReturnToFront
  -- ^ Return resources to the front. Resources that have been used recently are more likely to be
  -- reused again quicker.
  --
  -- This strategy is good if you want to scale down the pool more quickly in case resources are not
  -- needed.
  --
  -- @since 0.1.0
  | ReturnToBack
  -- ^ Return resources to the back. Resources that have been used recently are less likely to be
  -- used again quicker.
  --
  -- Use this strategy if you want to keep more resources in the pool fresh, or when maintaining the
  -- pool in order to be ready for burst workloads.
  --
  -- Note: This strategy can lead to no resources ever being destroyed when all resources are
  -- repeatedly used within the idle timeout.
  --
  -- @since 0.1.0
  | ReturnToMiddle
  -- ^ Return resources to the middle. This offers a middle ground between 'ReturnToFront' and
  -- 'ReturnToBack'. By ensuring that the starting sub-sequence of resources is reused quicker but
  -- the trailing sub-sequence is not and therefore released more easily.
  --
  -- @since 0.1.0
  deriving stock (Show, Read, Eq, Ord, Enum, Bounded)

-- | Insert a value based on the return policy.
--
-- Note the `seq` binding here is the idle-resource sequence, shadowing
-- 'Prelude.seq'.
applyReturnPolicy :: ReturnPolicy -> a -> Seq.Seq a -> Seq.Seq a
applyReturnPolicy policy value seq = case policy of
  ReturnToFront -> value Seq.<| seq
  ReturnToBack -> seq Seq.|> value
  ReturnToMiddle -> Seq.insertAt middleIndex value seq
  where
    -- For an odd-length sequence this picks the slot just right of the middle.
    middleIndex
      | even (Seq.length seq) = div (Seq.length seq) 2
      | otherwise = div (Seq.length seq) 2 + 1

-- | Lets you configure certain behaviours of the pool
--
-- @since 0.1.0
data Settings = Settings
  -- ^ @since 0.2.0
  { settings_idleTimeout :: Maybe Time.NominalDiffTime
    -- ^ Maximum idle time after which a resource is destroyed
    --
    -- Setting it to @Nothing@ means that nothing will ever be destroyed based on idle time.
    --
    -- @since 0.1.0
  , settings_returnPolicy :: ReturnPolicy
    -- ^ Read documentation on 'ReturnPolicy' for details.
    --
    -- @since 0.1.0
  , settings_maxLiveLimit :: Maybe Int
    -- ^ Maximum number of resources that may live at the same time.
    --
    -- @Nothing@ means unlimited.
    --
    -- @since 0.2.0
  }

-- | Default pool settings
--
-- @since 0.1.0
defaultSettings :: Settings
defaultSettings = Settings
  { settings_idleTimeout = Just 60 -- 60 seconds
  , settings_returnPolicy = ReturnToMiddle
  , settings_maxLiveLimit = Nothing
  }

-- | Pool of resources
--
-- A pool is just a record of operations; the state lives in the closures
-- created by 'newUnlimitedPool' / 'newLimitedPool'.
--
-- @since 0.0.0
data Pool m a = Pool
  { pool_acquire :: m a
  , pool_return :: a -> m ()
  , pool_destroy :: a -> m ()
  , pool_metrics :: m (Metrics Natural)
  }

-- | Lift a natural transformation @m ~> n@ to @Pool m ~> Pool n@.
--
-- @since 0.0.0
mapPool :: (forall x. m x -> n x) -> Pool m a -> Pool n a
mapPool to pool = Pool
  { pool_acquire = to $ pool_acquire pool
  , pool_return = to . pool_return pool
  , pool_destroy = to . pool_destroy pool
  , pool_metrics = to $ pool_metrics pool
  }

{-# INLINE mapPool #-}

-- | Pool resource
data Resource a = Resource
  Time.UTCTime
  -- ^ Last use time
  a
  -- ^ The resource item

-- | Create a new pool that has no limit on how many resources it may create and hold.
--
-- If an idle timeout is configured, a background reaper thread is started which
-- periodically destroys resources that have been idle for too long.
newUnlimitedPool
  :: (Concurrent.MonadConc m, MonadIO m)
  => m a -- ^ Resource creation
  -> (a -> m ()) -- ^ Resource destruction
  -> Settings -- ^ Pool settings
  -> m (Pool m a)
newUnlimitedPool create destroy settings = do
  -- Idle resources, ordered according to the configured 'ReturnPolicy'.
  leftOversRef <- Concurrent.newIORefN "leftOvers" Seq.empty
  -- Monotonic counters and high-water mark backing 'Metrics'.
  createdRef <- Concurrent.newIORefN "created" 0
  destroyedRef <- Concurrent.newIORefN "destroyed" 0
  maxLiveRef <- Concurrent.newIORefN "maxLive" 0

  let
    -- Snapshot all counters into a 'Metrics' value. The individual reads are
    -- not performed atomically as a group, so the snapshot is best-effort.
    getMetrics = do
      created <- Concurrent.readIORef createdRef
      destroyed <- Concurrent.readIORef destroyedRef
      maxLive <- Concurrent.readIORef maxLiveRef
      leftOvers <- Concurrent.readIORef leftOversRef
      pure Metrics
        { metrics_createdResources = created
        , metrics_destroyedResources = destroyed
        , metrics_maxLiveResources = maxLive
        , metrics_idleResources = fromIntegral (Seq.length leftOvers)
        }

    -- Create a resource and count it.
    wrappedCreate = do
      value <- create
      succIORef createdRef
      pure value

    -- Destroy a resource; the destruction counter is bumped even when
    -- 'destroy' throws, so created/destroyed stay balanced.
    wrappedDestroy resource =
      destroy resource `Catch.finally` succIORef destroyedRef

    -- Pop an idle resource if one exists, otherwise create a fresh one.
    acquireResource = do
      mbResource <- Concurrent.atomicModifyIORef' leftOversRef $ \leftOvers ->
        case leftOvers of
          Resource _ head Seq.:<| tail -> (tail, Just head)
          _empty -> (leftOvers, Nothing)

      resource <- maybe wrappedCreate pure mbResource

      -- NOTE(review): these two reads are not atomic with respect to each
      -- other or to concurrent acquisitions, so the recorded max-live value
      -- is an approximation under contention — confirm this is acceptable.
      numDestroyed <- Concurrent.readIORef destroyedRef
      numCreated <- Concurrent.readIORef createdRef
      maxIORef maxLiveRef (numCreated - numDestroyed)

      pure resource

    -- Stamp the resource with the current time and re-insert it according to
    -- the configured return policy.
    returnResource value = do
      now <- liftIO Time.getCurrentTime
      Concurrent.atomicModifyIORef' leftOversRef $ \leftOvers ->
        ( applyReturnPolicy (settings_returnPolicy settings) (Resource now value) leftOvers
        , ()
        )

  -- Reaper: once per second, partition out resources idle past the timeout
  -- and destroy them on a separate thread so reaping is never blocked by a
  -- slow destructor. Destruction failures are deliberately swallowed.
  for_ (settings_idleTimeout settings) $ \idleTimeout ->
    void $ Async.asyncWithUnmaskN "reaperThread" $ \unmask -> unmask $ forever $ do
      now <- liftIO Time.getCurrentTime
      let isStillGood (Resource lastUse _) = Time.diffUTCTime now lastUse <= idleTimeout
      oldResource <- Concurrent.atomicModifyIORef' leftOversRef (Seq.partition isStillGood)
      unless (null oldResource) $ void $ Async.asyncN "destructionThread" $
        for_ oldResource $ \(Resource _ value) ->
          Catch.try @_ @Catch.SomeException $ wrappedDestroy value
      Concurrent.threadDelay 1_000_000

  pure Pool
    { pool_acquire = acquireResource
    , pool_return = returnResource
    , pool_destroy = wrappedDestroy
    , pool_metrics = getMetrics
    }

-- | Similar to 'newUnlimitedPool' but allows you to limit the number of resources that will exist
-- at the same time. When all resources are currently in use, further resource acquisition will
-- block until one is no longer in use.
newLimitedPool
  :: (Concurrent.MonadConc m, MonadIO m, MonadFail m)
  => m a -- ^ Resource creation
  -> (a -> m ()) -- ^ Resource destruction
  -> Int -- ^ Maximum number of resources to exist at the same time
  -> Settings -- ^ Pool settings
  -> m (Pool m a)
newLimitedPool create destroy maxElems settings = do
  basePool <- newUnlimitedPool create destroy settings
  -- One unit per permitted live resource; acquisition blocks when exhausted.
  maxElemBarrier <- Concurrent.newQSem maxElems

  let
    -- Wait for a permit, then acquire. Masked so the permit cannot leak:
    -- if acquisition fails, the permit is released again.
    acquireResource = Catch.mask $ \restore -> do
      Concurrent.waitQSem maxElemBarrier
      restore (pool_acquire basePool)
        `Catch.onError` Concurrent.signalQSem maxElemBarrier

    -- Hand a resource back via @f@ (return or destroy) and release the
    -- permit regardless of whether @f@ throws.
    giveBackResource f value = Catch.mask $ \restore ->
      restore (f basePool value)
        `Catch.finally` Concurrent.signalQSem maxElemBarrier

  pure Pool
    { pool_acquire = acquireResource
    , pool_return = giveBackResource pool_return
    , pool_destroy = giveBackResource pool_destroy
    , pool_metrics = pool_metrics basePool
    }

-- | Create a new pool.
--
-- Dispatches to 'newLimitedPool' when 'settings_maxLiveLimit' is set,
-- otherwise to 'newUnlimitedPool'.
--
-- @since 0.2.0
newPool
  :: (Concurrent.MonadConc m, MonadIO m)
  => m a -- ^ Resource creation
  -> (a -> m ()) -- ^ Resource destruction
  -> Settings -- ^ Pool settings
  -> m (Pool m a)
newPool create destroy settings =
  case settings_maxLiveLimit settings of
    Just maxLive -> failToIO $ do
      -- Translate the MonadFail constraint to MonadIO.
      pool <- newLimitedPool (coerce create) (coerce destroy) maxLive settings
      pure (mapPool coerce pool)

    Nothing -> newUnlimitedPool create destroy settings

-- | Use a resource from the pool. Once the continuation returns, the resource will be returned to
-- the pool. If the given continuation throws an error then the acquired resource will be destroyed
-- instead.
--
-- @since 0.0.0
withResource :: Catch.MonadMask m => Pool m a -> (a -> m r) -> m r
withResource pool f = Catch.mask $ \restore -> do
  -- Acquisition and the continuation run unmasked ('restore'); only the
  -- cleanup paths (destroy-on-error, return) run under the mask.
  resource <- restore (pool_acquire pool)
  result <- restore (f resource) `Catch.onError` pool_destroy pool resource
  pool_return pool resource
  pure result

{-# INLINE withResource #-}

-- | Acquire a resource.
--
-- @since 0.1.0
acquireResource :: Pool m a -> m a
acquireResource = pool_acquire

{-# INLINE acquireResource #-}

-- | Return a resource to the pool.
--
-- @since 0.1.0
returnResource :: Pool m a -> a -> m ()
returnResource = pool_return

{-# INLINE returnResource #-}

-- | Destroy a resource.
--
-- @since 0.1.0
destroyResource :: Pool m a -> a -> m ()
destroyResource = pool_destroy

{-# INLINE destroyResource #-}

-- | Fetch pool metrics.
--
-- @since 0.0.0
poolMetrics :: Pool m a -> m (Metrics Natural)
poolMetrics = pool_metrics

{-# INLINE poolMetrics #-}

---

-- | Pool metrics
--
-- @since 0.0.0
data Metrics a = Metrics
  { metrics_createdResources :: a
    -- ^ Total number of resources created
    --
    -- @since 0.0.0
  , metrics_destroyedResources :: a
    -- ^ Total number of resources destroyed
    --
    -- @since 0.0.0
  , metrics_maxLiveResources :: a
    -- ^ Maximum number of resources that were alive simultaneously
    --
    -- @since 0.0.0
  , metrics_idleResources :: a
    -- ^ Number of resources currently idle
    --
    -- @since 0.1.0
  }
  deriving stock (Show, Read, Eq, Ord, Functor, Foldable, Traversable)

-- | Increase a value held by an IORef by one.
--
-- Uses 'Concurrent.atomicModifyIORef'' so the increment is atomic and strict.
succIORef :: (Concurrent.MonadConc m, Enum a) => Concurrent.IORef m a -> m ()
succIORef ref = Concurrent.atomicModifyIORef' ref (\x -> (succ x, ()))

-- | Replace the value in an IORef with the given value if the latter is greater.
--
-- Atomic compare-and-raise of a high-water mark.
maxIORef :: (Concurrent.MonadConc m, Ord a) => Concurrent.IORef m a -> a -> m ()
maxIORef ref y = Concurrent.atomicModifyIORef' ref (\x -> (max x y, ()))