module Spark.Core.Internal.ContextInteractive(
SparkInteractiveException,
createSparkSessionDef,
exec1Def,
closeSparkSessionDef
) where
import qualified Data.Vector as V
import Control.Exception
import Control.Monad.Catch(throwM)
import Data.IORef
import Data.Typeable
import Control.Monad.State(runStateT)
import Data.Text
import System.IO.Unsafe(unsafePerformIO)
import Control.Monad.Logger(runStdoutLoggingT)
import Spark.Core.Internal.ContextStructures
import Spark.Core.Internal.DatasetStructures
import Spark.Core.Internal.ContextIOInternal
import Spark.Core.Internal.RowGenericsFrom(FromSQL)
import Spark.Core.Internal.Utilities
import Spark.Core.StructuresInternal
import Spark.Core.Try
-- | Process-wide mutable cell holding the default Spark session, if any.
--
-- It starts out as 'Nothing' and is read and written by the session
-- helpers of this module.
--
-- The cell is allocated with 'unsafePerformIO', so the binding MUST NOT be
-- inlined: if GHC duplicated the right-hand side at several use sites, each
-- site would allocate its own 'IORef' and the "single default session"
-- invariant would silently break. The NOINLINE pragma guarantees that the
-- reference is created once.
_globalSessionRef :: IORef (Maybe SparkSession)
_globalSessionRef = unsafePerformIO (newIORef Nothing)
{-# NOINLINE _globalSessionRef #-}
-- | The exception thrown by the interactive (default-session) helpers of
-- this module. It wraps the underlying 'NodeError'.
data SparkInteractiveException = SparkInteractiveException
  { _sieInner :: NodeError -- ^ the wrapped error
  } deriving Typeable

-- | Displays exactly like the wrapped 'NodeError'.
instance Show SparkInteractiveException where
  show = show . _sieInner

-- Uses the default 'Exception' methods.
instance Exception SparkInteractiveException
-- | Creates a new Spark session from the given configuration and registers
-- it as the default session of this process.
--
-- Throws a 'SparkInteractiveException' if a default session is already
-- registered; in that case no new session is created.
createSparkSessionDef :: (HasCallStack) => SparkSessionConf -> IO ()
createSparkSessionDef conf = do
  existing <- _currentSession
  -- Refuse to overwrite a session that is already registered.
  maybe
    (return ())
    (const (_throw "A default context already exist. If you wish to modify the exsting context, you must use modifySparkConfDef"))
    existing
  createSparkSession' conf >>= _setSession
-- | Executes a local observable against the default session and returns
-- its value.
--
-- The command is run with 'executeCommand1', logging to stdout, with the
-- default session as the initial state. The session state produced by the
-- run is stored back as the default session before the result is inspected.
--
-- Throws a 'SparkInteractiveException' when no default session has been
-- registered, or when the execution yields an error.
exec1Def :: (FromSQL a, HasCallStack) => LocalData a -> IO a
exec1Def ld = _currentSession >>= maybe noSession runWith
  where
    noSession =
      _throw "No default context found. You must first create a default spark context with createSparkSessionDef"
    runWith session = do
      (outcome, session') <-
        runStateT (runStdoutLoggingT (executeCommand1 ld)) session
      -- Persist the updated state even if the command itself failed.
      _setSession session'
      either (throwM . SparkInteractiveException) return outcome
-- | Unregisters the default session, if there is one.
--
-- This only clears the reference stored by this module; the removed
-- session value is discarded. It is a no-op when no session is registered.
closeSparkSessionDef :: (HasCallStack) => IO ()
closeSparkSessionDef = _removeSession >> return ()
-- | Reads the default session currently stored in the global reference,
-- or 'Nothing' if none is registered.
_currentSession :: (HasCallStack) => IO (Maybe SparkSession)
_currentSession =
  readIORef _globalSessionRef
-- | Stores the given session as the default one, replacing whatever was
-- registered before.
_setSession :: (HasCallStack) => SparkSession -> IO ()
_setSession = writeIORef _globalSessionRef . Just
-- | Clears the default session and returns the session that was registered
-- before the call ('Nothing' if there was none).
--
-- The read and the reset are performed as a single atomic step, so no
-- update to the reference can slip in between them and be lost.
_removeSession :: (HasCallStack) => IO (Maybe SparkSession)
_removeSession =
  atomicModifyIORef' _globalSessionRef (\previous -> (Nothing, previous))
-- | Throws a 'SparkInteractiveException' carrying the given message.
--
-- The wrapped error has an empty node path.
_throw :: (HasCallStack) => Text -> IO a
_throw txt = throwM (SparkInteractiveException failure)
  where
    failure = Error
      { ePath = NodePath V.empty
      , eMessage = txt
      }