{-# LANGUAGE GADTs                      #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TypeFamilies               #-}

-- | Redis-based co-ordinator for Funflow.
--
--   There are two co-ordinators defined in this module. They differ in whether
--   they open a new connection to Redis or re-use an existing one. Other than
--   that they behave identically.
module Control.Funflow.External.Coordinator.Redis
  ( Redis (..)
  , RedisPreconnected (..)
  ) where

import qualified Control.Funflow.ContentHashable      as CHash
import           Control.Funflow.External
import           Control.Funflow.External.Coordinator
import           Control.Lens
import           Control.Monad.Except
import           Control.Monad.Fix                    (fix)
import           Data.Store
import qualified Database.Redis                       as R
import           GHC.Conc
import           System.Clock                         (fromNanoSecs)

data Redis = Redis

instance Coordinator Redis where
  type Config Redis = R.ConnectInfo
  type Hook Redis = R.Connection

  -- | Create a redis connection
  initialise = liftIO . R.connect

  submitTask conn td =
    liftIO $
      R.runRedis conn $ do
        void $ R.rpush "jobs_queue" [encode (jid, td ^. tdTask)]
        void $ R.set jid (encode Pending)
    where
      jid = CHash.toBytes $ td ^. tdOutput

  queueSize conn = liftIO $ R.runRedis conn $
    fromIntegral . either (const 0) id <$> R.llen "jobs_queue"

  taskInfo conn chash = liftIO $
    R.runRedis conn $ do
      eoutput <- R.get $ CHash.toBytes chash
      case eoutput of
        Left r -> fail $ "Redis fail: " ++ show r
        Right Nothing -> return UnknownTask
        Right (Just bs) -> case decode bs of
          Left r   -> fail $ "Decode fail: " ++ show r
          Right ti -> return $ KnownTask ti

  awaitTask conn chash = liftIO . R.runRedis conn $
    fix $ \waitGet -> do
      ti <- taskInfo conn chash
      case ti of
        UnknownTask -> return UnknownTask
        info@(KnownTask (Completed _)) -> return info
        info@(KnownTask (Failed _ _)) -> return info
        _ -> do
          liftIO $ threadDelay 500000
          waitGet

  updateTaskStatus conn chash status = liftIO $
    R.runRedis conn
      $ void $ R.set (CHash.toBytes chash) (encode status)

  popTask conn executor = liftIO . R.runRedis conn $ do
    job <- R.brpoplpush "jobs_queue" "job_running" 1
    case job of
      Left r -> fail $ "redis fail " ++ show r
      Right Nothing -> return Nothing
      Right (Just bs) -> case decode bs of
        Left r                          -> fail $ "Decode fail: " ++ show r
        Right (chashbytes, task) ->
          case CHash.fromBytes chashbytes of
            Just chash -> do
              let status = Running $ ExecutionInfo executor (fromNanoSecs 0)
              _ <- R.set chashbytes (encode status)
              return . Just $ TaskDescription chash task
            Nothing    -> fail "Cannot decode content hash."

  dropTasks conn = liftIO . R.runRedis conn $ do
    job <- R.del ["jobs_queue"]
    case job of
      Left r -> fail $ "redis fail " ++ show r
      Right _ -> return ()


data RedisPreconnected = RedisPreconnected

newtype Preconnected = Preconnected R.Connection

-- | Allow a preestablished redis connection to be used.
instance Coordinator RedisPreconnected where
  type Config RedisPreconnected = R.Connection
  type Hook RedisPreconnected = Preconnected

  initialise = return . Preconnected

  submitTask (Preconnected conn) = submitTask conn
  queueSize (Preconnected conn) = queueSize conn
  taskInfo (Preconnected conn) = taskInfo conn
  awaitTask (Preconnected conn) = awaitTask conn
  updateTaskStatus (Preconnected conn) = updateTaskStatus conn
  popTask (Preconnected conn) = popTask conn
  dropTasks (Preconnected conn) = dropTasks conn