{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Network.Riak.Cluster
( Cluster(..)
, InClusterError(..)
, connectToCluster
, inCluster
, Riak.create
, Riak.defaultClient
) where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Exception.Enclosed
import Control.Monad.Base (liftBase)
import Control.Monad.Catch (MonadThrow (..))
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable
import Network.Riak (Connection)
import qualified Network.Riak as Riak
import qualified Network.Riak.Connection.Pool as Riak
import System.Random.Mersenne.Pure64
data Cluster = Cluster
{ clusterPools :: [Riak.Pool]
, clusterGen :: TMVar PureMT
}
data InClusterError = InClusterError [SomeException]
deriving (Show, Typeable)
instance Exception InClusterError
connectToCluster :: [Riak.Client] -> IO Cluster
connectToCluster clients = do
pools <- mapM (\c -> Riak.create c 1 10 20) clients
connectToClusterWithPools pools
connectToClusterWithPools :: [Riak.Pool] -> IO Cluster
connectToClusterWithPools pools = do
gen <- newPureMT
mt <- atomically (newTMVar gen)
return (Cluster pools mt)
inCluster :: (MonadThrow m, MonadBaseControl IO m)
=> Cluster -> (Connection -> m a) -> m a
inCluster Cluster{clusterPools=pools, clusterGen=tMT} f = do
rnd <- liftBase $ atomically $ do
mt <- takeTMVar tMT
let (i, mt') = randomInt mt
putTMVar tMT mt'
return i
let n = if null pools then 0 else rnd `mod` length pools
pools' = rotateL n pools
go pools' []
where
go [] errors = throwM (InClusterError errors)
go (p:ps) es =
catchAny (Riak.withConnectionM p f) (\e -> go ps (e:es))
rotateL :: Int -> [a] -> [a]
rotateL i xs = right ++ left
where
(left, right) = splitAt i xs