{-# LANGUAGE LambdaCase, OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeFamilies #-}
module Database.Franz.Client
  ( FranzPath(..)
  , fromFranzPath
  , toFranzPath
  , defaultPort
  , Connection
  , withConnection
  , connect
  , disconnect
  , StreamName(..)
  , Query(..)
  , ItemRef(..)
  , RequestType(..)
  , defQuery
  , Response
  , awaitResponse
  , Contents
  , fetch
  , fetchSimple
  , atomicallyWithin
  , FranzException(..)) where

import Control.Arrow ((&&&))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.Delay (newDelay, waitDelay)
import Control.Exception
import Control.Monad
import qualified Data.ByteString.Char8 as B
import Data.ConcurrentResourceMap
import Data.IORef
import Data.IORef.Unboxed
import qualified Data.IntMap.Strict as IM
import Data.Serialize hiding (getInt64le)
import Database.Franz.Contents
import Database.Franz.Internal
import Database.Franz.Protocol
import Database.Franz.Reader
import Database.Franz.Server
import Database.Franz.URI
import qualified Network.Socket as S
import qualified Network.Socket.ByteString as SB
import System.Process (ProcessHandle)
import System.Directory
import System.FilePath
import System.IO.Temp

-- The protocol
--
-- Client                     Server
---  | ---- Archive prefix ---> |  Mounts P if possible
---  | <--- apiVersion -------- |
---  | ---- RawRequest i p ---> |
---  | ---- RawRequest j q ---> |
---  | ---- RawRequest k r ---> |
---  | <--- ResponseInstant i - |
---  | <--- result for p -----  |
---  | <--- ResponseWait j ---- |
---  | <--- ResponseWait k ---- |
---  | <--- ResponseDelayed j - |
---  | <--- result for q -----  |
--   | ----  RawClean i ---->   |
--   | ----  RawClean j ---->   |
--   | ----  RawClean k ---->   |

newtype ConnStateMap v = ConnStateMap (IM.IntMap v)

instance ResourceMap ConnStateMap where
  type Key ConnStateMap = Int
  empty :: ConnStateMap v
empty = IntMap v -> ConnStateMap v
forall v. IntMap v -> ConnStateMap v
ConnStateMap IntMap v
forall a. IntMap a
IM.empty
  delete :: Key ConnStateMap -> ConnStateMap v -> ConnStateMap v
delete Key ConnStateMap
k (ConnStateMap IntMap v
m) = IntMap v -> ConnStateMap v
forall v. IntMap v -> ConnStateMap v
ConnStateMap (Key -> IntMap v -> IntMap v
forall a. Key -> IntMap a -> IntMap a
IM.delete Key
Key ConnStateMap
k IntMap v
m)
  insert :: Key ConnStateMap -> v -> ConnStateMap v -> ConnStateMap v
insert Key ConnStateMap
k v
v (ConnStateMap IntMap v
m) = IntMap v -> ConnStateMap v
forall v. IntMap v -> ConnStateMap v
ConnStateMap (Key -> v -> IntMap v -> IntMap v
forall a. Key -> a -> IntMap a -> IntMap a
IM.insert Key
Key ConnStateMap
k v
v IntMap v
m)
  lookup :: Key ConnStateMap -> ConnStateMap v -> Maybe v
lookup Key ConnStateMap
k (ConnStateMap IntMap v
m) = Key -> IntMap v -> Maybe v
forall a. Key -> IntMap a -> Maybe a
IM.lookup Key
Key ConnStateMap
k IntMap v
m

data Connection = Connection
  { Connection -> MVar Socket
connSocket :: MVar S.Socket
  , Connection -> Counter
connReqId :: !Counter
  , Connection
-> ConcurrentResourceMap
     ConnStateMap (TVar (ResponseStatus Contents))
connStates :: !(ConcurrentResourceMap
                     ConnStateMap
                     (TVar (ResponseStatus Contents)))
  , Connection -> ThreadId
connThread :: !ThreadId
  }
  | LocalConnection
  { Connection -> FilePath
connDir :: FilePath
  , Connection -> FranzReader
connReader :: FranzReader
  , Connection -> Maybe ProcessHandle
connFuse :: Maybe ProcessHandle
  }

data ResponseStatus a = WaitingInstant
    | WaitingDelayed
    | Errored !FranzException
    | Available !a
    -- | The user cancelled the request.
    | RequestFinished
    deriving (Key -> ResponseStatus a -> ShowS
[ResponseStatus a] -> ShowS
ResponseStatus a -> FilePath
(Key -> ResponseStatus a -> ShowS)
-> (ResponseStatus a -> FilePath)
-> ([ResponseStatus a] -> ShowS)
-> Show (ResponseStatus a)
forall a. Show a => Key -> ResponseStatus a -> ShowS
forall a. Show a => [ResponseStatus a] -> ShowS
forall a. Show a => ResponseStatus a -> FilePath
forall a.
(Key -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
showList :: [ResponseStatus a] -> ShowS
$cshowList :: forall a. Show a => [ResponseStatus a] -> ShowS
show :: ResponseStatus a -> FilePath
$cshow :: forall a. Show a => ResponseStatus a -> FilePath
showsPrec :: Key -> ResponseStatus a -> ShowS
$cshowsPrec :: forall a. Show a => Key -> ResponseStatus a -> ShowS
Show, a -> ResponseStatus b -> ResponseStatus a
(a -> b) -> ResponseStatus a -> ResponseStatus b
(forall a b. (a -> b) -> ResponseStatus a -> ResponseStatus b)
-> (forall a b. a -> ResponseStatus b -> ResponseStatus a)
-> Functor ResponseStatus
forall a b. a -> ResponseStatus b -> ResponseStatus a
forall a b. (a -> b) -> ResponseStatus a -> ResponseStatus b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> ResponseStatus b -> ResponseStatus a
$c<$ :: forall a b. a -> ResponseStatus b -> ResponseStatus a
fmap :: (a -> b) -> ResponseStatus a -> ResponseStatus b
$cfmap :: forall a b. (a -> b) -> ResponseStatus a -> ResponseStatus b
Functor)

withConnection :: FranzPath -> (Connection -> IO r) -> IO r
withConnection :: FranzPath -> (Connection -> IO r) -> IO r
withConnection FranzPath
path = IO Connection
-> (Connection -> IO ()) -> (Connection -> IO r) -> IO r
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (FranzPath -> IO Connection
connect FranzPath
path) Connection -> IO ()
disconnect

connect :: FranzPath -> IO Connection
connect :: FranzPath -> IO Connection
connect (FranzPath FilePath
host PortNumber
port FilePath
dir) = do
  let hints :: AddrInfo
hints = AddrInfo
S.defaultHints { addrFlags :: [AddrInfoFlag]
S.addrFlags = [AddrInfoFlag
S.AI_NUMERICSERV], addrSocketType :: SocketType
S.addrSocketType = SocketType
S.Stream }
  AddrInfo
addr:[AddrInfo]
_ <- Maybe AddrInfo -> Maybe FilePath -> Maybe FilePath -> IO [AddrInfo]
S.getAddrInfo (AddrInfo -> Maybe AddrInfo
forall a. a -> Maybe a
Just AddrInfo
hints) (FilePath -> Maybe FilePath
forall a. a -> Maybe a
Just FilePath
host) (FilePath -> Maybe FilePath
forall a. a -> Maybe a
Just (FilePath -> Maybe FilePath) -> FilePath -> Maybe FilePath
forall a b. (a -> b) -> a -> b
$ PortNumber -> FilePath
forall a. Show a => a -> FilePath
show PortNumber
port)
  Socket
sock <- Family -> SocketType -> ProtocolNumber -> IO Socket
S.socket (AddrInfo -> Family
S.addrFamily AddrInfo
addr) SocketType
S.Stream (AddrInfo -> ProtocolNumber
S.addrProtocol AddrInfo
addr)
  Socket -> SocketOption -> Key -> IO ()
S.setSocketOption Socket
sock SocketOption
S.NoDelay Key
1
  Socket -> SockAddr -> IO ()
S.connect Socket
sock (SockAddr -> IO ()) -> SockAddr -> IO ()
forall a b. (a -> b) -> a -> b
$ AddrInfo -> SockAddr
S.addrAddress AddrInfo
addr
  Socket -> ByteString -> IO ()
SB.sendAll Socket
sock (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> ByteString
forall a. Serialize a => a -> ByteString
encode FilePath
dir
  ByteString
readyMsg <- Socket -> Key -> IO ByteString
SB.recv Socket
sock Key
4096
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ByteString
readyMsg ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
apiVersion) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ case ByteString -> Either FilePath ResponseHeader
forall a. Serialize a => ByteString -> Either FilePath a
decode ByteString
readyMsg of
    Right (ResponseError Key
_ FranzException
e) -> FranzException -> IO ()
forall e a. Exception e => e -> IO a
throwIO FranzException
e
    Either FilePath ResponseHeader
e -> FranzException -> IO ()
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO ()) -> FranzException -> IO ()
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
ClientError (FilePath -> FranzException) -> FilePath -> FranzException
forall a b. (a -> b) -> a -> b
$ FilePath
"Database.Franz.Network.connect: Unexpected response: " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ Either FilePath ResponseHeader -> FilePath
forall a. Show a => a -> FilePath
show Either FilePath ResponseHeader
e

  MVar Socket
connSocket <- Socket -> IO (MVar Socket)
forall a. a -> IO (MVar a)
newMVar Socket
sock
  Counter
connReqId <- Key -> IO Counter
newCounter Key
0
  ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connStates <- IO
  (ConcurrentResourceMap
     ConnStateMap (TVar (ResponseStatus Contents)))
forall (m :: * -> *) r.
ResourceMap m =>
IO (ConcurrentResourceMap m r)
newResourceMap
  IORef ByteString
buf <- ByteString -> IO (IORef ByteString)
forall a. a -> IO (IORef a)
newIORef ByteString
B.empty
  let -- Get a reference to shared state for the request if it exists.
      withRequest :: Key
-> (Maybe (ResponseStatus Contents)
    -> STM (Maybe (ResponseStatus Contents)))
-> IO ()
withRequest Key
i Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
f = ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
-> Key ConnStateMap
-> (TVar (ResponseStatus Contents) -> IO ())
-> (Maybe (TVar (ResponseStatus Contents)) -> IO ())
-> IO ()
forall (m :: * -> *) r a.
ResourceMap m =>
ConcurrentResourceMap m r
-> Key m -> (r -> IO ()) -> (Maybe r -> IO a) -> IO a
withInitialisedResource ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connStates Key
Key ConnStateMap
i (\TVar (ResponseStatus Contents)
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) ((Maybe (TVar (ResponseStatus Contents)) -> IO ()) -> IO ())
-> (Maybe (TVar (ResponseStatus Contents)) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \case
        Maybe (TVar (ResponseStatus Contents))
Nothing ->
          -- If it throws an exception on no value, great, it will
          -- float out here. If it returns a value, it'll just be
          -- ignore as we can't do anything with it anyway.
          IO (Maybe (ResponseStatus Contents)) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe (ResponseStatus Contents)) -> IO ())
