module Network.Riak.Resolvable.Internal
(
Resolvable(..)
, ResolvableMonoid(..)
, ResolutionFailure(..)
, get
, getMany
, modify
, modify_
, put
, put_
, putMany
, putMany_
) where
import Control.Applicative ((<$>))
import Control.Arrow (first)
import Control.Exception (Exception, throwIO)
import Control.Monad (unless)
import Data.Aeson.Types (FromJSON, ToJSON)
import Data.Data (Data)
import Data.Either (partitionEithers)
import Data.Function (on)
import Data.List (foldl', sortBy)
import Data.Maybe (isJust)
import Data.Monoid (Monoid(mappend))
import Data.Typeable (Typeable)
import Network.Riak.Debug (debugValues)
import Network.Riak.Types.Internal hiding (MessageTag(..))
data ResolutionFailure = RetriesExceeded
deriving (Eq, Show, Typeable)
instance Exception ResolutionFailure
class (Show a) => Resolvable a where
resolve :: a -> a -> a
newtype ResolvableMonoid a = RM { unRM :: a }
deriving (Eq, Ord, Read, Show, Typeable, Data, Monoid, FromJSON, ToJSON)
instance (Eq a, Show a, Monoid a) => Resolvable (ResolvableMonoid a) where
resolve = mappend
instance (Resolvable a) => Resolvable (Maybe a) where
resolve (Just a) (Just b) = Just (resolve a b)
resolve a@(Just _) _ = a
resolve _ b = b
type Get a = Connection -> Bucket -> Key -> R -> IO (Maybe ([a], VClock))
get :: (Resolvable a) => Get a
-> (Connection -> Bucket -> Key -> R -> IO (Maybe (a, VClock)))
get doGet conn bucket key r =
fmap (first resolveMany) `fmap` doGet conn bucket key r
getMany :: (Resolvable a) =>
(Connection -> Bucket -> [Key] -> R -> IO [Maybe ([a], VClock)])
-> Connection -> Bucket -> [Key] -> R -> IO [Maybe (a, VClock)]
getMany doGet conn b ks r =
map (fmap (first resolveMany)) `fmap` doGet conn b ks r
type Put a = Connection -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO ([a], VClock)
put :: (Resolvable a) => Put a
-> Connection -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO (a, VClock)
put doPut conn bucket key mvclock0 val0 w dw = do
let go !i val mvclock
| i == maxRetries = throwIO RetriesExceeded
| otherwise = do
(xs, vclock) <- doPut conn bucket key mvclock val w dw
case xs of
[x] | i > 0 || isJust mvclock -> return (x, vclock)
(_:_) -> do debugValues "put" "conflict" xs
go (i+1) (resolveMany' val xs) (Just vclock)
[] -> unexError "Network.Riak.Resolvable" "put"
"received empty response from server"
go (0::Int) val0 mvclock0
maxRetries :: Int
maxRetries = 64
put_ :: (Resolvable a) =>
(Connection -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO ([a], VClock))
-> Connection -> Bucket -> Key -> Maybe VClock -> a -> W -> DW
-> IO ()
put_ doPut conn bucket key mvclock0 val0 w dw =
put doPut conn bucket key mvclock0 val0 w dw >> return ()
modify :: (Resolvable a) => Get a -> Put a
-> Connection -> Bucket -> Key -> R -> W -> DW -> (Maybe a -> IO (a,b))
-> IO (a,b)
modify doGet doPut conn bucket key r w dw act = do
a0 <- get doGet conn bucket key r
(a,b) <- act (fst <$> a0)
(a',_) <- put doPut conn bucket key (snd <$> a0) a w dw
return (a',b)
modify_ :: (Resolvable a) => Get a -> Put a
-> Connection -> Bucket -> Key -> R -> W -> DW -> (Maybe a -> IO a)
-> IO a
modify_ doGet doPut conn bucket key r w dw act = do
a0 <- get doGet conn bucket key r
a <- act (fst <$> a0)
fst <$> put doPut conn bucket key (snd <$> a0) a w dw
putMany :: (Resolvable a) =>
(Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
-> IO [([a], VClock)])
-> Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
-> IO [(a, VClock)]
putMany doPut conn bucket puts0 w dw = go (0::Int) [] . zip [(0::Int)..] $ puts0
where
go _ acc [] = return . map snd . sortBy (compare `on` fst) $ acc
go !i acc puts
| i == maxRetries = throwIO RetriesExceeded
| otherwise = do
rs <- doPut conn bucket (map snd puts) w dw
let (conflicts, ok) = partitionEithers $ zipWith mush puts rs
unless (null conflicts) $
debugValues "putMany" "conflicts" conflicts
go (i+1) (ok++acc) conflicts
mush (i,(k,mv,c)) (cs,v) =
case cs of
[x] | i > 0 || isJust mv -> Right (i,(x,v))
(_:_) -> Left (i,(k,Just v, resolveMany' c cs))
[] -> unexError "Network.Riak.Resolvable" "put"
"received empty response from server"
putMany_ :: (Resolvable a) =>
(Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW
-> IO [([a], VClock)])
-> Connection -> Bucket -> [(Key, Maybe VClock, a)] -> W -> DW -> IO ()
putMany_ doPut conn bucket puts0 w dw =
putMany doPut conn bucket puts0 w dw >> return ()
resolveMany' :: (Resolvable a) => a -> [a] -> a
resolveMany' = foldl' resolve
resolveMany :: (Resolvable a) => [a] -> a
resolveMany (a:as) = resolveMany' a as
resolveMany _ = error "resolveMany: empty list"