module Database.Neo4j.Transactional.Cypher (
Result(..), Stats(..), ParamValue(..), Params, newparam, emptyStats, TransError, Transaction,
loneQuery, runTransaction, cypher, rollback, commit, keepalive, commitWith, rollbackAndLeave,
isSuccess, fromResult, fromSuccess
) where
import Control.Monad.Reader
import Control.Monad.State.Strict
import Control.Monad.Trans.Except
import Control.Applicative ((<$>), (<*>))
import Data.Aeson ((.=), (.:))
import Data.List (find)
import Data.Maybe (fromMaybe)
import Text.Read (readMaybe)
import qualified Control.Exception as Exc
import qualified Control.Monad.Trans.Resource as R
import qualified Data.Aeson as J
import qualified Data.Acquire as A
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified Network.HTTP.Types as HT
import Database.Neo4j.Types
import Database.Neo4j.Http
import Database.Neo4j.Cypher (ParamValue(..), Params, newparam)
import qualified Database.Neo4j.Graph as G
data Stats = Stats {containsUpdates :: Bool,
nodesCreated :: Integer,
nodesDeleted :: Integer,
propsSet :: Integer,
relsCreated :: Integer,
relsDeleted :: Integer,
lblsAdded :: Integer,
lblsRemoved :: Integer,
idxAdded :: Integer,
idxRemoved :: Integer,
constAdded :: Integer,
constRemoved :: Integer} deriving (Eq, Show)
data Result = Result {cols :: [T.Text], vals :: [[J.Value]], graph :: [G.Graph], stats :: Stats} deriving (Show, Eq)
emptyStats :: Stats
emptyStats = Stats False 0 0 0 0 0 0 0 0 0 0 0
emptyResponse :: Result
emptyResponse = Result [] [] [] emptyStats
newtype CypherNode = CypherNode {runCypherNode :: (Node, [Label])} deriving (Eq, Show)
newtype CypherRel = CypherRel {runCypherRel :: Relationship} deriving (Eq, Show)
readDefault :: Read a => a -> String -> a
readDefault d = fromMaybe d . readMaybe
readIntDefault :: Integer -> String -> Integer
readIntDefault = readDefault
instance J.FromJSON Stats where
parseJSON (J.Object o) = Stats <$> o .: "contains_updates" <*>
o .: "nodes_created" <*>
o .: "nodes_deleted" <*>
o .: "properties_set" <*>
o .: "relationships_created" <*>
o .: "relationship_deleted" <*>
o .: "labels_added" <*>
o .: "labels_removed" <*>
o .: "indexes_added" <*>
o .: "indexes_removed" <*>
o .: "constraints_added" <*>
o .: "constraints_removed"
parseJSON _ = mzero
instance J.FromJSON CypherNode where
parseJSON (J.Object o) = CypherNode <$> ((,) <$> parseNode <*> (o .: "labels" >>= J.parseJSON))
where parseNode = do
idStr <- o .: "id"
props <- o .: "properties" >>= J.parseJSON
return $ Node (getNodePath $ readIntDefault 0 idStr) props
parseJSON _ = mzero
instance J.FromJSON CypherRel where
parseJSON (J.Object o) = CypherRel <$> (Relationship <$> (relId <$> o .: "id") <*> o .: "type" <*>
(o .: "properties" >>= J.parseJSON) <*> (nodeId <$> o .: "startNode") <*> (nodeId <$> o .: "endNode") )
where relId = getRelPath . readIntDefault 0
nodeId = getNodePath . readIntDefault 0
parseJSON _ = mzero
buildGraph :: [CypherNode] -> [CypherRel] -> G.Graph
buildGraph ns = foldl addRel (foldl addNode G.empty ns)
where addNode g cn = let (n, lbls) = runCypherNode cn in G.setNodeLabels n lbls (n `G.addNode` g)
addRel g cr = let r = runCypherRel cr in r `G.addRelationship` g
newtype DataElem = DataElem {runDataElem :: ([J.Value], G.Graph)} deriving (Eq, Show)
instance J.FromJSON DataElem where
parseJSON (J.Object o) = DataElem <$> ((,) <$> (o .: "row" >>= J.parseJSON) <*> (o .: "graph" >>= parseGraph))
where parseGraph (J.Object g) = buildGraph <$> (g .: "nodes" >>= J.parseJSON) <*>
(g .: "relationships" >>= J.parseJSON)
parseGraph _ = mzero
parseJSON _ = mzero
instance J.FromJSON Result where
parseJSON (J.Object o) = Result <$> (o .: "columns" >>= J.parseJSON) <*> (fst <$> pData) <*> (snd <$> pData)
<*> (o .: "stats" >>= J.parseJSON)
where parseData v = parseDataElem <$> J.parseJSON v
parseDataElem ds = unzip $ map runDataElem ds
pData = o .: "data" >>= parseData
parseJSON _ = mzero
transAPI :: S.ByteString
transAPI = "/db/data/transaction"
type TransError = (T.Text, T.Text)
type TransactionId = S.ByteString
data TransState = TransInit | TransStarted R.ReleaseKey TransactionId | TransDone
type Transaction a = ExceptT TransError (ReaderT Connection (StateT TransState (R.ResourceT IO))) a
newtype Response = Response {runResponse :: Either TransError Result} deriving (Show, Eq)
instance J.FromJSON Response where
parseJSON (J.Object o) = do
errs <- o .: "errors" >>= parseErrs
case errs of
Just err -> return $ Response (Left err)
Nothing -> Response . Right <$> (o .: "results" >>= parseResult)
where parseErrs (J.Array es) = if V.null es then return Nothing else Just <$> parseErr (V.head es)
parseErrs _ = mzero
parseErr (J.Object e) = (,) <$> e .: "code" <*> e .: "message"
parseErr _ = mzero
parseResult (J.Array rs) = if V.null rs then return emptyResponse else J.parseJSON $ V.head rs
parseResult _ = mzero
parseJSON _ = mzero
loneQuery :: T.Text -> Params -> Neo4j (Either TransError Result)
loneQuery = transactionReq (transAPI <> "/commit")
queryBody :: T.Text -> Params -> L.ByteString
queryBody cmd params = J.encode $ J.object ["statements" .= [
J.object ["statement" .= cmd, resultSpec, includeStats,
"parameters" .= J.toJSON params]]]
where resultSpec = "resultDataContents" .= ["row", "graph" :: T.Text]
includeStats = "includeStats" .= True
transactionReq :: S.ByteString -> T.Text -> Params -> Neo4j (Either TransError Result)
transactionReq path cmd params = Neo4j $ \conn -> runResponse <$> httpCreate conn path (queryBody cmd params)
runTransaction :: Transaction a -> Neo4j (Either TransError a)
runTransaction t = Neo4j $ \conn ->
R.runResourceT (fst <$> runStateT (runReaderT (runExceptT $ catchErrors t) conn) TransInit)
catchErrors :: Transaction a -> Transaction a
catchErrors t = catchE t handle
where handle err@("Rollback", _) = ExceptT $ return (Left err)
handle err = do
st <- lift get
unless (isDone st) rollback
ExceptT $ return (Left err)
isDone TransDone = True
isDone _ = False
cypher :: T.Text -> Params -> Transaction Result
cypher cmd params = do
conn <- ask
st <- lift get
res <- case st of
TransInit -> do
(key, (resp, headers)) <- lift $ lift $ lift $ reqNewTrans conn
lift $ put (TransStarted key $ transIdFromHeaders headers)
return resp
TransStarted _ transId -> liftIO $ reqTransCreated conn transId
TransDone -> liftIO $ Exc.throw TransactionEndedExc
ExceptT $ return $ runResponse res
where reqNewTrans conn = acquireTrans conn $ httpCreateWithHeaders conn transAPI (queryBody cmd params)
reqTransCreated conn path = httpCreate conn path (queryBody cmd params)
rollback :: Transaction ()
rollback = do
conn <- ask
st <- lift get
case st of
TransInit -> return ()
TransStarted key transId -> do
liftIO $ rollbackReq conn transId
void $ R.unprotect key
lift $ put TransDone
TransDone -> liftIO $ Exc.throw TransactionEndedExc
rollbackAndLeave :: T.Text -> Transaction ()
rollbackAndLeave msg = do
conn <- ask
st <- lift get
case st of
TransInit -> throwE ("Rollback", msg)
TransStarted key transId -> do
liftIO $ rollbackReq conn transId
void $ R.unprotect key
lift $ put TransDone
throwE ("Rollback", msg)
TransDone -> liftIO $ Exc.throw TransactionEndedExc
commit :: Transaction ()
commit = do
conn <- ask
st <- lift get
case st of
TransInit -> return ()
TransStarted key transId -> do
liftIO $ commitReq conn transId
void $ R.unprotect key
lift $ put TransDone
TransDone -> liftIO $ Exc.throw TransactionEndedExc
commitWith :: T.Text -> Params -> Transaction Result
commitWith cmd params = do
conn <- ask
st <- lift get
res <- case st of
TransInit -> liftIO $ req conn (transAPI <> "/commit")
TransStarted key transId -> do
void $ R.unprotect key
liftIO $ req conn (transId <> "/commit")
TransDone -> liftIO $ Exc.throw TransactionEndedExc
lift $ put TransDone
ExceptT $ return $ runResponse res
where req conn path = httpCreate conn path (queryBody cmd params)
keepalive :: Transaction ()
keepalive = do
conn <- ask
st <- lift get
case st of
TransInit -> return ()
TransStarted _ transId -> liftIO $ keepaliveReq conn transId
TransDone -> liftIO $ Exc.throw TransactionEndedExc
transIdFromHeaders :: HT.ResponseHeaders -> TransactionId
transIdFromHeaders headers = commitId $ fromMaybe (transAPI <> "-1") (snd <$> find ((==HT.hLocation) . fst) headers)
where commitId loc = snd $ S.breakSubstring transAPI loc
acquireTrans :: Connection -> IO (a, HT.ResponseHeaders) -> R.ResourceT IO (R.ReleaseKey, (a, HT.ResponseHeaders))
acquireTrans conn req = A.allocateAcquire (A.mkAcquireType req freeRes)
where freeRes (_, headers) A.ReleaseNormal = let transId = transIdFromHeaders headers in commitReq conn transId
freeRes (_, headers) _ = let transId = transIdFromHeaders headers in rollbackReq conn transId
commitReq :: Connection -> TransactionId -> IO ()
commitReq conn trId = void $ httpReq conn HT.methodPost (trId <> "/commit") "" (const True)
rollbackReq :: Connection -> TransactionId -> IO ()
rollbackReq conn trId = void $ httpReq conn HT.methodDelete trId "" (const True)
keepaliveReq :: Connection -> TransactionId -> IO ()
keepaliveReq conn trId = void $ httpReq conn HT.methodPost trId keepaliveBody (const True)
where keepaliveBody = J.encode $ J.object ["statements" .= ([] :: [Integer])]
fromResult :: Result -> Either TransError Result -> Result
fromResult def (Left _) = def
fromResult _ (Right resp) = resp
fromSuccess :: Either TransError Result -> Result
fromSuccess (Left _) = error "Cypher.fromSuccess but is Error"
fromSuccess (Right resp) = resp
isSuccess :: Either TransError Result -> Bool
isSuccess (Left _) = False
isSuccess (Right _) = True