-> IO (Maybe (ResponseStatus Contents)) -> IO ()
forall a b. (a -> b) -> a -> b
$ STM (Maybe (ResponseStatus Contents))
-> IO (Maybe (ResponseStatus Contents))
forall a. STM a -> IO a
atomically (Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
f Maybe (ResponseStatus Contents)
forall a. Maybe a
Nothing)
        Just TVar (ResponseStatus Contents)
reqVar -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (ResponseStatus Contents) -> STM (ResponseStatus Contents)
forall a. TVar a -> STM a
readTVar TVar (ResponseStatus Contents)
reqVar STM (ResponseStatus Contents)
-> (ResponseStatus Contents -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          -- If request is finished, do nothing to the content.
          ResponseStatus Contents
RequestFinished -> STM (Maybe (ResponseStatus Contents)) -> STM ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM (Maybe (ResponseStatus Contents)) -> STM ())
-> STM (Maybe (ResponseStatus Contents)) -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
f Maybe (ResponseStatus Contents)
forall a. Maybe a
Nothing
          ResponseStatus Contents
s -> Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
f (ResponseStatus Contents -> Maybe (ResponseStatus Contents)
forall a. a -> Maybe a
Just ResponseStatus Contents
s) STM (Maybe (ResponseStatus Contents))
-> (Maybe (ResponseStatus Contents) -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (ResponseStatus Contents -> STM ())
-> Maybe (ResponseStatus Contents) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TVar (ResponseStatus Contents) -> ResponseStatus Contents -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (ResponseStatus Contents)
reqVar)

      runGetThrow :: Get a -> IO a
      runGetThrow :: Get a -> IO a
