{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Control.Funflow.External.Coordinator.Redis
( Redis (..)
, RedisPreconnected (..)
) where
import qualified Control.Funflow.ContentHashable as CHash
import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Lens
import Control.Monad.Except
import Control.Monad.Fix (fix)
import Data.Store
import qualified Database.Redis as R
import GHC.Conc
import System.Clock (fromNanoSecs)
data Redis = Redis
instance Coordinator Redis where
type Config Redis = R.ConnectInfo
type Hook Redis = R.Connection
initialise = liftIO . R.connect
submitTask conn td =
liftIO $
R.runRedis conn $ do
void $ R.rpush "jobs_queue" [encode (jid, td ^. tdTask)]
void $ R.set jid (encode Pending)
where
jid = CHash.toBytes $ td ^. tdOutput
queueSize conn = liftIO $ R.runRedis conn $
fromIntegral . either (const 0) id <$> R.llen "jobs_queue"
taskInfo conn chash = liftIO $
R.runRedis conn $ do
eoutput <- R.get $ CHash.toBytes chash
case eoutput of
Left r -> fail $ "Redis fail: " ++ show r
Right Nothing -> return UnknownTask
Right (Just bs) -> case decode bs of
Left r -> fail $ "Decode fail: " ++ show r
Right ti -> return $ KnownTask ti
awaitTask conn chash = liftIO . R.runRedis conn $
fix $ \waitGet -> do
ti <- taskInfo conn chash
case ti of
UnknownTask -> return UnknownTask
info@(KnownTask (Completed _)) -> return info
info@(KnownTask (Failed _ _)) -> return info
_ -> do
liftIO $ threadDelay 500000
waitGet
updateTaskStatus conn chash status = liftIO $
R.runRedis conn
$ void $ R.set (CHash.toBytes chash) (encode status)
popTask conn executor = liftIO . R.runRedis conn $ do
job <- R.brpoplpush "jobs_queue" "job_running" 1
case job of
Left r -> fail $ "redis fail " ++ show r
Right Nothing -> return Nothing
Right (Just bs) -> case decode bs of
Left r -> fail $ "Decode fail: " ++ show r
Right (chashbytes, task) ->
case CHash.fromBytes chashbytes of
Just chash -> do
let status = Running $ ExecutionInfo executor (fromNanoSecs 0)
_ <- R.set chashbytes (encode status)
return . Just $ TaskDescription chash task
Nothing -> fail "Cannot decode content hash."
dropTasks conn = liftIO . R.runRedis conn $ do
job <- R.del ["jobs_queue"]
case job of
Left r -> fail $ "redis fail " ++ show r
Right _ -> return ()
data RedisPreconnected = RedisPreconnected
newtype Preconnected = Preconnected R.Connection
instance Coordinator RedisPreconnected where
type Config RedisPreconnected = R.Connection
type Hook RedisPreconnected = Preconnected
initialise = return . Preconnected
submitTask (Preconnected conn) = submitTask conn
queueSize (Preconnected conn) = queueSize conn
taskInfo (Preconnected conn) = taskInfo conn
awaitTask (Preconnected conn) = awaitTask conn
updateTaskStatus (Preconnected conn) = updateTaskStatus conn
popTask (Preconnected conn) = popTask conn
dropTasks (Preconnected conn) = dropTasks conn