-- | This module is used to store all the
-- utility functions used for testing.
module Network.Flink.Test (testStatefulFunc) where

import Control.Monad.Except (runExceptT)
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.State (StateT (..))
import Data.Either.Combinators (fromRight')
import Data.Functor (($>))
import Network.Flink.Internal.Stateful
  ( Env (Env),
    Function (runFunction),
    FunctionState,
    newState,
  )

-- | Function for testing stateful functions locally.
--
-- Runs the function given some initial state and an
-- input message. The final state of the function
-- is returned along with all messages, egress, etc.
-- that were queued up to be sent to Flink during execution.
--
-- The function is always run against a fixed, hard-coded 'Env'
-- (see @env@ below); callers cannot supply their own.
--
-- Function contains __unsafe__ code, don't use for anything but test cases:
-- the outcome of the run is unwrapped with the partial 'fromRight'', so a
-- function that finishes with a @Left@ error produces an exception instead
-- of a state. Because the unwrapping is wrapped in 'return' without being
-- forced, that exception surfaces when the returned value is evaluated.
testStatefulFunc ::
  -- | Initial state for test run
  s ->
  -- | Function to run
  (a -> Function s ()) ->
  -- | Input message
  a ->
  IO (FunctionState s)
testStatefulFunc initialCtx func input = runner (newState initialCtx)
  where
    -- Placeholder environment used for every test run.
    -- NOTE(review): the literals suggest the fields are namespace, function
    -- type and instance id, in that order -- confirm against 'Env'.
    env :: Env
    env = Env "test_namespace" "test_function" "placeholder_id"

    -- Peel the 'Function' stack from the outside in: 'runExceptT' exposes the
    -- 'Either' result, 'runStateT' threads the function state starting from
    -- @state@, and 'runReaderT' supplies the environment.
    runner state = do
      (res, state') <-
        runReaderT (runStateT (runExceptT $ runFunction (func input)) state) env
      -- Keep the final state only when the run succeeded; @res $> state'@
      -- swaps the unit result for the state, and 'fromRight'' errors on @Left@.
      return $ fromRight' (res $> state')