{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE OverloadedStrings #-}

module Database.RethinkDB
    ( Handle
    , defaultPort, newHandle, handleDatabase, close, serverInfo

      -- * High-level query API
    , run, nextChunk, collect

      -- * Low-level query API
    , start, continue, stop, wait, nextResult
    , Token, Error(..), Response(..), ChangeNotification(..)

      -- * The Datum type
    , Datum(..), Array, Object, ToDatum(..), FromDatum(..)
    , (.=), (.:), (.:?), object

      -- * The Exp type
    , Exp(..), SomeExp(..)
    , Bound(..), Order(..)
    , Sequence(..)
    , Table, Database, SingleSelection
    , Res, Result, FromResponse
    , ConflictResolutionStrategy(..)
    , emptyOptions
    , lift
    , call1, call2
    , IsDatum, IsObject, IsSequence
    ) where


import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

import Data.Monoid ((<>))

import Data.Text (Text)
import qualified Data.Text as T

import Data.Vector (Vector)
import qualified Data.Vector as V

import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M

import qualified Data.Aeson.Types as A

import Data.Sequence (Seq, ViewR(..))
import qualified Data.Sequence as S

import Data.IORef

import Network.Socket (Socket)

import Database.RethinkDB.Types
import Database.RethinkDB.Types.Datum
import Database.RethinkDB.Messages


------------------------------------------------------------------------------
-- Handle

-- | A connection to a RethinkDB server together with the bookkeeping needed
-- to match responses to the queries which caused them.
data Handle = Handle
    { hSocket :: !(MVar Socket)
      -- ^ Any thread can write to the socket. In theory. But I don't think
      -- Haskell allows atomic writes to a socket, so it is protected inside
      -- an 'MVar'.
      --
      -- When too many threads write to the socket, this may cause resource
      -- contention. Users are encouraged to use a resource pool to alleviate
      -- that.

    , hTokenRef :: !(IORef Token)
      -- ^ This is used to allocate new tokens. We use 'atomicModifyIORef'' to
      -- efficiently allocate new tokens.
      --
      -- RethinkDB seems to expect the token to never be zero. So we need to
      -- start with one and then count up.

    , hError :: !(TVar (Maybe Error))
      -- ^ If there was a fatal error while reading from the socket, it will
      -- be stored here. If this is set then no further replies will be
      -- processed. The user needs to close and re-open the handle to recover.

    , hResponses :: !(TVar (Map Token (Seq (Either Error Response))))
      -- ^ Responses to queries. A thread reads the responses from the socket
      -- and pushes them into the queues.

    , hReader :: !ThreadId
      -- ^ Thread which reads from the socket and copies responses into the
      -- queues in 'hResponses'.

    , hDatabase :: !(Exp Database)
      -- ^ The database which should be used when the 'Table' expression
      -- doesn't specify one.
    }


-- | The default port where RethinkDB accepts client driver connections.
defaultPort :: Int
defaultPort = 28015


-- | Create a new handle to the RethinkDB server.
--
-- Opens a socket to @host@ / @port@, performs the handshake (optionally
-- with the given auth key) and forks the reader thread which fills the
-- per-token response queues. The first token handed out is @1@.
newHandle :: Text -> Int -> Maybe Text -> Exp Database -> IO Handle
newHandle host port mbAuth db = do
    sock <- createSocket host port

    -- Do the handshake dance. Note that we currently ignore the reply and
    -- assume it is "SUCCESS".
    sendMessage sock (handshakeMessage mbAuth)
    _reply <- recvMessage sock handshakeReplyParser

    err       <- newTVarIO Nothing
    responses <- newTVarIO M.empty

    let -- Queue one item for the given token. 'M.insertWith' applies
        -- @mappend new old@, so the newest item ends up at the left end of
        -- the queue; consumers take from the right end (oldest first).
        enqueue token item = atomically $ modifyTVar' responses $
            M.insertWith mappend token (S.singleton item)

        -- Read responses until the first fatal error. Once 'hError' is set
        -- 'nextResult' refuses to hand out any further responses, so there
        -- is no point in reading on (and doing so could spin on a broken
        -- socket). The loop therefore ends after recording the error.
        readerLoop = do
            res <- recvMessage sock responseMessageParser
            keepGoing <- case res of
                Left e -> do
                    atomically $ writeTVar err (Just e)
                    pure False
                Right (Left (token, msg)) -> do
                    enqueue token (Left $ ProtocolError $ T.pack msg)
                    pure True
                Right (Right r) -> do
                    enqueue (responseToken r) (Right r)
                    pure True
            when keepGoing readerLoop

    readerThreadId <- forkIO readerLoop

    Handle
        <$> newMVar sock
        <*> newIORef 1
        <*> pure err
        <*> pure responses
        <*> pure readerThreadId
        <*> pure db


-- | The 'Database' which some expressions will use when not explicitly given
-- one (eg. 'Table').
handleDatabase :: Handle -> Exp Database
handleDatabase = hDatabase


-- | Close the given handle.
-- You MUST NOT use the handle after this.
--
-- Closes the socket (while holding the write lock) and then kills the
-- reader thread.
close :: Handle -> IO ()
close handle = do
    withMVar (hSocket handle) closeSocket
    killThread (hReader handle)


-- | Ask the server for its 'ServerInfo'.
--
-- Allocates a fresh token, sends a query message whose body is the
-- single-element array @[5]@ (presumably the protocol's server-info query
-- type) and waits for the matching response.
serverInfo :: Handle -> IO (Either Error ServerInfo)
serverInfo handle = do
    token <- atomicModifyIORef' (hTokenRef handle) (\x -> (x + 1, x))
    withMVar (hSocket handle) $ \socket ->
        sendMessage socket (queryMessage token $ singleElementArray 5)
    nextResult handle token


--------------------------------------------------------------------------------
-- * High-level query API
--
-- These are synchronous functions, they make it really easy to run a query and
-- immediately get its results.
--
-- If the result is a sequence, you can either manually iterate through the
-- chunks ('nextChunk') or fetch the whole sequence at once ('collect').


-- | Start a new query and wait for its (first) result. If the result is a
-- single value ('Datum'), then there will be no further results. If it is
-- a sequence, then you must consume results until the sequence ends.
run :: (FromResponse (Result a)) => Handle -> Exp a -> IO (Res a)
run handle expr = do
    token <- start handle expr
    nextResult handle token


-- | Get the next chunk of a sequence. It is an error to request the next chunk
-- if the sequence is already 'Done'; in that case a 'ProtocolError' is
-- returned and nothing is sent to the server.
nextChunk :: (FromResponse (Sequence a))
          => Handle -> Sequence a -> IO (Either Error (Sequence a))
nextChunk _ (Done _) =
    return $ Left $ ProtocolError "nextChunk: the sequence is already done"
nextChunk handle (Partial token _) = do
    continue handle token
    nextResult handle token


-- | Collect all the values in a sequence and make them available as
-- a 'Vector a'.
collect :: (FromDatum a) => Handle -> Sequence a -> IO (Either Error (Vector a))
collect _      (Done x)        = return $ Right x
collect handle s@(Partial _ x) =
    nextChunk handle s >>= either (return . Left) collectRest
  where
    -- Fetch everything after the current chunk and put the values we
    -- already hold in front of it. Errors are passed through untouched.
    collectRest r = fmap (x <>) <$> collect handle r


--------------------------------------------------------------------------------
-- * Low-level query API
--
-- These functions map almost verbatim to the wire protocol messages. They are
-- asynchronous, you can send multiple queries and get the corresponding
-- responses sometime later.


-- | Start a new query. Returns the 'Token' which can be used to track its
-- progress.
--
-- The token is taken from the handle's counter, the expression is compiled
-- against the handle's default database, and the resulting message is
-- written to the socket while holding the write lock.
start :: Handle -> Exp a -> IO Token
start handle term = do
    token <- atomicModifyIORef' (hTokenRef handle) $ \next -> (next + 1, next)
    withMVar (hSocket handle) $ \sock ->
        sendMessage sock $ queryMessage token payload
    return token

  where
    -- Wire format of the query: @[1, term, options]@.
    payload = compileTerm (hDatabase handle) $ do
        compiledTerm    <- toTerm term
        compiledOptions <- toTerm emptyOptions
        return $ A.Array $ V.fromList
            [ A.Number 1
            , compiledTerm
            , A.toJSON compiledOptions
            ]


-- | A JSON array which contains just the given number.
singleElementArray :: Int -> A.Value
singleElementArray = A.Array . V.singleton . A.Number . fromIntegral


-- | Let the server know that it can send the next response corresponding to
-- the given token.
continue :: Handle -> Token -> IO ()
continue handle token =
    withMVar (hSocket handle) $ \sock -> sendMessage sock request
  where
    request = queryMessage token (singleElementArray 2)


-- | Stop (abort?) a query.
stop :: Handle -> Token -> IO ()
stop handle token =
    withMVar (hSocket handle) $ \sock -> sendMessage sock request
  where
    request = queryMessage token (singleElementArray 3)


-- | Wait until a previous query (which was started with the 'noreply' option)
-- finishes.
wait :: Handle -> Token -> IO ()
wait handle token = withMVar (hSocket handle) $ \socket ->
    sendMessage socket (queryMessage token $ singleElementArray 4)


-- | This function blocks until there is a response ready for the query with
-- the given token, or until a fatal reader error has been recorded in the
-- handle (in which case that error is returned).
--
-- Unless a fatal error occurs, it may block indefinitely if the token refers
-- to a query which has already finished or does not exist yet!
--
-- The fatal-error check and the queue lookup happen in the same STM
-- transaction. That way a caller which is already waiting for a response is
-- woken up when the reader thread fails, instead of waiting forever for a
-- response that can no longer arrive.
responseForToken :: Handle -> Token -> IO (Either Error Response)
responseForToken h token = atomically $ do
    mbError <- readTVar (hError h)
    case mbError of
        Just err -> pure (Left err)
        Nothing  -> do
            m <- readTVar (hResponses h)
            case M.lookup token m of
                Nothing -> retry
                Just s  -> case S.viewr s of
                    EmptyR    -> retry
                    -- The oldest response sits at the right end of the queue.
                    rest :> a -> do
                        modifyTVar' (hResponses h) $
                            if S.null rest
                                then M.delete token
                                else M.insert token rest

                        pure a


-- | Wait for the next response to the query with the given token and decode
-- it.
--
-- Returns 'Left' if the handle has a fatal error, if the queued item is
-- itself an error, if the server answered with a client, compile or runtime
-- error, or if the response can not be parsed into the requested type.
nextResult :: (FromResponse a) => Handle -> Token -> IO (Either Error a)
nextResult h token = do
    errorOrResponse <- responseForToken h token
    case errorOrResponse of
        Left err       -> return $ Left err
        Right response -> case responseType response of
            ClientErrorType  -> mkError response ClientError
            CompileErrorType -> mkError response CompileError
            RuntimeErrorType -> mkError response RuntimeError
            _                -> return $ parseMessage parseResponse response Right


-- | Run an Aeson parser over the value and feed the result into the
-- continuation. A parse failure is reported as a 'ProtocolError' carrying
-- the parser's message.
parseMessage :: (a -> A.Parser b) -> a -> (b -> Either Error c) -> Either Error c
parseMessage parser value f = case A.parseEither parser value of
    Left  e -> Left $ ProtocolError $ T.pack e
    Right v -> f v


-- | Turn an error response into an 'Error' built with the given constructor.
--
-- The response result is expected to hold exactly one element which parses
-- as the error text; anything else is reported as a 'ProtocolError'.
mkError :: Response -> (Text -> Error) -> IO (Either Error a)
mkError r e = return $ case V.toList (responseResult r) of
    [a] -> parseMessage A.parseJSON a (Left . e)
    _   -> Left $ ProtocolError $ "mkError: Could not parse error" <> T.pack (show (responseResult r))