runGetThrow Get a
g = IORef ByteString -> Socket -> Get a -> IO (Either FilePath a)
forall a.
IORef ByteString -> Socket -> Get a -> IO (Either FilePath a)
runGetRecv IORef ByteString
buf Socket
sock Get a
g
        IO (Either FilePath a) -> (Either FilePath a -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (FilePath -> IO a) -> (a -> IO a) -> Either FilePath a -> IO a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (FranzException -> IO a
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO a)
-> (FilePath -> FranzException) -> FilePath -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FilePath -> FranzException
ClientError) a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

  ThreadId
connThread <- (IO () -> (Either SomeException () -> IO ()) -> IO ThreadId)
-> (Either SomeException () -> IO ()) -> IO () -> IO ThreadId
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally ((SomeException -> IO ())
-> (() -> IO ()) -> Either SomeException () -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure) (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Get ResponseHeader -> IO ResponseHeader
forall a. Get a -> IO a
runGetThrow Get ResponseHeader
forall t. Serialize t => Get t
get IO ResponseHeader -> (ResponseHeader -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Response Key
i -> do
      Contents
resp <- Get Contents -> IO Contents
forall a. Get a -> IO a
runGetThrow Get Contents
getResponse
      Key
-> (Maybe (ResponseStatus Contents)
    -> STM (Maybe (ResponseStatus Contents)))
-> IO ()
withRequest Key
i ((Maybe (ResponseStatus Contents)
  -> STM (Maybe (ResponseStatus Contents)))
 -> IO ())
-> ((ResponseStatus Contents -> STM (ResponseStatus Contents))
    -> Maybe (ResponseStatus Contents)
    -> STM (Maybe (ResponseStatus Contents)))
-> (ResponseStatus Contents -> STM (ResponseStatus Contents))
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ResponseStatus Contents -> STM (ResponseStatus Contents))
-> Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse ((ResponseStatus Contents -> STM (ResponseStatus Contents))
 -> IO ())
-> (ResponseStatus Contents -> STM (ResponseStatus Contents))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \case
        ResponseStatus Contents
WaitingInstant -> ResponseStatus Contents -> STM (ResponseStatus Contents)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Contents -> ResponseStatus Contents
forall a. a -> ResponseStatus a
Available Contents
resp)
        ResponseStatus Contents
