{-# LANGUAGE DuplicateRecordFields, TypeFamilies #-}
module Network.Greskell.WebSocket.Client.Impl
where
import Control.Applicative ((<$>), (<*>))
import Control.Concurrent.STM
( STM, atomically,
TVar, newTVarIO, readTVar, writeTVar
)
import Control.Exception.Safe
( throw, Typeable, Exception, SomeException, catch
)
import Data.Aeson (Object)
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson (Parser)
import Data.Greskell.Greskell (ToGreskell(GreskellReturn), toGremlin)
import Data.Greskell.GraphSON (GraphSON, gsonValue, GValue, FromGraphSON(..), parseEither)
import Data.Greskell.AsIterator (AsIterator(IteratorItem))
import Data.Monoid (mempty)
import Data.Vector (Vector, (!))
import Data.Text (Text)
import Data.Traversable (traverse)
import Data.Vector (Vector)
import Network.Greskell.WebSocket.Client.Options (Options)
import qualified Network.Greskell.WebSocket.Client.Options as Opt
import Network.Greskell.WebSocket.Connection
( Host, Port, Connection, ResponseHandle
)
import qualified Network.Greskell.WebSocket.Connection as Conn
import qualified Network.Greskell.WebSocket.Request.Standard as ReqStd
import Network.Greskell.WebSocket.Response (ResponseCode, ResponseMessage)
import qualified Network.Greskell.WebSocket.Response as Res
import Network.Greskell.WebSocket.Util (slurp, drain)
-- | A client handle: the 'Options' it was created with together with
-- the 'Connection' used to send requests.
data Client = Client
  { clientOpts :: Options
  -- ^ Options given when the client was created.
  , clientConn :: Connection GValue
  -- ^ Underlying connection; requests are sent through it.
  }
-- | Create a 'Client' for the given host and port, using
-- 'Opt.defOptions' as the client options.
connect :: Host -> Port -> IO Client
connect host port = connectWith Opt.defOptions host port
-- | Create a 'Client' for the given host and port with explicit
-- 'Options'. The connection is opened with the connection settings
-- taken from those options, and the options are kept in the 'Client'.
connectWith :: Options -> Host -> Port -> IO Client
connectWith opts host port =
  -- 'Client' takes its fields positionally: options first, connection second.
  Client opts <$> Conn.connect (Opt.connectionSettings opts) host port
-- | Close the 'Connection' held by the 'Client'.
close :: Client -> IO ()
close = Conn.close . clientConn
-- | Lifecycle state of a 'ResultHandle'.
data HandleState
  = HandleOpen
  -- ^ Initial state; results may still be fetched.
  | HandleClose
  -- ^ No more results; 'nextResultSTM' returns 'Nothing' in this state.
  | HandleError SomeException
  -- ^ A previous fetch failed; 'nextResultSTM' rethrows the stored
  -- exception in this state.
  deriving (Show)
-- | A handle to the results of a submitted request. Results of type
-- @v@ are read from it one at a time.
data ResultHandle v = ResultHandle
  { rhResHandle :: ResponseHandle GValue
  -- ^ Handle of the underlying request, from which responses are read.
  , rhParseGValue :: GValue -> Either String (Vector v)
  -- ^ Parser that turns the result data of one response into a batch
  -- of results, or an error message.
  , rhResultCache :: TVar (Vector v)
  -- ^ Batch of results parsed from the most recently loaded response.
  , rhNextResultIndex :: TVar Int
  -- ^ Index into 'rhResultCache' of the next result to hand out.
  , rhState :: TVar HandleState
  -- ^ Current lifecycle state of the handle.
  }
-- | Send an eval request for the given script text and optional
-- bindings, and build a 'ResultHandle' for reading its results.
--
-- The batch size, language, aliases and script evaluation timeout of
-- the request are taken from the client's 'Options'. The returned
-- handle starts with an empty result cache, next-result index 0 and
-- state 'HandleOpen'; result data is parsed with 'parseEither'.
submitBase :: FromGraphSON r => Client -> Text -> Maybe Object -> IO (ResultHandle r)
submitBase client script bindings = do
  resHandle <- Conn.sendRequest (clientConn client) evalOp
  cacheVar <- newTVarIO mempty
  indexVar <- newTVarIO 0
  stateVar <- newTVarIO HandleOpen
  return $
    ResultHandle
      { rhResHandle = resHandle
      , rhParseGValue = parseEither
      , rhResultCache = cacheVar
      , rhNextResultIndex = indexVar
      , rhState = stateVar
      }
  where
    options = clientOpts client
    evalOp =
      ReqStd.OpEval
        { ReqStd.batchSize = Opt.batchSize options
        , ReqStd.gremlin = script
        , ReqStd.bindings = bindings
        , ReqStd.language = Opt.language options
        , ReqStd.aliases = Opt.aliases options
        , ReqStd.scriptEvaluationTimeout = Opt.scriptEvaluationTimeout options
        }
-- | Submit a Greskell script to the server. The script is rendered to
-- text with 'toGremlin' and sent via 'submitBase'; the element type of
-- the returned 'ResultHandle' is the 'IteratorItem' of the script's
-- return type.
submit :: (ToGreskell g, r ~ GreskellReturn g, AsIterator r, v ~ IteratorItem r, FromGraphSON v)
       => Client
       -> g -- ^ Script to submit.
       -> Maybe Object -- ^ Optional bindings sent along with the script.
       -> IO (ResultHandle v)
submit client greskell = submitBase client (toGremlin greskell)
-- | Like 'submit', but takes the script and its bindings as a pair.
-- The bindings are always sent (wrapped in 'Just').
submitPair :: (ToGreskell g, r ~ GreskellReturn g, AsIterator r, v ~ IteratorItem r, FromGraphSON v)
           => Client
           -> (g, Object) -- ^ Script and its bindings.
           -> IO (ResultHandle v)
submitPair client (script, scriptBindings) = submit client script $ Just scriptBindings
-- | Submit a script given as plain 'Text'. Results are returned as
-- unparsed 'GValue's.
submitRaw :: Client
          -> Text -- ^ Script text.
          -> Maybe Object -- ^ Optional bindings sent along with the script.
          -> IO (ResultHandle GValue)
submitRaw client script bindings = submitBase client script bindings
-- | Exceptions raised while reading the results of a submitted request.
data SubmitException
  = ResponseError (ResponseMessage GValue)
  -- ^ The server replied with a response whose status code is none of
  -- 'Res.Success', 'Res.NoContent' or 'Res.PartialContent'. Carries
  -- the offending response.
  | ParseError (ResponseMessage GValue) String
  -- ^ The result data of a response could not be parsed. Carries the
  -- response and the parser's error message.
  deriving (Show, Typeable)

instance Exception SubmitException
-- | Get the next result from the 'ResultHandle'. Returns 'Nothing'
-- once there are no more results.
--
-- This runs 'nextResultSTM' in its own transaction. If that throws a
-- synchronous exception, the exception is recorded in the handle's
-- state as 'HandleError' and then rethrown, so every later call on
-- the same handle fails with the same exception.
nextResult :: ResultHandle v -> IO (Maybe v)
nextResult rh = atomically (nextResultSTM rh) `catch` latchError
  where
    -- An exception that escapes 'atomically' aborts the transaction,
    -- which discards the 'HandleError' write 'nextResultSTM' makes just
    -- before rethrowing. Record the error in a fresh transaction so the
    -- error state actually persists.
    latchError :: SomeException -> IO a
    latchError ex = do
      atomically $ writeTVar (rhState rh) (HandleError ex)
      throw ex
-- | 'STM' version of 'nextResult'.
--
-- Behaviour depends on the handle's current state:
--
-- * 'HandleError': the stored exception is rethrown.
-- * 'HandleClose': 'Nothing' is returned.
-- * 'HandleOpen': the next cached result is returned if there is one,
--   otherwise the next response is loaded. When no result is obtained
--   the state becomes 'HandleClose'. When an exception is thrown the
--   state is set to 'HandleError' and the exception is rethrown.
--
-- Note: because the exception is rethrown, the 'HandleError' write only
-- persists if the enclosing transaction catches the exception and
-- still commits; a transaction aborted by the exception discards it.
nextResultSTM :: ResultHandle v -> STM (Maybe v)
nextResultSTM rh = readTVar stateVar >>= step
  where
    stateVar = rhState rh

    step (HandleError err) = throw err
    step HandleClose = return Nothing
    step HandleOpen = fetch `catch` recordAndRethrow

    -- Try the cache first, then fall back to loading a new response.
    fetch = do
      result <- getNextCachedResult rh >>= maybe (loadResponse rh) (return . Just)
      case result of
        Nothing -> writeTVar stateVar HandleClose
        Just _ -> return ()
      return result

    recordAndRethrow ex = do
      writeTVar stateVar (HandleError ex)
      throw ex
-- | Take the next result out of the handle's cache, advancing the
-- next-result index by one. Returns 'Nothing' (and leaves the index
-- untouched) when the index is already past the end of the cache.
getNextCachedResult :: ResultHandle v -> STM (Maybe v)
getNextCachedResult rh = do
  cached <- readTVar (rhResultCache rh)
  pos <- readTVar indexVar
  if pos >= length cached
    then return Nothing
    else do
      writeTVar indexVar (pos + 1)
      -- Safe: 'pos' was just checked to be below the cache length.
      return $ Just (cached ! pos)
  where
    indexVar = rhNextResultIndex rh
-- | Read the next response from the underlying response handle, parse
-- its result data into the cache, and return the first parsed result.
--
-- Returns 'Nothing' when there is no next response, when the response
-- code is 'Res.NoContent', or when the parsed batch is empty (the
-- next-result index is then reset to 0). Otherwise the cache is
-- replaced by the parsed batch and the index is set to 1, since the
-- first element is returned directly.
--
-- Throws 'ResponseError' for any response code other than
-- 'Res.Success', 'Res.NoContent' and 'Res.PartialContent', and
-- 'ParseError' when the result data cannot be parsed.
loadResponse :: ResultHandle v -> STM (Maybe v)
loadResponse rh =
  Conn.nextResponseSTM (rhResHandle rh) >>= maybe (return Nothing) handleResponse
  where
    indexVar = rhNextResultIndex rh

    handleResponse res =
      case Res.code (Res.status res) of
        Res.Success -> cacheResults res
        Res.PartialContent -> cacheResults res
        Res.NoContent -> return Nothing
        _ -> throw $ ResponseError res

    cacheResults res =
      either (throw . ParseError res) store $
        rhParseGValue rh (Res.resultData (Res.result res))

    store parsed = do
      writeTVar (rhResultCache rh) parsed
      if null parsed
        then writeTVar indexVar 0 >> return Nothing
        else writeTVar indexVar 1 >> return (Just (parsed ! 0))
-- | Gather the results of the handle into a 'Vector' by handing the
-- 'nextResult' action to 'slurp'. See 'slurp' for the exact
-- collection semantics.
slurpResults :: ResultHandle v -> IO (Vector v)
slurpResults = slurp . nextResult
-- | Consume the results of the handle without returning them, by
-- handing the 'nextResult' action to 'drain'. See 'drain' for the
-- exact semantics.
drainResults :: ResultHandle v -> IO ()
drainResults = drain . nextResult