{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Network.EtcdV3
(
etcdClientConfigSimple
, EtcdQuery
, KeyRange(..)
, range
, rangeReq
, rangeResponsePairs
, grantLease
, GrantedLease
, fromLeaseGrantResponse
, keepAlive
, put
, putReq
, delete
, delReq
, AcquiredLock
, fromLockResponse
, lock
, unlock
, transaction
, successTxnReq
, kvEq
, kvNeq
, kvGt
, kvLt
, watchForever
, watchReq
, Election(..)
, LeaderEvidence
, runForLeadership
, fromCampaignResponse
, proclaim
, resign
, getProclaimedValue
, observeProclaimedValues
, proclaimReq
, campaignReq
, resignReq
, leaderReq
, defMessage
, module Control.Lens
) where
import Control.Lens
import Control.Exception (throwIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as C8
import Data.ProtoLens.Message (defMessage)
import Data.Semigroup (Endo)
import Data.String (IsString)
import GHC.Int (Int64)
import Network.GRPC.Client
import Network.GRPC.Client.Helpers
import Network.HTTP2.Client (TooMuchConcurrency)
import Network.Socket (HostName, PortNumber)
import qualified Proto.Etcd.Etcdserver.Etcdserverpb.Rpc as EtcdRPC
import qualified Proto.Etcd.Etcdserver.Etcdserverpb.Rpc_Fields as EtcdPB
import qualified Proto.Etcd.Etcdserver.Api.V3lock.V3lockpb.V3lock as LockRPC
import qualified Proto.Etcd.Etcdserver.Api.V3lock.V3lockpb.V3lock_Fields as LockPB
import qualified Proto.Etcd.Etcdserver.Api.V3election.V3electionpb.V3election as ElectionRPC
import qualified Proto.Etcd.Etcdserver.Api.V3election.V3electionpb.V3election_Fields as ElectionPB
-- | Build a 'GrpcClientConfig' for the given host, port and TLS setting.
--
-- Starts from 'grpcClientConfigSimple' and overrides the compression field
-- with 'uncompressed'.
etcdClientConfigSimple :: HostName -> PortNumber -> UseTlsOrNot -> GrpcClientConfig
etcdClientConfigSimple host port tls =
  baseConfig {_grpcClientConfigCompression = uncompressed}
  where
    baseConfig = grpcClientConfigSimple host port tls
-- | Result shape shared by the unary calls in this module: an 'IO' action
-- that yields 'Nothing' when no reply payload could be extracted.
type EtcdQuery a = IO (Maybe a)
-- | Selects which keys a range-style request applies to.
--
-- Each constructor is turned into a @(key, range_end)@ pair by
-- 'rangePairForRangeQuery'.
data KeyRange
  = SingleKey !ByteString
  -- ^ The key itself; the range end is left empty.
  | FromKey !ByteString
  -- ^ Start at the key; the range end is the single byte @\"\\NUL\"@.
  | Prefixed !ByteString
  -- ^ The key is used as a prefix; the range end is derived from it by
  -- incrementing its last byte that is not @0xff@.
  deriving (Eq, Ord, Show)
-- | Run the KV @range@ RPC for the given 'KeyRange'.
--
-- The request is built with 'rangeReq'. The result is whatever
-- 'unaryOutput' can preview from the raw reply, and 'Nothing' otherwise.
range :: GrpcClient -> KeyRange -> EtcdQuery EtcdRPC.RangeResponse
range grpc r = do
  reply <- rawUnary (RPC :: RPC EtcdRPC.KV "range") grpc (rangeReq r)
  pure (reply ^? unaryOutput)
-- | Build a 'EtcdRPC.RangeRequest' whose @key@ and @range_end@ fields are
-- the pair computed by 'rangePairForRangeQuery'; all other fields keep their
-- 'defMessage' values.
rangeReq :: KeyRange -> EtcdRPC.RangeRequest
rangeReq r =
  let (startKey, endKey) = rangePairForRangeQuery r
   in defMessage
        & EtcdPB.key .~ startKey
        & EtcdPB.rangeEnd .~ endKey
-- | Fold over every entry of the response's @kvs@ field, presenting each one
-- as a @(key, value)@ pair.
rangeResponsePairs ::
  Getting (Endo [(ByteString, ByteString)]) EtcdRPC.RangeResponse (ByteString, ByteString)
rangeResponsePairs = EtcdPB.kvs . folded . to keyValuePair
  where
    -- Project a single entry onto its key and value fields.
    keyValuePair kv = (kv ^. EtcdPB.key, kv ^. EtcdPB.value)
-- | Translate a 'KeyRange' into the @(key, range_end)@ pair placed in range
-- and delete-range requests.
--
-- * 'SingleKey': the range end is empty.
-- * 'FromKey': the range end is the single byte @\"\\NUL\"@.
-- * 'Prefixed': the range end is the prefix with trailing @0xff@ bytes
--   removed and the last remaining byte incremented; if nothing remains
--   (the prefix is empty or consists only of @0xff@ bytes) the range end is
--   @\"\\NUL\"@.
--
-- NOTE(review): an empty key is passed through unchanged for every
-- constructor -- confirm the server accepts that.
rangePairForRangeQuery :: KeyRange -> (ByteString, ByteString)
rangePairForRangeQuery keyRange =
  case keyRange of
    SingleKey k -> (k, "")
    FromKey k -> (k, "\NUL")
    Prefixed k -> (k, prefixEnd k)
  where
    -- Work on the reversed key so the last significant byte is at the front.
    prefixEnd k =
      case C8.uncons (C8.dropWhile (== '\xff') (C8.reverse k)) of
        Nothing -> "\NUL"
        -- 'succ' cannot overflow a byte here: @0xff@ bytes were dropped.
        Just (lastByte, reversedInit) ->
          C8.reverse (C8.cons (succ lastByte) reversedInit)
-- | Run the Lease @leaseGrant@ RPC, sending the given number as the
-- request's @ttl@ field.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
grantLease :: GrpcClient -> Int64 -> EtcdQuery EtcdRPC.LeaseGrantResponse
grantLease grpc seconds = do
  reply <- rawUnary (RPC :: RPC EtcdRPC.Lease "leaseGrant") grpc request
  pure (reply ^? unaryOutput)
  where
    request = defMessage & EtcdPB.ttl .~ seconds
-- | Identifier of a lease, as read from the @id@ field of a lease-grant
-- response (see 'fromLeaseGrantResponse').
newtype GrantedLease = GrantedLease { _getGrantedLeaseId :: Int64 }
-- | Renders as @(lease #\<id\>)@.
instance Show GrantedLease where
  show (GrantedLease leaseId) = concat ["(lease #", show leaseId, ")"]
-- | Wrap the @id@ field of a lease-grant response as a 'GrantedLease'.
fromLeaseGrantResponse :: EtcdRPC.LeaseGrantResponse -> GrantedLease
fromLeaseGrantResponse = GrantedLease . view EtcdPB.id
-- | Run the Lease @leaseKeepAlive@ RPC as a single unary call for the given
-- lease, sending its identifier in the request's @id@ field.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
keepAlive :: GrpcClient -> GrantedLease -> EtcdQuery EtcdRPC.LeaseKeepAliveResponse
keepAlive grpc (GrantedLease leaseID) = do
  reply <- rawUnary (RPC :: RPC EtcdRPC.Lease "leaseKeepAlive") grpc request
  pure (reply ^? unaryOutput)
  where
    request = defMessage & EtcdPB.id .~ leaseID
-- | Run the KV @put@ RPC with a request built by 'putReq' from the key, the
-- value and the optional lease.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
put ::
  GrpcClient ->
  ByteString ->
  ByteString ->
  Maybe GrantedLease ->
  EtcdQuery EtcdRPC.PutResponse
put grpc k v gl =
  fmap (preview unaryOutput) $
    rawUnary (RPC :: RPC EtcdRPC.KV "put") grpc (putReq k v gl)
-- | Build a 'EtcdRPC.PutRequest' from a key, a value and an optional lease.
--
-- When no lease is supplied the @lease@ field is set to @0@.
putReq :: ByteString -> ByteString -> Maybe GrantedLease -> EtcdRPC.PutRequest
putReq k v gl =
  defMessage
    & EtcdPB.key .~ k
    & EtcdPB.value .~ v
    & EtcdPB.lease .~ leaseId
  where
    leaseId = case gl of
      Nothing -> 0
      Just (GrantedLease granted) -> granted
-- | Run the KV @deleteRange@ RPC for the given 'KeyRange', using the request
-- built by 'delReq'.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
delete :: GrpcClient -> KeyRange -> EtcdQuery EtcdRPC.DeleteRangeResponse
delete grpc r = do
  reply <- rawUnary (RPC :: RPC EtcdRPC.KV "deleteRange") grpc (delReq r)
  pure (reply ^? unaryOutput)
-- | Build a 'EtcdRPC.DeleteRangeRequest' whose @key@ and @range_end@ fields
-- are the pair computed by 'rangePairForRangeQuery'.
delReq :: KeyRange -> EtcdRPC.DeleteRangeRequest
delReq r =
  let (startKey, endKey) = rangePairForRangeQuery r
   in defMessage
        & EtcdPB.key .~ startKey
        & EtcdPB.rangeEnd .~ endKey
-- | Key identifying a held lock, as read from the @key@ field of a lock
-- response (see 'fromLockResponse') and sent back by 'unlock'.
newtype AcquiredLock = AcquiredLock { _getAcquiredLock :: ByteString }
-- | Renders as @(lock #\<key\>)@, where the key is shown with the
-- 'ByteString' 'Show' instance.
instance Show AcquiredLock where
  show (AcquiredLock lockKey) = concat ["(lock #", show lockKey, ")"]
-- | Wrap the @key@ field of a lock response as an 'AcquiredLock'.
fromLockResponse :: LockRPC.LockResponse -> AcquiredLock
fromLockResponse = AcquiredLock . view LockPB.key
-- | Run the Lock @lock@ RPC, sending the given name in the @name@ field and
-- the lease identifier in the @lease@ field.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
lock :: GrpcClient -> ByteString -> GrantedLease -> EtcdQuery LockRPC.LockResponse
lock grpc n (GrantedLease leaseID) = do
  reply <- rawUnary (RPC :: RPC LockRPC.Lock "lock") grpc request
  pure (reply ^? unaryOutput)
  where
    request =
      defMessage
        & LockPB.name .~ n
        & LockPB.lease .~ leaseID
-- | Run the Lock @unlock@ RPC, sending the key held by the 'AcquiredLock'
-- in the request's @key@ field.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
unlock :: GrpcClient -> AcquiredLock -> EtcdQuery LockRPC.UnlockResponse
unlock grpc (AcquiredLock k) = do
  reply <- rawUnary (RPC :: RPC LockRPC.Lock "unlock") grpc request
  pure (reply ^? unaryOutput)
  where
    request = defMessage & LockPB.key .~ k
-- | Build a 'EtcdRPC.Compare' on the value of key @k@ with the @EQUAL@
-- result and @v@ as the value to compare against.
kvEq :: ByteString -> ByteString -> EtcdRPC.Compare
kvEq k v =
  defMessage
    & EtcdPB.key .~ k
    & EtcdPB.value .~ v
    & EtcdPB.target .~ EtcdRPC.Compare'VALUE
    & EtcdPB.result .~ EtcdRPC.Compare'EQUAL
-- | Build a 'EtcdRPC.Compare' on the value of key @k@ with the @NOT_EQUAL@
-- result and @v@ as the value to compare against.
kvNeq :: ByteString -> ByteString -> EtcdRPC.Compare
kvNeq k v =
  defMessage
    & EtcdPB.key .~ k
    & EtcdPB.value .~ v
    & EtcdPB.target .~ EtcdRPC.Compare'VALUE
    & EtcdPB.result .~ EtcdRPC.Compare'NOT_EQUAL
-- | Build a 'EtcdRPC.Compare' on the value of key @k@ with the @GREATER@
-- result and @v@ as the value to compare against.
kvGt :: ByteString -> ByteString -> EtcdRPC.Compare
kvGt k v =
  defMessage
    & EtcdPB.key .~ k
    & EtcdPB.value .~ v
    & EtcdPB.target .~ EtcdRPC.Compare'VALUE
    & EtcdPB.result .~ EtcdRPC.Compare'GREATER
-- | Build a 'EtcdRPC.Compare' on the value of key @k@ with the @LESS@
-- result and @v@ as the value to compare against.
kvLt :: ByteString -> ByteString -> EtcdRPC.Compare
kvLt k v =
  defMessage
    & EtcdPB.key .~ k
    & EtcdPB.value .~ v
    & EtcdPB.target .~ EtcdRPC.Compare'VALUE
    & EtcdPB.result .~ EtcdRPC.Compare'LESS
-- | Build a 'EtcdRPC.TxnRequest' that carries the given comparisons and only
-- populates the @success@ list.
--
-- The @success@ operations are, in order: every put, then every delete-range,
-- then every range request, each wrapped in its own request operation.
successTxnReq ::
  [EtcdRPC.Compare] ->
  [EtcdRPC.PutRequest] ->
  [EtcdRPC.DeleteRangeRequest] ->
  [EtcdRPC.RangeRequest] ->
  EtcdRPC.TxnRequest
successTxnReq cmps rps drrs rrs =
  defMessage
    & EtcdPB.compare .~ cmps
    & EtcdPB.success .~ (putOps ++ deleteOps ++ rangeOps)
  where
    putOps = map (\rp -> defMessage & EtcdPB.maybe'requestPut ?~ rp) rps
    deleteOps = map (\drr -> defMessage & EtcdPB.maybe'requestDeleteRange ?~ drr) drrs
    rangeOps = map (\rr -> defMessage & EtcdPB.maybe'requestRange ?~ rr) rrs
-- | Run the KV @txn@ RPC with the given transaction request.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
transaction :: GrpcClient -> EtcdRPC.TxnRequest -> EtcdQuery EtcdRPC.TxnResponse
transaction grpc tx = do
  reply <- rawUnary (RPC :: RPC EtcdRPC.KV "txn") grpc tx
  pure (reply ^? unaryOutput)
-- | Build a 'EtcdRPC.WatchRequest' holding a create-request for the given
-- 'KeyRange'.
--
-- The create-request carries the @(key, range_end)@ pair from
-- 'rangePairForRangeQuery', the supplied filters and watch id, and has its
-- @fragment@ field set to 'True'.
watchReq ::
  KeyRange ->
  [EtcdRPC.WatchCreateRequest'FilterType] ->
  Int64 ->
  EtcdRPC.WatchRequest
watchReq r fs wId = defMessage & EtcdPB.maybe'createRequest ?~ createRequest
  where
    (startKey, endKey) = rangePairForRangeQuery r
    createRequest =
      defMessage
        & EtcdPB.key .~ startKey
        & EtcdPB.rangeEnd .~ endKey
        & EtcdPB.filters .~ fs
        & EtcdPB.fragment .~ True
        & EtcdPB.watchId .~ wId
-- | Open the Watch @watch@ stream, send the given watch requests and fold
-- every received response into an accumulator.
--
-- Outgoing side: the requests in @wcs@ are sent one at a time, in order;
-- once the list is exhausted a 'Finalize' event is emitted.
--
-- Incoming side: each received message is passed to @handle@ together with
-- the current accumulator (starting from @v0@); an 'Invalid' event is
-- rethrown with 'throwIO'; any other event leaves the accumulator untouched.
--
-- The stream's final result is discarded and replaced with @()@.
watchForever ::
  GrpcClient ->
  [EtcdRPC.WatchRequest] ->
  (a -> EtcdRPC.WatchResponse -> IO a) ->
  a ->
  IO (Either TooMuchConcurrency ())
watchForever grpc wcs handle v0 =
  (() <$)
    <$> rawGeneralStream (RPC :: RPC EtcdRPC.Watch "watch") grpc v0 onIncoming wcs onOutgoing
  where
    onIncoming acc event = case event of
      RecvMessage msg -> handle acc msg
      Invalid err -> throwIO err
      _ -> pure acc

    onOutgoing pending = case pending of
      (req : rest) -> pure (rest, SendMessage Compressed req)
      [] -> pure ([], Finalize)
-- | Name of an election, as sent in the @name@ field of campaign and leader
-- requests.
newtype Election = Election
  { _getElectionName :: ByteString
  }
  deriving (Eq, Ord, Show, IsString)
-- | Wrapper around the leader key taken from a campaign response (see
-- 'fromCampaignResponse'); it is sent back in proclaim and resign requests.
newtype LeaderEvidence = LeaderEvidence { _getLeaderKey :: ElectionRPC.LeaderKey }
-- | Run the Election @campaign@ RPC with a request built by 'campaignReq'
-- from the election, the lease and the value.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
runForLeadership ::
  GrpcClient ->
  Election ->
  GrantedLease ->
  ByteString ->
  EtcdQuery ElectionRPC.CampaignResponse
runForLeadership grpc el gl v = do
  reply <- rawUnary (RPC :: RPC ElectionRPC.Election "campaign") grpc (campaignReq el gl v)
  pure (reply ^? unaryOutput)
-- | Build a 'ElectionRPC.CampaignRequest' carrying the election name, the
-- lease identifier and the given value.
campaignReq :: Election -> GrantedLease -> ByteString -> ElectionRPC.CampaignRequest
campaignReq (Election electionName) (GrantedLease leaseId) v =
  defMessage
    & ElectionPB.name .~ electionName
    & ElectionPB.lease .~ leaseId
    & ElectionPB.value .~ v
-- | Extract the leader key from a campaign response, if one is present.
--
-- Returns 'Nothing' when the response's @leader@ field is unset.
--
-- The optional accessor @maybe'leader@ is read on purpose: the plain
-- @leader@ lens is total and substitutes a default message for an unset
-- field, so previewing through it would always yield 'Just' and hand callers
-- an empty 'LeaderEvidence'.
fromCampaignResponse :: ElectionRPC.CampaignResponse -> Maybe LeaderEvidence
fromCampaignResponse cr = LeaderEvidence <$> cr ^. ElectionPB.maybe'leader
-- | Run the Election @proclaim@ RPC with a request built by 'proclaimReq'
-- from the leader evidence and the value.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
proclaim ::
  GrpcClient ->
  LeaderEvidence ->
  ByteString ->
  EtcdQuery ElectionRPC.ProclaimResponse
proclaim grpc le v = do
  reply <- rawUnary (RPC :: RPC ElectionRPC.Election "proclaim") grpc (proclaimReq le v)
  pure (reply ^? unaryOutput)
-- | Build a 'ElectionRPC.ProclaimRequest' carrying the wrapped leader key
-- and the given value.
proclaimReq :: LeaderEvidence -> ByteString -> ElectionRPC.ProclaimRequest
proclaimReq (LeaderEvidence leaderKey) v =
  defMessage
    & ElectionPB.leader .~ leaderKey
    & ElectionPB.value .~ v
-- | Run the Election @resign@ RPC with a request built by 'resignReq' from
-- the leader evidence.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
resign :: GrpcClient -> LeaderEvidence -> EtcdQuery ElectionRPC.ResignResponse
resign grpc le = do
  reply <- rawUnary (RPC :: RPC ElectionRPC.Election "resign") grpc (resignReq le)
  pure (reply ^? unaryOutput)
-- | Build a 'ElectionRPC.ResignRequest' carrying the wrapped leader key.
resignReq :: LeaderEvidence -> ElectionRPC.ResignRequest
resignReq (LeaderEvidence leaderKey) =
  defMessage & ElectionPB.leader .~ leaderKey
-- | Run the Election @leader@ RPC for the given election, using the request
-- built by 'leaderReq'.
--
-- Yields 'Nothing' when 'unaryOutput' previews nothing from the raw reply.
getProclaimedValue :: GrpcClient -> Election -> EtcdQuery ElectionRPC.LeaderResponse
getProclaimedValue grpc el = do
  reply <- rawUnary (RPC :: RPC ElectionRPC.Election "leader") grpc (leaderReq el)
  pure (reply ^? unaryOutput)
-- | Build a 'ElectionRPC.LeaderRequest' carrying the election name.
leaderReq :: Election -> ElectionRPC.LeaderRequest
leaderReq (Election electionName) =
  defMessage & ElectionPB.name .~ electionName
-- | Run the Election @observe@ server-streaming RPC for the given election
-- and fold every received 'ElectionRPC.LeaderResponse' into an accumulator.
--
-- The request is the one built by 'leaderReq'. @handler@ receives the
-- current accumulator (starting from @v0@) and each message; the second
-- argument the stream passes to the per-message callback is ignored. On
-- success only the first component of the stream's result is returned.
observeProclaimedValues ::
  GrpcClient ->
  Election ->
  (a -> ElectionRPC.LeaderResponse -> IO a) ->
  a ->
  IO (Either TooMuchConcurrency a)
observeProclaimedValues grpc el handler v0 = do
  outcome <-
    rawStreamServer (RPC :: RPC ElectionRPC.Election "observe") grpc v0 (leaderReq el) step
  pure (fmap (^. _1) outcome)
  where
    step acc _ msg = handler acc msg