WaitingDelayed -> ResponseStatus Contents -> STM (ResponseStatus Contents)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Contents -> ResponseStatus Contents
forall a. a -> ResponseStatus a
Available Contents
resp)
        ResponseStatus Contents
_ -> FranzException -> STM (ResponseStatus Contents)
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM (ResponseStatus Contents))
-> FranzException -> STM (ResponseStatus Contents)
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
ClientError (FilePath -> FranzException) -> FilePath -> FranzException
forall a b. (a -> b) -> a -> b
$ FilePath
"Unexpected state on ResponseInstant " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ Key -> FilePath
forall a. Show a => a -> FilePath
show Key
i
    ResponseWait Key
i -> Key
-> (Maybe (ResponseStatus Contents)
    -> STM (Maybe (ResponseStatus Contents)))
-> IO ()
withRequest Key
i ((Maybe (ResponseStatus Contents)
  -> STM (Maybe (ResponseStatus Contents)))
 -> IO ())
-> ((ResponseStatus Contents -> STM (ResponseStatus Contents))
    -> Maybe (ResponseStatus Contents)
    -> STM (Maybe (ResponseStatus Contents)))
-> (ResponseStatus Contents -> STM (ResponseStatus Contents))
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ResponseStatus Contents -> STM (ResponseStatus Contents))
-> Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse ((ResponseStatus Contents -> STM (ResponseStatus Contents))
 -> IO ())
-> (ResponseStatus Contents -> STM (ResponseStatus Contents))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \case
      ResponseStatus Contents
WaitingInstant -> ResponseStatus Contents -> STM (ResponseStatus Contents)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ResponseStatus Contents
forall a. ResponseStatus a
WaitingDelayed
      ResponseStatus Contents
_ -> FranzException -> STM (ResponseStatus Contents)
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM (ResponseStatus Contents))
-> FranzException -> STM (ResponseStatus Contents)
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
ClientError (FilePath -> FranzException) -> FilePath -> FranzException
forall a b. (a -> b) -> a -> b
$ FilePath
"Unexpected state on ResponseWait " FilePath -> ShowS
forall a. [a] -> [a] -> [a]
++ Key -> FilePath
forall a. Show a => a -> FilePath
show Key
i
    ResponseError Key
i FranzException
e -> Key
-> (Maybe (ResponseStatus Contents)
    -> STM (Maybe (ResponseStatus Contents)))
-> IO ()
withRequest Key
i ((Maybe (ResponseStatus Contents)
  -> STM (Maybe (ResponseStatus Contents)))
 -> IO ())
-> (Maybe (ResponseStatus Contents)
    -> STM (Maybe (ResponseStatus Contents)))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \case
      Maybe (ResponseStatus Contents)
Nothing -> FranzException -> STM (Maybe (ResponseStatus Contents))
forall e a. Exception e => e -> STM a
throwSTM FranzException
e
      Just{} -> Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe (ResponseStatus Contents)
 -> STM (Maybe (ResponseStatus Contents)))
-> Maybe (ResponseStatus Contents)
-> STM (Maybe (ResponseStatus Contents))
forall a b. (a -> b) -> a -> b
$ ResponseStatus Contents -> Maybe (ResponseStatus Contents)
forall a. a -> Maybe a
Just (FranzException -> ResponseStatus Contents
forall a. FranzException -> ResponseStatus a
Errored FranzException
e)
  Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection :: MVar Socket
-> Counter
-> ConcurrentResourceMap
     ConnStateMap (TVar (ResponseStatus Contents))
