{-# LANGUAGE BangPatterns, DeriveDataTypeable, GeneralizedNewtypeDeriving, CPP #-}
module Network.Riak.Resolvable.Internal
(
Resolvable(..)
, ResolvableMonoid(..)
, ResolutionFailure(..)
, get
, getMany
, modify
, modify_
, put
, put_
, putMany
, putMany_
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative ((<$>))
#endif
import Control.Arrow (first)
import Control.Exception (Exception, throwIO)
import Control.Monad (unless)
import Control.Monad.IO.Class
import Data.Aeson.Types (FromJSON, ToJSON)
import Data.Data (Data)
import Data.Either (partitionEithers)
import Data.Function (on)
import Data.List (foldl', sortBy)
import Data.Maybe (isJust)
#if __GLASGOW_HASKELL__ < 710
import Data.Monoid (Monoid(mappend))
#endif
import Data.Semigroup (Semigroup)
import Data.Typeable (Typeable)
import Network.Riak.Debug (debugValues)
import Network.Riak.Types.Internal hiding (MessageTag(..))
data ResolutionFailure = RetriesExceeded
deriving (Eq, Show, Typeable)
instance Exception ResolutionFailure
class (Show a) => Resolvable a where
resolve :: a -> a -> a
newtype ResolvableMonoid a = RM { unRM :: a }
deriving (Eq, Ord, Read, Show, Typeable, Data, Semigroup, Monoid, FromJSON, ToJSON)
instance (Show a, Monoid a) => Resolvable (ResolvableMonoid a) where
resolve = mappend
{-# INLINE resolve #-}
instance (Resolvable a) => Resolvable (Maybe a) where
resolve (Just a) (Just b) = Just (resolve a b)
resolve a@(Just _) _ = a
resolve _ b = b
{-# INLINE resolve #-}
type Get a = Connection -> Maybe BucketType -> Bucket -> Key -> R -> IO (Maybe ([a], VClock))
get :: (Resolvable a) => Get a
-> (Connection -> Maybe BucketType -> Bucket -> Key -> R -> IO (Maybe (a, VClock)))
get doGet conn btype bucket' key' r =
fmap (first resolveMany) `fmap` doGet conn btype bucket' key' r
{-# INLINE get #-}
getMany :: (Resolvable a) =>
(Connection -> Maybe BucketType -> Bucket -> [Key]
-> R -> IO [Maybe ([a], VClock)])
-> Connection -> Maybe BucketType -> Bucket -> [Key]
-> R -> IO [Maybe (a, VClock)]
getMany doGet conn bt b ks r =
map (fmap (first resolveMany)) `fmap` doGet conn bt b ks r
{-# INLINE getMany #-}
type Put a = Connection -> Maybe BucketType -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO ([a], VClock)
put :: (Resolvable a) => Put a
-> Connection -> Maybe BucketType -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO (a, VClock)
put doPut conn btype bucket' key' mvclock0 val0 w dw = do
let go !i val mvclock
| i == maxRetries = throwIO RetriesExceeded
| otherwise = do
(xs, vclock) <- doPut conn btype bucket' key' mvclock val w dw
case xs of
[x] | i > 0 || isJust mvclock -> return (x, vclock)
(_:_) -> do debugValues "put" "conflict" xs
go (i+1) (resolveMany xs) (Just vclock)
[] -> unexError "Network.Riak.Resolvable" "put"
"received empty response from server"
go (0::Int) val0 mvclock0
{-# INLINE put #-}
maxRetries :: Int
maxRetries = 64
{-# INLINE maxRetries #-}
put_ :: (Resolvable a) =>
(Connection -> Maybe BucketType -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO ([a], VClock))
-> Connection -> Maybe BucketType -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO ()
put_ doPut conn btype bucket' key' mvclock0 val0 w dw =
put doPut conn btype bucket' key' mvclock0 val0 w dw >> return ()
{-# INLINE put_ #-}
modify :: (MonadIO m, Resolvable a) => Get a -> Put a
-> Connection -> Maybe BucketType -> Bucket
-> Key -> R -> W -> DW -> (Maybe a -> m (a,b))
-> m (a,b)
modify doGet doPut conn btype bucket' key' r w dw act = do
a0 <- liftIO $ get doGet conn btype bucket' key' r
(a,b) <- act (fst <$> a0)
(a',_) <- liftIO $ put doPut conn btype bucket' key' (snd <$> a0) a w dw
return (a',b)
{-# INLINE modify #-}
modify_ :: (MonadIO m, Resolvable a) => Get a -> Put a
-> Connection -> Maybe BucketType -> Bucket
-> Key -> R -> W -> DW -> (Maybe a -> m a)
-> m a
modify_ doGet doPut conn btype bucket' key' r w dw act = do
a0 <- liftIO $ get doGet conn btype bucket' key' r
a <- act (fst <$> a0)
liftIO $ fst <$> put doPut conn btype bucket' key' (snd <$> a0) a w dw
{-# INLINE modify_ #-}
putMany :: (Resolvable a) =>
(Connection -> Maybe BucketType -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
-> IO [([a], VClock)])
-> Connection -> Maybe BucketType -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
-> IO [(a, VClock)]
putMany doPut conn btype bucket' puts0 w dw = go (0::Int) [] . zip [(0::Int)..] $ puts0
where
go _ acc [] = return . map snd . sortBy (compare `on` fst) $ acc
go !i acc puts
| i == maxRetries = throwIO RetriesExceeded
| otherwise = do
rs <- doPut conn btype bucket' (map snd puts) w dw
let (conflicts, ok) = partitionEithers $ zipWith mush puts rs
unless (null conflicts) $
debugValues "putMany" "conflicts" conflicts
go (i+1) (ok++acc) conflicts
mush (i,(k,mv,_)) (cs,v) =
case cs of
[x] | i > 0 || isJust mv -> Right (i,(x,v))
(_:_) -> Left (i,(k,Just v, resolveMany cs))
[] -> unexError "Network.Riak.Resolvable" "put"
"received empty response from server"
{-# INLINE putMany #-}
putMany_ :: (Resolvable a) =>
(Connection -> Maybe BucketType -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
-> IO [([a], VClock)])
-> Connection -> Maybe BucketType -> Bucket
-> [(Key, Maybe VClock, a)] -> W -> DW -> IO ()
putMany_ doPut conn btype bucket' puts0 w dw =
putMany doPut conn btype bucket' puts0 w dw >> return ()
{-# INLINE putMany_ #-}
resolveMany' :: (Resolvable a) => a -> [a] -> a
resolveMany' = foldl' resolve
{-# INLINE resolveMany' #-}
resolveMany :: (Resolvable a) => [a] -> a
resolveMany (a:as) = resolveMany' a as
resolveMany _ = error "resolveMany: empty list"
{-# INLINE resolveMany #-}