{-# LANGUAGE DeriveGeneric, LambdaCase, OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE TypeFamilies #-}
module Database.Franz.Server
  ( Settings(..)
  , startServer
  , FranzPrefix(..)
  , defaultPort
  ) where

import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Cont
import Control.Concurrent.STM
import Database.Franz.Internal.Fuse
import Database.Franz.Internal.IO
import Database.Franz.Internal.Protocol
import Database.Franz.Internal.Reader
import Data.ConcurrentResourceMap
import Data.Serialize
import qualified Data.IntMap.Strict as IM
import Data.IORef
import qualified Data.ByteString.Char8 as B
import qualified Data.HashMap.Strict as HM
import Data.Tuple (swap)
import qualified Network.Socket.SendFile.Handle as SF
import qualified Network.Socket.ByteString as SB
import qualified Network.Socket as S
import System.Directory
import System.IO
import System.Process (ProcessHandle, getPid)

-- | Per-connection state shared by the request loop and the threads it spawns.
data Env = Env
  { prefix :: FranzPrefix
    -- ^ prefix handed to 'handleQuery' together with 'path'
  , path :: FranzDirectory
    -- ^ directory requested by the client for this connection
  , franzReader :: FranzReader
    -- ^ reader handed to 'handleQuery'
  , refThreads :: IORef (IM.IntMap ThreadId)
    -- ^ thread pool of pending requests, keyed by request ID
  , recvBuffer :: IORef B.ByteString
    -- ^ received but unconsumed bytes
  , vConn :: MVar S.Socket
    -- ^ connection to the client; senders hold the 'MVar' while writing
  }

-- | 'try' for 'STM': run the transaction and return its result in 'Right',
-- or the exception of type @e@ it threw in 'Left'. Exceptions of any other
-- type propagate unchanged.
trySTM :: Exception e => STM a -> STM (Either e a)
trySTM action = (Right <$> action) `catchSTM` (pure . Left)

-- | Serve one decoded request.
--
-- * 'RawRequest': run the query through 'handleQuery'. If the result is
--   already available it is sent straight away with 'sendContents'.
--   Otherwise a thread is forked which sends 'ResponseWait', blocks until the
--   query reports ready and then sends the contents. That thread is recorded
--   in 'refThreads' under the request ID so that it can be cancelled.
--   Any 'FranzException' is reported to the client as a 'ResponseError'.
--
-- * 'RawClean': remove the pending thread for the request ID (if any) and
--   kill it.
handleRaw :: Env -> RawRequest -> IO ()
handleRaw env@Env{..} (RawRequest reqId req) = serve `catch` reportError
  where
    -- Tell the client that this request failed.
    reportError :: FranzException -> IO ()
    reportError = sendHeader env . ResponseError reqId

    -- Finaliser of the waiting thread: forward a 'FranzException' (other
    -- exceptions, e.g. the one delivered by 'killThread', are ignored) and
    -- always drop the thread from the pool.
    pop :: Either SomeException () -> IO ()
    pop result =
      mapM_ reportError (either fromException (const Nothing) result)
        `finally` popThread env reqId

    serve :: IO ()
    serve = handleQuery prefix franzReader path req throwIO $ \stream query ->
      atomically (trySTM query) >>= \case
        Left e -> reportError e
        Right (ready, offsets)
          | ready -> sendContents env (Response reqId) stream offsets
          | otherwise -> do
            -- The forked thread must not start (and hence cannot finish and
            -- run 'pop') before its ThreadId has been registered; otherwise
            -- the late insertion would leave a stale entry in the pool.
            gate <- newEmptyMVar
            tid <- flip forkFinally pop $ do
              readMVar gate
              -- NOTE(review): addActivity/removeActivity presumably keep the
              -- stream alive while a client is waiting on it; confirm.
              bracket_
                (atomically $ addActivity stream)
                (removeActivity stream) $ do
                  sendHeader env $ ResponseWait reqId
                  -- The transaction retries (via 'check') until the query is
                  -- ready, then yields the IO action to run afterwards.
                  join $ atomically $ trySTM query >>= \case
                    Left e -> pure $ reportError e
                    Right (ready', offsets') -> do
                      check ready'
                      pure $ sendContents env (Response reqId) stream offsets'
            -- Store the thread ID of the thread yielding a future
            -- response such that we can kill it mid-way if user
            -- sends a cancel request or we're killed with an
            -- exception.
            atomicModifyIORef' refThreads $ \m -> (IM.insert reqId tid m, ())
            putMVar gate ()

handleRaw env (RawClean reqId) = popThread env reqId >>= mapM_ killThread

-- | Pick up a 'ThreadId' from a pool.
--
-- Atomically removes the entry stored under the given request ID from
-- 'refThreads' and returns it, or 'Nothing' when no thread is registered
-- for that ID.
popThread :: Env -> Int -> IO (Maybe ThreadId)
popThread Env{refThreads = pool} reqId = atomicModifyIORef' pool $ \threads ->
  (IM.delete reqId threads, IM.lookup reqId threads)

-- | Serialise a 'ResponseHeader' and send it to the client.
--
-- The connection 'MVar' is held for the duration of the send, so headers
-- written by concurrent threads do not interleave.
sendHeader :: Env -> ResponseHeader -> IO ()
sendHeader Env{vConn = connVar} hdr =
  withMVar connVar $ \conn -> SB.sendAll conn (encode hdr)

-- | Send a complete response: the response header plus 'PayloadHeader',
-- followed by the relevant slice of the index file and of the payload file.
--
-- The query result is @((s0, p0), (s1, p1))@: @s0@/@s1@ select the range of
-- index entries and @p0@/@p1@ are the byte offsets delimiting the payloads.
-- The connection 'MVar' is held for the whole response so that it is not
-- interleaved with output from other threads.
sendContents :: Env -> ResponseHeader -> Stream -> QueryResult -> IO ()
sendContents Env{..} header Stream{..} ((s0, p0), (s1, p1)) = withMVar vConn $ \conn -> do
  SB.sendAll conn $ encode (header, PayloadHeader s0 s1 p0 indexNames)
  -- Each index entry is 8 bytes times (byte offset + number of indices).
  let entrySize = 8 * (length indexNames + 1)
      indexStart = entrySize * succ s0
      indexBytes = entrySize * (s1 - s0)
  -- Send byte offsets and indices
  SF.sendFile' conn indexHandle (fromIntegral indexStart) (fromIntegral indexBytes)
  -- Send payloads
  SF.sendFile' conn payloadHandle (fromIntegral p0) (fromIntegral $ p1 - p0)

-- | Receive and decode one 'RawRequest' from the client and dispatch it to
-- 'handleRaw'.
--
-- Throws 'MalformedRequest' (carrying the decoder's message) when the
-- incoming bytes cannot be decoded.
respond :: Env -> IO ()
respond env@Env{..} = do
  recvConn <- readMVar vConn
  decoded <- runGetRecv recvBuffer recvConn get
  either (throwIO . MalformedRequest) (handleRaw env) decoded

-- | Server configuration consumed by 'startServer'.
data Settings = Settings
  { reapInterval :: Double
    -- ^ first argument to the reaper thread
    -- (presumably the interval between reaper runs, in seconds; confirm)
  , streamLifetime :: Double
    -- ^ second argument to the reaper thread
    -- (presumably how long an idle stream is kept, in seconds; confirm)
  , port :: S.PortNumber
    -- ^ TCP port to listen on
  , livePrefix :: FranzPrefix
    -- ^ NOTE(review): presumably the prefix for live streams; confirm against the connection handler
  , archivePrefix :: Maybe FranzPrefix
    -- ^ optional archive prefix; when given, the directory must exist at startup
  , mountPrefix :: FranzPrefix
    -- ^ NOTE(review): presumably where archives are mounted; confirm against the connection handler
  }

-- | Map from 'FilePath' keys to resources, backed by a strict 'HM.HashMap'.
-- It is a newtype so that it can carry its own 'ResourceMap' instance.
newtype MountMap v = MountMap (HM.HashMap FilePath v)

-- Every method unwraps the newtype and delegates to the corresponding
-- 'HM.HashMap' operation.
instance ResourceMap MountMap where
  type Key MountMap = FilePath
  empty = MountMap mempty
  delete key (MountMap table) = MountMap (HM.delete key table)
  insert key val (MountMap table) = MountMap (HM.insert key val table)
  lookup key (MountMap table) = HM.lookup key table

-- | Create an empty concurrent resource map of 'ProcessHandle's keyed by
-- 'FilePath'.
newMountMap :: IO (ConcurrentResourceMap MountMap ProcessHandle)
newMountMap = newResourceMap

-- | Run the server; this only returns by way of an exception.
--
-- Start-up sequence:
--
-- 1. acquire a 'FranzReader' for the lifetime of the server;
-- 2. fail with 'error' if 'archivePrefix' is set but is not an existing
--    directory;
-- 3. fork the reaper thread and switch @stderr@ to line buffering;
-- 4. bind a TCP socket on @0.0.0.0@ at the configured 'port' and listen;
-- 5. accept connections forever, serving each one on its own thread.
--
-- When a connection thread ends with a 'FranzException' the error is sent to
-- the client with request ID @-1@; any other exception is logged. The client
-- socket is always closed afterwards, even if sending that error fails.
startServer
    :: Settings
    -> IO ()
startServer settings@Settings{..} = evalContT $ do
  franzReader <- ContT withFranzReader

  liftIO $ do
    forM_ archivePrefix $ \(FranzPrefix path) -> do
      e <- doesDirectoryExist path
      unless e $ error $ "archive prefix " ++ path ++ " doesn't exist"

    _ <- forkIO $ reaper reapInterval streamLifetime franzReader
    hSetBuffering stderr LineBuffering

  vMounts <- liftIO newMountMap

  let hints = S.defaultHints
        { S.addrFlags = [S.AI_NUMERICHOST, S.AI_NUMERICSERV]
        , S.addrSocketType = S.Stream
        }
  addr:_ <- liftIO $ S.getAddrInfo (Just hints) (Just "0.0.0.0") (Just $ show port)
  -- obtain a socket and start listening
  sock <- ContT $ bracket (S.socket (S.addrFamily addr) S.Stream (S.addrProtocol addr)) S.close

  liftIO $ do
    S.setSocketOption sock S.ReuseAddr 1
    S.setSocketOption sock S.NoDelay 1
    S.bind sock $ S.addrAddress addr
    S.listen sock S.maxListenQueue

  logServer ["Listening on", show port]

  liftIO $ forever $ do
    (conn, connAddr) <- S.accept sock

    void $ forkFinally (accept settings vMounts franzReader conn connAddr) $ \result -> do
      let notifyClient :: IO ()
          notifyClient = case result of
            Left ex -> case fromException ex of
              Just e -> SB.sendAll conn $ encode $ ResponseError (-1) e
              Nothing -> logServer [show ex]
            Right _ -> return ()
      -- Sending the error can itself fail (e.g. the peer has already gone
      -- away); the socket must be closed regardless.
      notifyClient `finally` do
        S.close conn
        logServer [show connAddr, "disconnected"]

accept :: Settings -> ConcurrentResourceMap MountMap ProcessHandle -> FranzReader -> S.Socket -> S.SockAddr -> IO ()
accept :: Settings
-> ConcurrentResourceMap MountMap ProcessHandle
-> FranzReader
-> Socket
-> SockAddr
-> IO ()
accept Settings{Double
Maybe FranzPrefix
PortNumber
FranzPrefix
mountPrefix :: FranzPrefix
archivePrefix :: Maybe FranzPrefix
livePrefix :: FranzPrefix
port :: PortNumber
streamLifetime :: Double
reapInterval :: Double
mountPrefix :: Settings -> FranzPrefix
archivePrefix :: Settings -> Maybe FranzPrefix
livePrefix :: Settings -> FranzPrefix
port :: Settings -> PortNumber
streamLifetime :: Settings -> Double
reapInterval :: Settings -> Double
..} ConcurrentResourceMap MountMap ProcessHandle
vMounts FranzReader
franzReader Socket
conn SockAddr
connAddr = do
  -- buffer of received octets
  IORef ByteString
recvBuffer <- ByteString -> IO (IORef ByteString)
forall a. a -> IO (IORef a)
newIORef ByteString
B.empty

  let respondLoop :: FranzPrefix -> FranzDirectory -> IO ()
respondLoop FranzPrefix
prefix path :: FranzDirectory
path@(FranzDirectory FilePath
dir) = do
        Socket -> ByteString -> IO ()
SB.sendAll Socket
conn ByteString
apiVersion
        [FilePath] -> IO ()
forall (m :: * -> *). MonadIO m => [FilePath] -> m ()
logServer [SockAddr -> FilePath
forall a. Show a => a -> FilePath
show SockAddr
connAddr, FilePath -> FilePath
forall a. Show a => a -> FilePath
show FilePath
dir]
        IORef (IntMap ThreadId)
refThreads <- IntMap ThreadId -> IO (IORef (IntMap ThreadId))
forall a. a -> IO (IORef a)
newIORef IntMap ThreadId
forall a. IntMap a
IM.empty
        MVar Socket
vConn <- Socket -> IO (MVar Socket)
forall a. a -> IO (MVar a)
newMVar Socket
conn
        IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Env -> IO ()
respond Env :: FranzPrefix
-> FranzDirectory
-> FranzReader
-> IORef (IntMap ThreadId)
-> IORef ByteString
-> MVar Socket
-> Env
Env{IORef ByteString
IORef (IntMap ThreadId)
MVar Socket
FranzDirectory
FranzPrefix
FranzReader
vConn :: MVar Socket
refThreads :: IORef (IntMap ThreadId)
path :: FranzDirectory
prefix :: FranzPrefix
recvBuffer :: IORef ByteString
franzReader :: FranzReader
vConn :: MVar Socket
recvBuffer :: IORef ByteString
refThreads :: IORef (IntMap ThreadId)
franzReader :: FranzReader
path :: FranzDirectory
prefix :: FranzPrefix
..}) IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`finally` do
          IORef (IntMap ThreadId) -> IO (IntMap ThreadId)
forall a. IORef a -> IO a
readIORef IORef (IntMap ThreadId)
refThreads IO (IntMap ThreadId) -> (IntMap ThreadId -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (ThreadId -> IO ()) -> IntMap ThreadId -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ThreadId -> IO ()
killThread

  FranzDirectory
path <- IO FranzDirectory -> IO FranzDirectory
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO FranzDirectory -> IO FranzDirectory)
-> IO FranzDirectory -> IO FranzDirectory
forall a b. (a -> b) -> a -> b
$ IORef ByteString
-> Socket -> Get ByteString -> IO (Either FilePath ByteString)
forall a.
IORef ByteString -> Socket -> Get a -> IO (Either FilePath a)
runGetRecv IORef ByteString
recvBuffer Socket
conn Get ByteString
forall t. Serialize t => Get t
get IO (Either FilePath ByteString)
-> (Either FilePath ByteString -> IO FranzDirectory)
-> IO FranzDirectory
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Left FilePath
_ -> FranzException -> IO FranzDirectory
forall e a. Exception e => e -> IO a
throwIO (FranzException -> IO FranzDirectory)
-> FranzException -> IO FranzDirectory
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzException
MalformedRequest FilePath
"Expecting a path"
    Right ByteString
pathBS -> FranzDirectory -> IO FranzDirectory
forall (f :: * -> *) a. Applicative f => a -> f a
pure (FranzDirectory -> IO FranzDirectory)
-> FranzDirectory -> IO FranzDirectory
forall a b. (a -> b) -> a -> b
$ FilePath -> FranzDirectory
FranzDirectory (FilePath -> FranzDirectory) -> FilePath -> FranzDirectory
forall a b. (a -> b) -> a -> b
$ ByteString -> FilePath
B.unpack ByteString
pathBS

  -- when the final reader exits, close all the streams associated to the path
  let closeGroup :: IO ()
closeGroup = do
        HashMap FranzDirectory (HashMap StreamName Stream)
streams <- STM (HashMap FranzDirectory (HashMap StreamName Stream))
-> IO (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. STM a -> IO a
atomically (STM (HashMap FranzDirectory (HashMap StreamName Stream))
 -> IO (HashMap FranzDirectory (HashMap StreamName Stream)))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
-> IO (HashMap FranzDirectory (HashMap StreamName Stream))
forall a b. (a -> b) -> a -> b
$ do
          HashMap FranzDirectory (HashMap StreamName Stream)
streams <- TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a. TVar a -> STM a
readTVar (TVar (HashMap FranzDirectory (HashMap StreamName Stream))
 -> STM (HashMap FranzDirectory (HashMap StreamName Stream)))
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall a b. (a -> b) -> a -> b
$ FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams FranzReader
franzReader
          TVar (HashMap FranzDirectory (HashMap StreamName Stream))
-> HashMap FranzDirectory (HashMap StreamName Stream) -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (FranzReader
-> TVar (HashMap FranzDirectory (HashMap StreamName Stream))
vStreams FranzReader
franzReader) (HashMap FranzDirectory (HashMap StreamName Stream) -> STM ())
-> HashMap FranzDirectory (HashMap StreamName Stream) -> STM ()
forall a b. (a -> b) -> a -> b
$ FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> HashMap FranzDirectory (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete FranzDirectory
path HashMap FranzDirectory (HashMap StreamName Stream)
streams
          HashMap FranzDirectory (HashMap StreamName Stream)
-> STM (HashMap FranzDirectory (HashMap StreamName Stream))
forall (f :: * -> *) a. Applicative f => a -> f a
pure HashMap FranzDirectory (HashMap StreamName Stream)
streams
        Maybe (HashMap StreamName Stream)
-> (HashMap StreamName Stream -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (FranzDirectory
-> HashMap FranzDirectory (HashMap StreamName Stream)
-> Maybe (HashMap StreamName Stream)
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup FranzDirectory
path HashMap FranzDirectory (HashMap StreamName Stream)
streams) ((HashMap StreamName Stream -> IO ()) -> IO ())
-> (HashMap StreamName Stream -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ (Stream -> IO ()) -> HashMap StreamName Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
closeStream

  case Maybe FranzPrefix
archivePrefix of
    -- just start a session without thinking about archives
    Maybe FranzPrefix
Nothing -> FranzPrefix -> FranzDirectory -> IO ()
respondLoop FranzPrefix
livePrefix FranzDirectory
path
    -- Mount a squashfs image and increment the counter
    Just FranzPrefix
prefix | FilePath
src <- FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory FranzPrefix
prefix FranzDirectory
path -> do
      -- check if an archive exists
      Bool
exist <- FilePath -> IO Bool
doesFileExist FilePath
src
      if Bool
exist
        then ConcurrentResourceMap MountMap ProcessHandle
-> IO () -> FilePath -> FilePath -> IO () -> IO ()
forall a.
ConcurrentResourceMap MountMap ProcessHandle
-> IO () -> FilePath -> FilePath -> IO a -> IO a
withFuse ConcurrentResourceMap MountMap ProcessHandle
vMounts IO ()
closeGroup FilePath
src (FranzPrefix -> FranzDirectory -> FilePath
getFranzDirectory FranzPrefix
mountPrefix FranzDirectory
path) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ FranzPrefix -> FranzDirectory -> IO ()
respondLoop FranzPrefix
mountPrefix FranzDirectory
path
        else do
          [FilePath] -> IO ()
forall (m :: * -> *). MonadIO m => [FilePath] -> m ()
logServer [FilePath
"Archive", FilePath
src, FilePath
"doesn't exist; falling back to live streams"]
          FranzPrefix -> FranzDirectory -> IO ()
respondLoop FranzPrefix
livePrefix FranzDirectory
path

-- | Emit one diagnostic line on 'stderr'.
--
-- The given words are joined with single spaces and prefixed with the
-- @[server]@ tag, so that @logServer ["a", "b"]@ prints @[server] a b@.
logServer :: MonadIO m => [String] -> m ()
logServer ws = liftIO $ hPutStrLn stderr taggedLine
  where
    -- the tag is just the first word of the line
    taggedLine = unwords ("[server]" : ws)

-- | Run @body@ while holding the shared resource registered under @dst@ in
-- @vMounts@.
--
-- The resource is obtained through 'withSharedResource': it is acquired with
-- 'mountFuse' (given @src@ and @dst@) and torn down with the sequence below.
-- A failure reported by 'mountFuse' through its error callback is thrown as
-- an 'InternalError'.
--
-- Teardown order:
--
--   1. @release@ is run;
--   2. 'killFuse' is invoked on the process handle, even if @release@ threw,
--      so a failing @release@ cannot leave the mount behind;
--   3. if 'getPid' still yields a pid afterwards, a warning is logged.
withFuse :: ConcurrentResourceMap MountMap ProcessHandle
  -> IO () -- ^ run when the final user is about to exit
  -> FilePath -- ^ source passed to 'mountFuse'
  -> FilePath -- ^ destination passed to 'mountFuse'; also the resource key
  -> IO a -- ^ action to run while the resource is held
  -> IO a
withFuse vMounts release src dst body =
  withSharedResource vMounts dst acquire teardown (const body)
  where
    acquire = mountFuse logServer (throwIO . InternalError) src dst

    -- 'finally' guarantees the unmount step runs whether or not the
    -- caller-supplied release action succeeds; any exception still propagates.
    teardown fuse = release `finally` unmount fuse

    unmount fuse = killFuse logServer fuse dst `finally` reportUndead fuse

    -- 'getPid' returning a pid after 'killFuse' is reported as a warning only.
    reportUndead fuse = do
      pid <- getPid fuse
      forM_ pid $ \p -> logServer ["Undead squashfuse detected:", show p]