-> ThreadId
-> Connection
Connection{ThreadId
MVar Socket
ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
Counter
connThread :: ThreadId
connStates :: ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Counter
connSocket :: MVar Socket
connThread :: ThreadId
connStates :: ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Counter
connSocket :: MVar Socket
..}
connect (LocalFranzPath FilePath
path) = do
  Bool
isLive <- FilePath -> IO Bool
doesDirectoryExist FilePath
path
  FranzReader
connReader <- IO FranzReader
newFranzReader
  (FilePath
connDir, Maybe ProcessHandle
connFuse) <- if Bool
isLive
    then (FilePath, Maybe ProcessHandle)
-> IO (FilePath, Maybe ProcessHandle)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (FilePath
path, Maybe ProcessHandle
forall a. Maybe a
Nothing)
    else do
      FilePath
tmpDir <- IO FilePath
getCanonicalTemporaryDirectory
      let tmpDir' :: FilePath
tmpDir' = FilePath
tmpDir FilePath -> ShowS
</> FilePath
"franz"
      Bool -> FilePath -> IO ()
createDirectoryIfMissing Bool
True FilePath
tmpDir'
      FilePath
dir <- FilePath -> FilePath -> IO FilePath
createTempDirectory FilePath
tmpDir' (ShowS
takeBaseName FilePath
path)
      ProcessHandle
fuse <- FilePath -> FilePath -> IO ProcessHandle
mountFuse FilePath
path FilePath
dir
      (FilePath, Maybe ProcessHandle)
-> IO (FilePath, Maybe ProcessHandle)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (FilePath
dir, ProcessHandle -> Maybe ProcessHandle
forall a. a -> Maybe a
Just ProcessHandle
fuse)
  Connection -> IO Connection
forall (f :: * -> *) a. Applicative f => a -> f a
pure LocalConnection :: FilePath -> FranzReader -> Maybe ProcessHandle -> Connection
LocalConnection{FilePath
Maybe ProcessHandle
FranzReader
connFuse :: Maybe ProcessHandle
connDir :: FilePath
connReader :: FranzReader
connFuse :: Maybe ProcessHandle
connReader :: FranzReader
connDir :: FilePath
..}

disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect Connection{ThreadId
MVar Socket
ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
Counter
connThread :: ThreadId
connStates :: ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Counter
connSocket :: MVar Socket
connThread :: Connection -> ThreadId
connStates :: Connection
-> ConcurrentResourceMap
     ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Connection -> Counter
connSocket :: Connection -> MVar Socket
..} = do
  ThreadId -> IO ()
