module Network.JobQueue.Backend.Zookeeper (
openZookeeperBackend
, newZookeeperBackend
) where
import qualified Database.Zookeeper as Z
import Control.Concurrent
import Control.Concurrent.STM
import Network.JobQueue.Backend.Types
import Network.JobQueue.Backend.Zookeeper.ZookeeperQueue
openZookeeperBackend :: String -> IO Backend
openZookeeperBackend endpoint = do
Z.setDebugLevel Z.ZLogError
zvar <- newTVarIO Nothing
stateVar <- newTVarIO Z.ConnectingState
_ <- forkIO $ Z.withZookeeper endpoint 100000 (Just $ watcher stateVar) Nothing $ \z -> do
atomically $ do
state <- readTVar stateVar
case state of
Z.ConnectingState -> retry
_ -> return ()
atomically $ writeTVar zvar (Just z)
atomically $ do
mz <- readTVar zvar
case mz of
Just _ -> retry
Nothing -> return ()
return $ Backend {
bOpenQueue = openQueue zvar
, bClose = atomically $ writeTVar zvar Nothing
}
where
openQueue :: TVar (Maybe Z.Zookeeper) -> String -> IO (ZookeeperQueue)
openQueue zvar queueName = do
z <- atomically $ readTVar zvar >>= maybe retry return
zq <- initZQueue z (basePath queueName) Z.OpenAclUnsafe
return zq
watcher :: TVar Z.State -> Z.Watcher
watcher stateVar _z event state _mZnode = do
case event of
Z.SessionEvent -> atomically $ writeTVar stateVar state
_ -> return ()
newZookeeperBackend :: Z.Zookeeper -> Backend
newZookeeperBackend zh = Backend {
bOpenQueue = \queueName -> initZQueue zh (basePath queueName) Z.OpenAclUnsafe
, bClose = return ()
}
basePath :: String -> String
basePath queueName = case queueName of
'/':_ -> queueName
_ -> '/':queueName