{-# LANGUAGE LambdaCase, OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeFamilies #-}
module Database.Franz.Client
( FranzPath(..)
, fromFranzPath
, toFranzPath
, defaultPort
, Connection
, withConnection
, connect
, disconnect
, StreamName(..)
, Query(..)
, ItemRef(..)
, RequestType(..)
, defQuery
, Response
, awaitResponse
, Contents
, fetch
, fetchSimple
, atomicallyWithin
, FranzException(..)) where
import Control.Arrow ((&&&))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.Delay (newDelay, waitDelay)
import Control.Exception
import Control.Monad
import qualified Data.ByteString.Char8 as B
import Data.ConcurrentResourceMap
import Data.IORef
import Data.IORef.Unboxed
import qualified Data.IntMap.Strict as IM
import Data.Serialize hiding (getInt64le)
import Database.Franz.Internal.Contents
import Database.Franz.Internal.Fuse
import Database.Franz.Internal.IO
import Database.Franz.Internal.Protocol
import Database.Franz.Internal.Reader
import Database.Franz.Internal.URI
import qualified Network.Socket as S
import qualified Network.Socket.ByteString as SB
import System.Process (ProcessHandle)
import System.Directory
import System.FilePath
import System.IO.Temp
-- | Per-connection table of in-flight request states, keyed by request id.
-- A thin wrapper around a strict 'IM.IntMap' so that it can be used as the
-- backing store of a 'ConcurrentResourceMap'.
newtype ConnStateMap v = ConnStateMap (IM.IntMap v)

-- | Every operation delegates to the corresponding "Data.IntMap.Strict"
-- function on the wrapped map.
instance ResourceMap ConnStateMap where
  type Key ConnStateMap = Int
  empty = ConnStateMap IM.empty
  delete k (ConnStateMap m) = ConnStateMap (IM.delete k m)
  insert k v (ConnStateMap m) = ConnStateMap (IM.insert k v m)
  lookup k (ConnStateMap m) = IM.lookup k m
-- | A handle to a franz archive, either remote (over a socket) or local
-- (read straight from a directory).
data Connection
  = Connection
      { connSocket :: MVar S.Socket
        -- ^ Socket to the server; the 'MVar' serialises writers.
      , connReqId :: !Counter
        -- ^ Source of fresh request ids.
      , connStates :: !(ConcurrentResourceMap
                          ConnStateMap
                          (TVar (ResponseStatus Contents)))
        -- ^ State of every request that is currently registered, by id.
      , connThread :: !ThreadId
        -- ^ Thread that reads responses from the socket.
      }
  | LocalConnection
      { connDir :: FilePath
        -- ^ Directory the streams are read from.
      , connReader :: FranzReader
        -- ^ Reader used to serve queries locally.
      , connFuse :: Maybe ProcessHandle
        -- ^ FUSE process, present only when 'connect' had to mount the path.
      }
-- | Client-side state of a single request.
data ResponseStatus a
  = WaitingInstant
    -- ^ Request registered; nothing has been received for it yet.
  | WaitingDelayed
    -- ^ The server answered with a @ResponseWait@; the result will follow.
  | Errored !FranzException
    -- ^ The server reported an error for this request.
  | Available !a
    -- ^ The result has arrived.
  | RequestFinished
    -- ^ The request has been cleaned up; later responses are ignored.
  deriving (Show, Functor)
-- | Run an action with a 'Connection' to the given path.
--
-- The connection is opened with 'connect' and always released with
-- 'disconnect', whether the action returns normally or throws.
withConnection :: FranzPath -> (Connection -> IO r) -> IO r
withConnection path action = bracket (connect path) disconnect action
-- | Open a 'Connection' to the archive designated by a 'FranzPath'.
--
-- Remote path (@FranzPath host port dir@): resolves the host, opens a TCP
-- socket with @NoDelay@ set, sends the serialised directory name and expects
-- the server to reply with 'apiVersion'. A reader thread is then forked; it
-- decodes response headers from the socket and updates the matching entry in
-- 'connStates'. If anything fails before the 'Connection' is returned, the
-- socket is closed again.
--
-- Local path (@LocalFranzPath path@): if @path@ is an existing directory it is
-- used as is; otherwise @path@ and a fresh temporary directory are handed to
-- 'mountFuse', and that temporary directory becomes 'connDir'.
--
-- Throws the server's 'FranzException' if the handshake is answered with a
-- @ResponseError@, and 'ClientError' for any other unexpected reply or when
-- address resolution yields no result.
connect :: FranzPath -> IO Connection
connect (FranzPath host port dir) = do
  let hints = S.defaultHints
        { S.addrFlags = [S.AI_NUMERICSERV]
        , S.addrSocketType = S.Stream
        }
  addrs <- S.getAddrInfo (Just hints) (Just host) (Just $ show port)
  -- Match totally instead of relying on a failable @addr:_ <-@ pattern.
  addr <- case addrs of
    a : _ -> pure a
    [] -> throwIO $ ClientError $
      "Database.Franz.Client.connect: no address found for " ++ host
  -- bracketOnError: the socket must not leak when connecting or the
  -- handshake below throws.
  bracketOnError
    (S.socket (S.addrFamily addr) S.Stream (S.addrProtocol addr))
    S.close
    $ \sock -> do
      S.setSocketOption sock S.NoDelay 1
      S.connect sock $ S.addrAddress addr

      -- Handshake: announce the directory, expect the API version back.
      SB.sendAll sock $ encode dir
      readyMsg <- SB.recv sock 4096
      unless (readyMsg == apiVersion) $ case decode readyMsg of
        Right (ResponseError _ e) -> throwIO e
        e -> throwIO $ ClientError $ "Database.Franz.Network.connect: Unexpected response: " ++ show e

      connSocket <- newMVar sock
      connReqId <- newCounter 0
      connStates <- newResourceMap
      buf <- newIORef B.empty

      let
        -- Apply @f@ to the current state of request @i@ (or to 'Nothing'
        -- when the request is unknown or already finished) and store the
        -- state it returns, if any.
        withRequest
          :: Int
          -> (Maybe (ResponseStatus Contents)
              -> STM (Maybe (ResponseStatus Contents)))
          -> IO ()
        withRequest i f =
          withInitialisedResource connStates i (\_ -> pure ()) $ \case
            -- No such request: run @f@ only for the exception it may throw.
            Nothing -> void $ atomically (f Nothing)
            Just reqVar -> atomically $ do
              current <- readTVar reqVar
              case current of
                -- A finished request is never overwritten.
                RequestFinished -> void $ f Nothing
                s -> f (Just s) >>= mapM_ (writeTVar reqVar)

        -- Decode one value from the socket, turning a decoding failure
        -- into a 'ClientError'.
        runGetThrow :: Get a -> IO a
        runGetThrow g = do
          decoded <- runGetRecv buf sock g
          either (throwIO . ClientError) pure decoded

        -- React to a single response header.
        dispatch = \case
          Response i -> do
            resp <- runGetThrow getResponse
            withRequest i $ traverse $ \case
              WaitingInstant -> pure (Available resp)
              WaitingDelayed -> pure (Available resp)
              _ -> throwSTM $ ClientError $ "Unexpected state on ResponseInstant " ++ show i
          ResponseWait i -> withRequest i $ traverse $ \case
            WaitingInstant -> pure WaitingDelayed
            _ -> throwSTM $ ClientError $ "Unexpected state on ResponseWait " ++ show i
          ResponseError i e -> withRequest i $ \case
            Nothing -> throwSTM e
            Just{} -> pure $ Just (Errored e)

      -- Reader thread: any exception that ends the loop is rethrown in it.
      connThread <- forkFinally
        (forever $ runGetThrow get >>= dispatch)
        (either throwIO pure)
      return Connection{..}
connect (LocalFranzPath path) = do
  isLive <- doesDirectoryExist path
  connReader <- newFranzReader
  (connDir, connFuse) <-
    if isLive
      then pure (path, Nothing)
      else do
        -- Not a directory: mount it under <tmp>/franz/<basename>-XXXX.
        tmpDir <- getCanonicalTemporaryDirectory
        let tmpDir' = tmpDir </> "franz"
        createDirectoryIfMissing True tmpDir'
        mountPoint <- createTempDirectory tmpDir' (takeBaseName path)
        fuse <- mountFuse mempty (throwIO . InternalError) path mountPoint
        pure (mountPoint, Just fuse)
  pure LocalConnection{..}
-- | Release a 'Connection' obtained from 'connect'.
--
-- Remote connection: kills the reader thread, then closes the socket.
-- Local connection: closes the reader and, even if that throws, passes the
-- FUSE process (when there is one) and 'connDir' to 'killFuse'.
disconnect :: Connection -> IO ()
disconnect Connection{connThread = reader, connSocket = sockVar} = do
  killThread reader
  withMVar sockVar S.close
disconnect LocalConnection{connReader = reader, connFuse = fuse, connDir = dir} =
  closeFranzReader reader `finally` mapM_ unmount fuse
  where
    unmount p = killFuse mempty p dir
-- | A baseline 'Query' for the given stream: both bounds are @BySeqNum 0@
-- and the request type is 'AllItems'. Override fields with record update.
defQuery :: StreamName -> Query
defQuery name = Query
  { reqStream = name
  , reqFrom = BySeqNum 0
  , reqTo = BySeqNum 0
  , reqType = AllItems
  }
-- | Outcome of a query: 'Left' carries contents that are already available,
-- 'Right' carries an 'STM' action that yields them once they arrive.
type Response = Either Contents (STM Contents)
-- | Collapse a two-stage response into a single transaction.
--
-- Runs the outer transaction; a 'Left' result is returned as is, a 'Right'
-- result is an inner transaction that is run to obtain the value.
awaitResponse :: STM (Either a (STM a)) -> STM a
awaitResponse response = response >>= either pure id
fetch
:: Connection
-> Query
-> (STM Response -> IO r)
-> IO r
fetch :: Connection -> Query -> (STM Response -> IO r) -> IO r
fetch Connection{ThreadId
MVar Socket
ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
Counter
connThread :: ThreadId
connStates :: ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Counter
connSocket :: MVar Socket
connThread :: Connection -> ThreadId
connStates :: Connection
-> ConcurrentResourceMap
ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Connection -> Counter
connSocket :: Connection -> MVar Socket
..} Query
req STM Response -> IO r
cont = do
Key
reqId <- Counter -> Key -> IO Key
atomicAddCounter Counter
connReqId Key
1
let
cleanupRequest :: TVar (ResponseStatus Contents) -> IO ()
cleanupRequest TVar (ResponseStatus Contents)
reqVar = do
let inFlight :: ResponseStatus a -> Bool
inFlight ResponseStatus a
WaitingInstant = Bool
True
inFlight ResponseStatus a
WaitingDelayed = Bool
True
inFlight ResponseStatus a
_ = Bool
False
Bool
requestInFlight <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$
TVar (ResponseStatus Contents)
-> (ResponseStatus Contents -> (Bool, ResponseStatus Contents))
-> STM Bool
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TVar (ResponseStatus Contents)
reqVar ((ResponseStatus Contents -> (Bool, ResponseStatus Contents))
-> STM Bool)
-> (ResponseStatus Contents -> (Bool, ResponseStatus Contents))
-> STM Bool
forall a b. (a -> b) -> a -> b
$ ResponseStatus Contents -> Bool
forall a. ResponseStatus a -> Bool
inFlight (ResponseStatus Contents -> Bool)
-> (ResponseStatus Contents -> ResponseStatus Contents)
-> ResponseStatus Contents
-> (Bool, ResponseStatus Contents)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& ResponseStatus Contents
-> ResponseStatus Contents -> ResponseStatus Contents
forall a b. a -> b -> a
const ResponseStatus Contents
forall a. ResponseStatus a
RequestFinished
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
requestInFlight (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar Socket -> (Socket -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Socket
connSocket ((Socket -> IO ()) -> IO ()) -> (Socket -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Socket
sock ->
Socket -> ByteString -> IO ()
SB.sendAll Socket
sock (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ RawRequest -> ByteString
forall a. Serialize a => a -> ByteString
encode (RawRequest -> ByteString) -> RawRequest -> ByteString
forall a b. (a -> b) -> a -> b
$ Key -> RawRequest
RawClean Key
reqId
ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
-> Key ConnStateMap
-> IO (TVar (ResponseStatus Contents))
-> (TVar (ResponseStatus Contents) -> IO ())
-> (TVar (ResponseStatus Contents) -> IO r)
-> IO r
forall (m :: * -> *) r a.
ResourceMap m =>
ConcurrentResourceMap m r
-> Key m -> IO r -> (r -> IO ()) -> (r -> IO a) -> IO a
withSharedResource ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connStates Key
Key ConnStateMap
reqId
(ResponseStatus Contents -> IO (TVar (ResponseStatus Contents))
forall a. a -> IO (TVar a)
newTVarIO ResponseStatus Contents
forall a. ResponseStatus a
WaitingInstant)
TVar (ResponseStatus Contents) -> IO ()
cleanupRequest ((TVar (ResponseStatus Contents) -> IO r) -> IO r)
-> (TVar (ResponseStatus Contents) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \TVar (ResponseStatus Contents)
reqVar -> do
MVar Socket -> (Socket -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Socket
connSocket ((Socket -> IO ()) -> IO ()) -> (Socket -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Socket
sock -> Socket -> ByteString -> IO ()
SB.sendAll Socket
sock (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ RawRequest -> ByteString
forall a. Serialize a => a -> ByteString
encode
(RawRequest -> ByteString) -> RawRequest -> ByteString
forall a b. (a -> b) -> a -> b
$ Key -> Query -> RawRequest
RawRequest Key
reqId Query
req
let
getDelayed :: STM Contents
getDelayed = TVar (ResponseStatus Contents) -> STM (ResponseStatus Contents)
forall a. TVar a -> STM a
readTVar TVar (ResponseStatus Contents)
reqVar STM (ResponseStatus Contents)
-> (ResponseStatus Contents -> STM Contents) -> STM Contents
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
ResponseStatus Contents
RequestFinished -> FranzException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM FranzException
requestFinished
ResponseStatus Contents
WaitingDelayed -> STM Contents
forall a. STM a
retry
Available Contents
xs -> Contents -> STM Contents
forall (m :: * -> *) a. Monad m => a -> m a
return Contents
xs
Errored FranzException
e -> FranzException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM FranzException
e
ResponseStatus Contents
WaitingInstant -> FranzException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM Contents) -> FranzException -> STM Contents
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
ClientError
FilePath
"fetch/WaitingDelayed: unexpected state WaitingInstant"
STM Response -> IO r
cont (STM Response -> IO r) -> STM Response -> IO r
forall a b. (a -> b) -> a -> b
$ TVar (ResponseStatus Contents) -> STM (ResponseStatus Contents)
forall a. TVar a -> STM a
readTVar TVar (ResponseStatus Contents)
reqVar STM (ResponseStatus Contents)
-> (ResponseStatus Contents -> STM Response) -> STM Response
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
ResponseStatus Contents
RequestFinished -> FranzException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM FranzException
requestFinished
Errored FranzException
e -> FranzException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM FranzException
e
ResponseStatus Contents
WaitingInstant -> STM Response
forall a. STM a
retry
Available Contents
xs -> Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response) -> Response -> STM Response
forall a b. (a -> b) -> a -> b
$ Contents -> Response
forall a b. a -> Either a b
Left Contents
xs
ResponseStatus Contents
WaitingDelayed -> Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response) -> Response -> STM Response
forall a b. (a -> b) -> a -> b
$ STM Contents -> Response
forall a b. b -> Either a b
Right STM Contents
getDelayed
fetch LocalConnection{FilePath
Maybe ProcessHandle
FranzReader
connFuse :: Maybe ProcessHandle
connReader :: FranzReader
connDir :: FilePath
connFuse :: Connection -> Maybe ProcessHandle
connReader :: Connection -> FranzReader
connDir :: Connection -> FilePath
..} Query
query STM Response -> IO r
cont
= FranzPrefix
-> FranzReader
-> FranzDirectory
-> Query
-> (FranzException -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r)
-> IO r
forall r.
FranzPrefix
-> FranzReader
-> FranzDirectory
-> Query
-> (FranzException -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r)
-> IO r
handleQuery (FilePath -> FranzPrefix
FranzPrefix FilePath
"") FranzReader
connReader (FilePath -> FranzDirectory
FranzDirectory FilePath
connDir) Query
query
(STM Response -> IO r
cont (STM Response -> IO r)
-> (FranzException -> STM Response) -> FranzException -> IO r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FranzException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM)
((Stream -> STM (Bool, QueryResult) -> IO r) -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Stream
stream STM (Bool, QueryResult)
transaction -> STM (Bool, QueryResult) -> IO (Bool, QueryResult)
forall a. STM a -> IO a
atomically STM (Bool, QueryResult)
transaction IO (Bool, QueryResult) -> ((Bool, QueryResult) -> IO r) -> IO r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
(Bool
False, QueryResult
_) -> do
TMVar (Either SomeException Contents)
vResp <- IO (TMVar (Either SomeException Contents))
forall a. IO (TMVar a)
newEmptyTMVarIO
ThreadId
tid <- (IO Contents
-> (Either SomeException Contents -> IO ()) -> IO ThreadId)
-> (Either SomeException Contents -> IO ())
-> IO Contents
-> IO ThreadId
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO Contents
-> (Either SomeException Contents -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (Either SomeException Contents -> STM ())
-> Either SomeException Contents
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMVar (Either SomeException Contents)
-> Either SomeException Contents -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException Contents)
vResp) (IO Contents -> IO ThreadId) -> IO Contents -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
QueryResult
result <- STM QueryResult -> IO QueryResult
forall a. STM a -> IO a
atomically (STM QueryResult -> IO QueryResult)
-> STM QueryResult -> IO QueryResult
forall a b. (a -> b) -> a -> b
$ do
(Bool
ready, QueryResult
result) <- STM (Bool, QueryResult)
transaction
Bool -> STM ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard Bool
ready
QueryResult -> STM QueryResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure QueryResult
result
Stream -> QueryResult -> IO Contents
readContents Stream
stream QueryResult
result
STM Response -> IO r
cont (Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response) -> Response -> STM Response
forall a b. (a -> b) -> a -> b
$ STM Contents -> Response
forall a b. b -> Either a b
Right (STM Contents -> Response) -> STM Contents -> Response
forall a b. (a -> b) -> a -> b
$ TMVar (Either SomeException Contents)
-> STM (Either SomeException Contents)
forall a. TMVar a -> STM a
takeTMVar TMVar (Either SomeException Contents)
vResp STM (Either SomeException Contents)
-> (Either SomeException Contents -> STM Contents) -> STM Contents
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> STM Contents)
-> (Contents -> STM Contents)
-> Either SomeException Contents
-> STM Contents
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM Contents -> STM Contents
forall (f :: * -> *) a. Applicative f => a -> f a
pure)
IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` ThreadId -> FranzException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
tid FranzException
requestFinished
(Bool
True, QueryResult
result) -> do
TMVar (Either SomeException Contents)
vResp <- IO (TMVar (Either SomeException Contents))
forall a. IO (TMVar a)
newEmptyTMVarIO
ThreadId
tid <- IO Contents
-> (Either SomeException Contents -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally (Stream -> QueryResult -> IO Contents
readContents Stream
stream QueryResult
result) (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (Either SomeException Contents -> STM ())
-> Either SomeException Contents
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMVar (Either SomeException Contents)
-> Either SomeException Contents -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException Contents)
vResp)
STM Response -> IO r
cont (TMVar (Either SomeException Contents)
-> STM (Either SomeException Contents)
forall a. TMVar a -> STM a
takeTMVar TMVar (Either SomeException Contents)
vResp STM (Either SomeException Contents)
-> (Either SomeException Contents -> STM Response) -> STM Response
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> STM Response)
-> (Contents -> STM Response)
-> Either SomeException Contents
-> STM Response
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM (Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response)
-> (Contents -> Response) -> Contents -> STM Response
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Contents -> Response
forall a b. a -> Either a b
Left))
IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` ThreadId -> FranzException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
tid FranzException
requestFinished
-- | Exception value used to signal that a request's continuation has
-- already completed. It is a 'ClientError' carrying a fixed message.
requestFinished :: FranzException
requestFinished = ClientError reason
  where
    reason = "request already finished"
-- | Send a single query with 'fetch' and wait for its contents, giving up
-- once the timeout elapses.
--
-- The response handed over by 'fetch' is collapsed with 'awaitResponse' and
-- run through 'atomicallyWithin', so the result is 'Nothing' when the
-- timeout fires before the contents become available.
fetchSimple
  :: Connection
  -> Int -- ^ timeout, forwarded unchanged to 'atomicallyWithin'
  -> Query
  -> IO (Maybe Contents)
fetchSimple conn timeout req =
  fetch conn req waitForContents
  where
    -- Continuation for 'fetch': block on the response, bounded by the timeout.
    waitForContents response =
      atomicallyWithin timeout (awaitResponse response)
-- | Run an STM transaction, but give up once a timer expires.
--
-- A 'Delay' is created from the timeout and the transaction is raced
-- against it inside a single 'atomically' block: the supplied action is
-- tried first, and only if it retries do we fall back to waiting on the
-- delay. Returns @Just@ the action's result, or 'Nothing' if the delay
-- rings first.
atomicallyWithin
  :: Int -- ^ timeout handed to 'newDelay' (microseconds per stm-delay docs)
  -> STM a
  -> IO (Maybe a)
atomicallyWithin timeout m = do
  timer <- newDelay timeout
  let succeeded = Just <$> m
      -- Blocks (retries) until the timer has rung, then yields 'Nothing'.
      timedOut = do
        waitDelay timer
        pure Nothing
  atomically (succeeded `orElse` timedOut)