module Database.MySQL.Base
(
MySQLConn
, ConnectInfo(..)
, defaultConnectInfo
, connect
, connectDetail
, close
, ping
, execute
, executeMany
, execute_
, query_
, queryVector_
, query
, queryVector
, prepareStmt
, prepareStmtDetail
, executeStmt
, queryStmt
, queryStmtVector
, closeStmt
, resetStmt
, withTransaction
, Query(..)
, renderParams
, command
, Stream.skipToEof
, NetworkException(..)
, UnconsumedResultSet(..)
, ERRException(..)
, UnexpectedPacket(..)
, DecodePacketException(..)
, WrongParamsCount(..)
, module Database.MySQL.Protocol.Auth
, module Database.MySQL.Protocol.Command
, module Database.MySQL.Protocol.ColumnDef
, module Database.MySQL.Protocol.Packet
, module Database.MySQL.Protocol.MySQLValue
) where
import Control.Applicative
import Control.Exception (mask, onException, throwIO)
import Control.Monad
import qualified Data.ByteString.Lazy as L
import Data.IORef (writeIORef)
import Database.MySQL.Connection
import Database.MySQL.Protocol.Auth
import Database.MySQL.Protocol.ColumnDef
import Database.MySQL.Protocol.Command
import Database.MySQL.Protocol.MySQLValue
import Database.MySQL.Protocol.Packet
import Database.MySQL.Query
import System.IO.Streams (InputStream, OutputStream)
import qualified System.IO.Streams as Stream
import qualified Data.Vector as V
execute :: MySQLConn -> Query -> [MySQLValue] -> IO OK
execute conn qry params = execute_ conn (renderParams qry params)
executeMany :: MySQLConn -> Query -> [[MySQLValue]]-> IO [OK]
executeMany conn@(MySQLConn is os _ _) qry paramsList = do
guardUnconsumed conn
let qry' = L.intercalate ";" $ map (fromQuery . renderParams qry) paramsList
writeCommand (COM_QUERY qry') os
mapM (\ _ -> waitCommandReply is) paramsList
execute_ :: MySQLConn -> Query -> IO OK
execute_ conn (Query qry) = command conn (COM_QUERY qry)
query :: MySQLConn -> Query -> [MySQLValue] -> IO ([ColumnDef], InputStream [MySQLValue])
query conn qry params = query_ conn (renderParams qry params)
queryVector :: MySQLConn -> Query -> [MySQLValue] -> IO (V.Vector ColumnDef, InputStream (V.Vector MySQLValue))
queryVector conn qry params = queryVector_ conn (renderParams qry params)
query_ :: MySQLConn -> Query -> IO ([ColumnDef], InputStream [MySQLValue])
query_ conn@(MySQLConn is os _ consumed) (Query qry) = do
guardUnconsumed conn
writeCommand (COM_QUERY qry) os
p <- readPacket is
if isERR p
then decodeFromPacket p >>= throwIO . ERRException
else do
len <- getFromPacket getLenEncInt p
fields <- replicateM len $ (decodeFromPacket <=< readPacket) is
_ <- readPacket is
writeIORef consumed False
rows <- Stream.makeInputStream $ do
q <- readPacket is
if | isEOF q -> writeIORef consumed True >> return Nothing
| isERR q -> decodeFromPacket q >>= throwIO . ERRException
| otherwise -> Just <$> getFromPacket (getTextRow fields) q
return (fields, rows)
queryVector_ :: MySQLConn -> Query -> IO (V.Vector ColumnDef, InputStream (V.Vector MySQLValue))
queryVector_ conn@(MySQLConn is os _ consumed) (Query qry) = do
guardUnconsumed conn
writeCommand (COM_QUERY qry) os
p <- readPacket is
if isERR p
then decodeFromPacket p >>= throwIO . ERRException
else do
len <- getFromPacket getLenEncInt p
fields <- V.replicateM len $ (decodeFromPacket <=< readPacket) is
_ <- readPacket is
writeIORef consumed False
rows <- Stream.makeInputStream $ do
q <- readPacket is
if | isEOF q -> writeIORef consumed True >> return Nothing
| isERR q -> decodeFromPacket q >>= throwIO . ERRException
| otherwise -> Just <$> getFromPacket (getTextRowVector fields) q
return (fields, rows)
prepareStmt :: MySQLConn -> Query -> IO StmtID
prepareStmt conn@(MySQLConn is os _ _) (Query stmt) = do
guardUnconsumed conn
writeCommand (COM_STMT_PREPARE stmt) os
p <- readPacket is
if isERR p
then decodeFromPacket p >>= throwIO . ERRException
else do
StmtPrepareOK stid colCnt paramCnt _ <- getFromPacket getStmtPrepareOK p
_ <- replicateM_ paramCnt (readPacket is)
_ <- unless (paramCnt == 0) (void (readPacket is))
_ <- replicateM_ colCnt (readPacket is)
_ <- unless (colCnt == 0) (void (readPacket is))
return stid
prepareStmtDetail :: MySQLConn -> Query -> IO (StmtPrepareOK, [ColumnDef], [ColumnDef])
prepareStmtDetail conn@(MySQLConn is os _ _) (Query stmt) = do
guardUnconsumed conn
writeCommand (COM_STMT_PREPARE stmt) os
p <- readPacket is
if isERR p
then decodeFromPacket p >>= throwIO . ERRException
else do
sOK@(StmtPrepareOK _ colCnt paramCnt _) <- getFromPacket getStmtPrepareOK p
pdefs <- replicateM paramCnt ((decodeFromPacket <=< readPacket) is)
_ <- unless (paramCnt == 0) (void (readPacket is))
cdefs <- replicateM colCnt ((decodeFromPacket <=< readPacket) is)
_ <- unless (colCnt == 0) (void (readPacket is))
return (sOK, pdefs, cdefs)
closeStmt :: MySQLConn -> StmtID -> IO ()
closeStmt (MySQLConn _ os _ _) stid = do
writeCommand (COM_STMT_CLOSE stid) os
resetStmt :: MySQLConn -> StmtID -> IO ()
resetStmt (MySQLConn is os _ consumed) stid = do
writeCommand (COM_STMT_RESET stid) os
p <- readPacket is
if isERR p
then decodeFromPacket p >>= throwIO . ERRException
else writeIORef consumed True
executeStmt :: MySQLConn -> StmtID -> [MySQLValue] -> IO OK
executeStmt conn stid params = command conn (COM_STMT_EXECUTE stid params)
queryStmt :: MySQLConn -> StmtID -> [MySQLValue] -> IO ([ColumnDef], InputStream [MySQLValue])
queryStmt conn@(MySQLConn is os _ consumed) stid params = do
guardUnconsumed conn
writeCommand (COM_STMT_EXECUTE stid params) os
p <- readPacket is
if isERR p
then decodeFromPacket p >>= throwIO . ERRException
else do
len <- getFromPacket getLenEncInt p
fields <- replicateM len $ (decodeFromPacket <=< readPacket) is
_ <- readPacket is
writeIORef consumed False
rows <- Stream.makeInputStream $ do
q <- readPacket is
if | isOK q -> Just <$> getFromPacket (getBinaryRow fields len) q
| isEOF q -> writeIORef consumed True >> return Nothing
| isERR q -> decodeFromPacket q >>= throwIO . ERRException
| otherwise -> throwIO (UnexpectedPacket q)
return (fields, rows)
queryStmtVector :: MySQLConn -> StmtID -> [MySQLValue] -> IO (V.Vector ColumnDef, InputStream (V.Vector MySQLValue))
queryStmtVector conn@(MySQLConn is os _ consumed) stid params = do
guardUnconsumed conn
writeCommand (COM_STMT_EXECUTE stid params) os
p <- readPacket is
if isERR p
then decodeFromPacket p >>= throwIO . ERRException
else do
len <- getFromPacket getLenEncInt p
fields <- V.replicateM len $ (decodeFromPacket <=< readPacket) is
_ <- readPacket is
writeIORef consumed False
rows <- Stream.makeInputStream $ do
q <- readPacket is
if | isOK q -> Just <$> getFromPacket (getBinaryRowVector fields len) q
| isEOF q -> writeIORef consumed True >> return Nothing
| isERR q -> decodeFromPacket q >>= throwIO . ERRException
| otherwise -> throwIO (UnexpectedPacket q)
return (fields, rows)
withTransaction :: MySQLConn -> IO a -> IO a
withTransaction conn procedure = mask $ \restore -> do
_ <- execute_ conn "BEGIN"
r <- restore procedure `onException` (execute_ conn "ROLLBACK")
_ <- execute_ conn "COMMIT"
pure r