killThread ThreadId
connThread
  MVar Socket -> (Socket -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Socket
connSocket Socket -> IO ()
S.close
disconnect LocalConnection{FilePath
Maybe ProcessHandle
FranzReader
connFuse :: Maybe ProcessHandle
connReader :: FranzReader
connDir :: FilePath
connFuse :: Connection -> Maybe ProcessHandle
connReader :: Connection -> FranzReader
connDir :: Connection -> FilePath
..} =
  FranzReader -> IO ()
closeFranzReader FranzReader
connReader
  IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` (ProcessHandle -> IO ()) -> Maybe ProcessHandle -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\ProcessHandle
p -> ProcessHandle -> FilePath -> IO ()
killFuse ProcessHandle
p FilePath
connDir) Maybe ProcessHandle
connFuse

defQuery :: StreamName -> Query
defQuery :: StreamName -> Query
defQuery StreamName
name = Query :: StreamName -> ItemRef -> ItemRef -> RequestType -> Query
Query
  { reqStream :: StreamName
reqStream = StreamName
name
  , reqFrom :: ItemRef
reqFrom = Key -> ItemRef
BySeqNum Key
0
  , reqTo :: ItemRef
reqTo = Key -> ItemRef
BySeqNum Key
0
  , reqType :: RequestType
reqType = RequestType
AllItems
  }

-- | When it is 'Right', it might block until the content arrives.
type Response = Either Contents (STM Contents)

awaitResponse :: STM (Either a (STM a)) -> STM a
awaitResponse :: STM (Either a (STM a)) -> STM a
awaitResponse = (STM (Either a (STM a)) -> (Either a (STM a) -> STM a) -> STM a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=(a -> STM a) -> (STM a -> STM a) -> Either a (STM a) -> STM a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure STM a -> STM a
forall a. a -> a
id)

-- | Fetch requested data from the server.
--
-- Termination of 'fetch' continuation cancels the request, allowing
-- flexible control of its lifetime.
fetch
  :: Connection
  -> Query
  -> (STM Response -> IO r)
  -- ^ Wait for the response in a blocking manner. You should only run
  -- the continuation inside a 'fetch' block: leaking the STM action
  -- and running it outside will result in a 'ClientError' exception.
  -> IO r
fetch :: Connection -> Query -> (STM Response -> IO r) -> IO r
fetch Connection{ThreadId
MVar Socket
ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
Counter
connThread :: ThreadId
connStates :: ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Counter
connSocket :: MVar Socket
connThread :: Connection -> ThreadId
connStates :: Connection
-> ConcurrentResourceMap
     ConnStateMap (TVar (ResponseStatus Contents))
connReqId :: Connection -> Counter
connSocket :: Connection -> MVar Socket
..} Query
req STM Response -> IO r
cont = do
  Key
reqId <- Counter -> Key -> IO Key
atomicAddCounter Counter
connReqId Key
1

  let -- When we exit the scope of the request, ensure that we cancel any
      -- outstanding request and set the appropriate state, lest the user
      -- leaks the resource and tries to re-run the provided action.
      cleanupRequest :: TVar (ResponseStatus Contents) -> IO ()
cleanupRequest TVar (ResponseStatus Contents)
reqVar = do
        let inFlight :: ResponseStatus a -> Bool
inFlight ResponseStatus a
WaitingInstant = Bool
True
            inFlight ResponseStatus a
WaitingDelayed = Bool
True
            inFlight ResponseStatus a
_ = Bool
False
        -- Check set the internal state to RequestFinished while
        -- noting if there's possibly a request still in flight.
        Bool
requestInFlight <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$
          TVar (ResponseStatus Contents)
-> (ResponseStatus Contents -> (Bool, ResponseStatus Contents))
-> STM Bool
forall s a. TVar s -> (s -> (a, s)) -> STM a
stateTVar TVar (ResponseStatus Contents)
reqVar ((ResponseStatus Contents -> (Bool, ResponseStatus Contents))
 -> STM Bool)
-> (ResponseStatus Contents -> (Bool, ResponseStatus Contents))
-> STM Bool
forall a b. (a -> b) -> a -> b
$ ResponseStatus Contents -> Bool
forall a. ResponseStatus a -> Bool
inFlight (ResponseStatus Contents -> Bool)
-> (ResponseStatus Contents -> ResponseStatus Contents)
-> ResponseStatus Contents
-> (Bool, ResponseStatus Contents)
forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& ResponseStatus Contents
-> ResponseStatus Contents -> ResponseStatus Contents
forall a b. a -> b -> a
const ResponseStatus Contents
forall a. ResponseStatus a
RequestFinished
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
requestInFlight (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar Socket -> (Socket -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Socket
connSocket ((Socket -> IO ()) -> IO ()) -> (Socket -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Socket
sock ->
          Socket -> ByteString -> IO ()
SB.sendAll Socket
sock (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ RawRequest -> ByteString
forall a. Serialize a => a -> ByteString
encode (RawRequest -> ByteString) -> RawRequest -> ByteString
forall a b. (a -> b) -> a -> b
$ Key -> RawRequest
RawClean Key
reqId

  -- We use a shared resource map here to ensure that we only hold
  -- onto the share connection state TVar for the duration of making a
  -- fetch request. If anything goes wrong in the middle, we're
  -- certain it'll get removed.
  ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
-> Key ConnStateMap
-> IO (TVar (ResponseStatus Contents))
-> (TVar (ResponseStatus Contents) -> IO ())
-> (TVar (ResponseStatus Contents) -> IO r)
-> IO r
forall (m :: * -> *) r a.
ResourceMap m =>
ConcurrentResourceMap m r
-> Key m -> IO r -> (r -> IO ()) -> (r -> IO a) -> IO a
withSharedResource ConcurrentResourceMap ConnStateMap (TVar (ResponseStatus Contents))
connStates Key
Key ConnStateMap
reqId
    (ResponseStatus Contents -> IO (TVar (ResponseStatus Contents))
forall a. a -> IO (TVar a)
newTVarIO ResponseStatus Contents
forall a. ResponseStatus a
WaitingInstant)
    TVar (ResponseStatus Contents) -> IO ()
cleanupRequest ((TVar (ResponseStatus Contents) -> IO r) -> IO r)
-> (TVar (ResponseStatus Contents) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \TVar (ResponseStatus Contents)
reqVar -> do
    -- Send the user request.
    MVar Socket -> (Socket -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar Socket
connSocket ((Socket -> IO ()) -> IO ()) -> (Socket -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Socket
sock -> Socket -> ByteString -> IO ()
SB.sendAll Socket
sock (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ RawRequest -> ByteString
forall a. Serialize a => a -> ByteString
encode
      (RawRequest -> ByteString) -> RawRequest -> ByteString
forall a b. (a -> b) -> a -> b
$ Key -> Query -> RawRequest
RawRequest Key
reqId Query
req

    let

      getDelayed :: STM Contents
getDelayed = TVar (ResponseStatus Contents) -> STM (ResponseStatus Contents)
forall a. TVar a -> STM a
readTVar TVar (ResponseStatus Contents)
reqVar STM (ResponseStatus Contents)
-> (ResponseStatus Contents -> STM Contents) -> STM Contents
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        ResponseStatus Contents
RequestFinished -> FranzException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM FranzException
requestFinished
        ResponseStatus Contents
WaitingDelayed -> STM Contents
forall a. STM a
retry
        Available Contents
xs -> Contents -> STM Contents
forall (m :: * -> *) a. Monad m => a -> m a
return Contents
xs
        Errored FranzException
e -> FranzException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM FranzException
e
        ResponseStatus Contents
WaitingInstant -> FranzException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM (FranzException -> STM Contents) -> FranzException -> STM Contents
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
ClientError
          FilePath
"fetch/WaitingDelayed: unexpected state WaitingInstant"

    -- Run the user's continuation. 'withSharedResource' takes care of
    -- any clean-up necessary.
    STM Response -> IO r
cont (STM Response -> IO r) -> STM Response -> IO r
forall a b. (a -> b) -> a -> b
$ TVar (ResponseStatus Contents) -> STM (ResponseStatus Contents)
forall a. TVar a -> STM a
readTVar TVar (ResponseStatus Contents)
reqVar STM (ResponseStatus Contents)
-> (ResponseStatus Contents -> STM Response) -> STM Response
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      ResponseStatus Contents
RequestFinished -> FranzException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM FranzException
requestFinished
      Errored FranzException
e -> FranzException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM FranzException
e
      ResponseStatus Contents
WaitingInstant -> STM Response
forall a. STM a
retry -- wait for an instant response
      Available Contents
xs -> Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response) -> Response -> STM Response
forall a b. (a -> b) -> a -> b
$ Contents -> Response
forall a b. a -> Either a b
Left Contents
xs
      ResponseStatus Contents
WaitingDelayed -> Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response) -> Response -> STM Response
forall a b. (a -> b) -> a -> b
$ STM Contents -> Response
forall a b. b -> Either a b
Right STM Contents
getDelayed
fetch LocalConnection{FilePath
Maybe ProcessHandle
FranzReader
connFuse :: Maybe ProcessHandle
connReader :: FranzReader
connDir :: FilePath
connFuse :: Connection -> Maybe ProcessHandle
connReader :: Connection -> FranzReader
connDir :: Connection -> FilePath
..} Query
query STM Response -> IO r
cont
  = FranzPrefix
-> FranzReader
-> FranzDirectory
-> Query
-> (FranzException -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r)
-> IO r
forall r.
FranzPrefix
-> FranzReader
-> FranzDirectory
-> Query
-> (FranzException -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r)
-> IO r
handleQuery (FilePath -> FranzPrefix
FranzPrefix FilePath
"") FranzReader
connReader (FilePath -> FranzDirectory
FranzDirectory FilePath
connDir) Query
query
  (STM Response -> IO r
cont (STM Response -> IO r)
-> (FranzException -> STM Response) -> FranzException -> IO r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FranzException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM)
  ((Stream -> STM (Bool, QueryResult) -> IO r) -> IO r)
-> (Stream -> STM (Bool, QueryResult) -> IO r) -> IO r
forall a b. (a -> b) -> a -> b
$ \Stream
stream STM (Bool, QueryResult)
transaction -> STM (Bool, QueryResult) -> IO (Bool, QueryResult)
forall a. STM a -> IO a
atomically STM (Bool, QueryResult)
transaction IO (Bool, QueryResult) -> ((Bool, QueryResult) -> IO r) -> IO r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    (Bool
False, QueryResult
_) -> do
      TMVar (Either SomeException Contents)
vResp <- IO (TMVar (Either SomeException Contents))
forall a. IO (TMVar a)
newEmptyTMVarIO
      ThreadId
tid <- (IO Contents
 -> (Either SomeException Contents -> IO ()) -> IO ThreadId)
-> (Either SomeException Contents -> IO ())
-> IO Contents
-> IO ThreadId
forall a b c. (a -> b -> c) -> b -> a -> c
flip IO Contents
-> (Either SomeException Contents -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (Either SomeException Contents -> STM ())
-> Either SomeException Contents
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMVar (Either SomeException Contents)
-> Either SomeException Contents -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException Contents)
vResp) (IO Contents -> IO ThreadId) -> IO Contents -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
        QueryResult
result <- STM QueryResult -> IO QueryResult
forall a. STM a -> IO a
atomically (STM QueryResult -> IO QueryResult)
-> STM QueryResult -> IO QueryResult
forall a b. (a -> b) -> a -> b
$ do
          (Bool
ready, QueryResult
result) <- STM (Bool, QueryResult)
transaction
          Bool -> STM ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard Bool
ready
          QueryResult -> STM QueryResult
forall (f :: * -> *) a. Applicative f => a -> f a
pure QueryResult
result
        Stream -> QueryResult -> IO Contents
readContents Stream
stream QueryResult
result
      STM Response -> IO r
cont (Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response) -> Response -> STM Response
forall a b. (a -> b) -> a -> b
$ STM Contents -> Response
forall a b. b -> Either a b
Right (STM Contents -> Response) -> STM Contents -> Response
forall a b. (a -> b) -> a -> b
$ TMVar (Either SomeException Contents)
-> STM (Either SomeException Contents)
forall a. TMVar a -> STM a
takeTMVar TMVar (Either SomeException Contents)
vResp STM (Either SomeException Contents)
-> (Either SomeException Contents -> STM Contents) -> STM Contents
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> STM Contents)
-> (Contents -> STM Contents)
-> Either SomeException Contents
-> STM Contents
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> STM Contents
forall e a. Exception e => e -> STM a
throwSTM Contents -> STM Contents
forall (f :: * -> *) a. Applicative f => a -> f a
pure)
        IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` ThreadId -> FranzException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
tid FranzException
requestFinished
    (Bool
True, QueryResult
result) -> do
      TMVar (Either SomeException Contents)
vResp <- IO (TMVar (Either SomeException Contents))
forall a. IO (TMVar a)
newEmptyTMVarIO
      ThreadId
tid <- IO Contents
-> (Either SomeException Contents -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally (Stream -> QueryResult -> IO Contents
readContents Stream
stream QueryResult
result) (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (Either SomeException Contents -> STM ())
-> Either SomeException Contents
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TMVar (Either SomeException Contents)
-> Either SomeException Contents -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException Contents)
vResp)
      STM Response -> IO r
cont (TMVar (Either SomeException Contents)
-> STM (Either SomeException Contents)
forall a. TMVar a -> STM a
takeTMVar TMVar (Either SomeException Contents)
vResp STM (Either SomeException Contents)
-> (Either SomeException Contents -> STM Response) -> STM Response
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> STM Response)
-> (Contents -> STM Response)
-> Either SomeException Contents
-> STM Response
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> STM Response
forall e a. Exception e => e -> STM a
throwSTM (Response -> STM Response
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Response -> STM Response)
-> (Contents -> Response) -> Contents -> STM Response
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Contents -> Response
forall a b. a -> Either a b
Left))
        IO r -> IO () -> IO r
forall a b. IO a -> IO b -> IO a
`finally` ThreadId -> FranzException -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
tid FranzException
requestFinished

requestFinished :: FranzException
requestFinished :: FranzException
requestFinished = FilePath -> FranzException
ClientError FilePath
"request already finished"

-- | Send a single query and wait for the result.
fetchSimple :: Connection
  -> Int -- ^ timeout in microseconds
  -> Query
  -> IO (Maybe Contents)
fetchSimple :: Connection -> Key -> Query -> IO (Maybe Contents)
fetchSimple Connection
conn Key
timeout Query
req = Connection
-> Query
-> (STM Response -> IO (Maybe Contents))
-> IO (Maybe Contents)
forall r. Connection -> Query -> (STM Response -> IO r) -> IO r
fetch Connection
conn Query
req
  ((STM Response -> IO (Maybe Contents)) -> IO (Maybe Contents))
-> (STM Response -> IO (Maybe Contents)) -> IO (Maybe Contents)
forall a b. (a -> b) -> a -> b
$ Key -> STM Contents -> IO (Maybe Contents)
forall a. Key -> STM a -> IO (Maybe a)
atomicallyWithin Key
timeout (STM Contents -> IO (Maybe Contents))
-> (STM Response -> STM Contents)
-> STM Response
-> IO (Maybe Contents)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM Response -> STM Contents
forall a. STM (Either a (STM a)) -> STM a
awaitResponse

atomicallyWithin :: Int -- ^ timeout in microseconds
  -> STM a
  -> IO (Maybe a)
atomicallyWithin :: Key -> STM a -> IO (Maybe a)
atomicallyWithin Key
timeout STM a
m = do
  Delay
d <- Key -> IO Delay
newDelay Key
timeout
  STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (STM (Maybe a) -> IO (Maybe a)) -> STM (Maybe a) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ (a -> Maybe a) -> STM a -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Maybe a
forall a. a -> Maybe a
Just STM a
m STM (Maybe a) -> STM (Maybe a) -> STM (Maybe a)
forall a. STM a -> STM a -> STM a
`orElse` (Maybe a
forall a. Maybe a
Nothing Maybe a -> STM () -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Delay -> STM ()
waitDelay Delay
d)