module Database.TransferDB.DumpDB where
import Prelude hiding (fail, log)
import System.IO (Handle, withBinaryFile, IOMode(ReadMode, WriteMode),
hSeek, SeekMode(AbsoluteSeek, SeekFromEnd), hFlush, hGetBuf, hPutBuf,
BufferMode(BlockBuffering), hSetBuffering, hSetBinaryMode)
import System.IO.Temp (withTempFile)
import System.Clock (Clock(Monotonic), TimeSpec(sec), getTime, diffTimeSpec, toNanoSecs)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (TVar, newTVar, modifyTVar, readTVar, writeTVar,
TQueue, newTQueue, readTQueue, writeTQueue,
TMVar, newTMVar, newEmptyTMVar, takeTMVar, putTMVar,
STM, atomically, check, orElse, retry)
import Control.Monad(foldM, replicateM_, join)
import Control.Monad.Trans.Reader (ReaderT, runReaderT, asks, ask, withReaderT)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)
import Control.Monad.Trans.StringError (runStringErrorT)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Fail (MonadFail, fail)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Logging (log, debugS, withStderrLogging)
import Data.Time.Clock (getCurrentTime)
import Data.String (fromString)
import Data.Text (Text)
import Data.List (intercalate)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C
import qualified Data.Map.Strict as Map
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Ptr (Ptr, castPtr)
import Foreign.Storable (Storable(peek, poke))
import Database.TransferDB.DumpDB.Format
import Database.TransferDB.Commons (HasDBInfo(..), DBInfo(DBInfo), withConnection', connect', forAllTables, faillog, finally', finally)
import SQL.CLI (SQLHENV, SQLHDBC, SQLHSTMT, SQLINTEGER, SQLSMALLINT, SQLPOINTER, SQLHANDLE, SQLLEN, sql_handle_env, sql_handle_dbc, sql_handle_stmt, sql_no_nulls, sql_null_data)
import SQL.CLI.Utils (ColumnInfo(..), SQLConfig(..),
collectColumnsInfo, collectColumnsInfo', allocHandle, columns,
freeHandle, execDirect, forAllRecordsWithEndAndFail, forAllData, getData, disconnect)
import SQL.CLI.ODBC (odbcImplementation, setupEnv)
import SQL.ODBC (sql_char,
sql_varchar,
sql_longvarchar,
sql_wchar,
sql_wvarchar,
sql_wlongvarchar,
sql_decimal,
sql_numeric,
sql_bit,
sql_tinyint,
sql_smallint,
sql_integer,
sql_bigint,
sql_real,
sql_float,
sql_double,
sql_binary,
sql_varbinary,
sql_longvarbinary,
sql_type_date,
sql_type_time,
sql_type_timestamp,
sql_interval_year,
sql_interval_month,
sql_interval_day,
sql_interval_hour,
sql_interval_minute,
sql_interval_second,
sql_interval_year_to_month,
sql_interval_day_to_hour,
sql_interval_day_to_minute,
sql_interval_day_to_second,
sql_interval_hour_to_minute,
sql_interval_hour_to_second,
sql_interval_minute_to_second,
sql_guid,
sql_c_char,
sql_c_wchar,
sql_c_bit,
sql_c_tinyint,
sql_c_short,
sql_c_long,
sql_c_sbigint,
sql_c_float,
sql_c_double,
sql_c_binary,
sql_c_type_date,
sql_c_type_time,
sql_c_type_timestamp,
sql_c_interval_year,
sql_c_interval_month,
sql_c_interval_day,
sql_c_interval_hour,
sql_c_interval_minute,
sql_c_interval_second,
sql_c_interval_year_to_month,
sql_c_interval_day_to_hour,
sql_c_interval_day_to_minute,
sql_c_interval_day_to_second,
sql_c_interval_hour_to_minute,
sql_c_interval_hour_to_second,
sql_c_interval_minute_to_second,
sql_c_guid,
odbcCTypeLen)
-- | Log-source tag used with the 'debugS' calls in this module.
logsrc :: Text
logsrc = fromString "Database.TransferDB.DumpDB"
-- | Upper bound (2 MiB) for a single transfer-buffer chunk; also recorded
-- in the dump header (see 'makeHeader').  Polymorphic so it can be used at
-- any numeric type without conversions.
maxChunkSize :: (Num a) => a
maxChunkSize = 2 * 1024 * 1024
-- | 'maxChunkSize' fixed at type 'Int', for raw buffer allocations.
-- 'maxChunkSize' is polymorphic, so no 'fromIntegral' round-trip is needed.
bufSize :: Int
bufSize = maxChunkSize
-- | Block buffering (10 MiB) applied to each worker thread's temp file.
fileBuffering :: BufferMode
fileBuffering = BlockBuffering (Just $ 10 * 1024 * 1024)
type StatisticsMap = Map.Map C.ByteString (Int, Int)
-- | Everything a dump run needs: connection parameters, output location,
-- parallelism, and shared bookkeeping state.
data DumpConfig = DumpConfig {
  dump_DSN :: String,                        -- ^ ODBC data source name
  dump_UserName :: String,                   -- ^ database user
  dump_Password :: String,                   -- ^ database password
  dump_Schema :: String,                     -- ^ schema whose tables are dumped
  dump_Description :: String,                -- ^ free-form text stored in the dump header
  dump_FilePath :: FilePath,                 -- ^ output dump file
  dump_ParallelThreads :: Int,               -- ^ 0 = dump in the calling thread, no workers
  dump_StatisticsVar :: TVar StatisticsMap,  -- ^ shared per-table counters (see 'logStatistics')
  dump_StartTime :: TimeSpec                 -- ^ monotonic start time, for throughput logging
  }
-- | Expose the dump connection parameters as a 'DBInfo'.
instance HasDBInfo DumpConfig where
  extractDBInfo cfg = DBInfo (dump_DSN cfg) (dump_UserName cfg) (dump_Password cfg) (dump_Schema cfg)
-- | Connection parameters and input file for a restore run.
-- (Restore is currently a stub — see 'restore'.)
data RestoreConfig = RestoreConfig {
  restore_DSN :: String,        -- ^ ODBC data source name
  restore_UserName :: String,   -- ^ database user
  restore_Password :: String,   -- ^ database password
  restore_Schema :: String,     -- ^ target schema
  restore_FilePath :: FilePath  -- ^ dump file to read
  }
-- | Expose the restore connection parameters as a 'DBInfo'.
instance HasDBInfo RestoreConfig where
  extractDBInfo cfg = DBInfo (restore_DSN cfg) (restore_UserName cfg) (restore_Password cfg) (restore_Schema cfg)
-- | Entry point for dumping a database to the file named in the
-- configuration.  Opens the output file in binary mode, runs 'hDump' and
-- turns a 'Nothing' result into a 'fail'.
dump :: (MonadIO m, MonadFail m, MonadBaseControl IO m) => ReaderT DumpConfig m ()
dump = withStderrLogging $ do
  config <- ask
  let filename = dump_FilePath config
  liftIO $ withBinaryFile filename WriteMode $ \ handle -> do
    outcome <- runMaybeT (runReaderT (hDump handle) config)
    case outcome of
      Nothing -> fail "Database dump failed"
      Just x  -> return x
-- | Write a complete dump to an already opened handle: version tag first,
-- then the header, then every table.
hDump :: (MonadIO m, MonadFail m) => Handle -> ReaderT DumpConfig (MaybeT m) ()
hDump handle = do
  liftIO $ B.hPut handle (writeVersion V1)
  hdr <- makeHeader
  liftIO $ B.hPut handle (writeHeader hdr)
  dumpTables handle
-- | Build the V1 dump header from the chunk size, the current wall-clock
-- time and the configured description.
makeHeader :: (MonadIO m) => ReaderT DumpConfig m HeaderV1
makeHeader = do
  now  <- liftIO getCurrentTime
  desc <- asks dump_Description
  return (HeaderV1 maxChunkSize now (C.pack desc))
-- | Shared state for a multi-threaded dump run.
data ThreadedDump = ThreadedDump {
  threads_TablesChan :: TQueue String,          -- ^ work queue of table names (filled by 'publishTables')
  threads_AllTablesPublishedVar :: TVar Bool,   -- ^ set once the producer has enqueued every table
  threads_WorkerThreadsVar :: TVar Int,         -- ^ count of live worker threads
  threads_HandleVar :: TMVar Handle,            -- ^ lock + handle of the final dump file
  threads_Config :: DumpConfig,                 -- ^ the underlying single-threaded configuration
  threads_HEnv :: SQLHENV,                      -- ^ shared ODBC environment handle
  threads_AllocHandleChan :: TQueue (SQLSMALLINT, SQLHANDLE, TMVar SQLHANDLE)
  -- ^ handle-allocation requests, serviced by the main thread in 'waitForWorkToEnd'
  }
-- | Delegate to the wrapped 'DumpConfig'.
instance HasDBInfo ThreadedDump where
  extractDBInfo = extractDBInfo . threads_Config
-- | Dump all tables of the configured schema into @handle@.
--
-- With @dump_ParallelThreads == 0@ everything runs on the calling thread
-- over one connection.  Otherwise a producer/consumer setup is built: a
-- queue of table names, a published flag, a live-worker counter and a
-- 'TMVar' guarding the dump-file handle; the ODBC environment handle is
-- freed when the whole run finishes, success or not.
dumpTables :: (MonadIO m, MonadFail m) => Handle -> ReaderT DumpConfig (MaybeT m) ()
dumpTables handle = do
  threads <- asks dump_ParallelThreads
  case threads of
    0 -> do
      -- single-threaded path: one connection, tables dumped in sequence
      liftIO $ log $ fromString "Dumping tables without using threads"
      _ <- withConnection' (\ _ hdbc -> withReaderT SingleThreaded $ forAllTables hdbc (0, 0) (dumpTable handle hdbc))
      return ()
    _ -> do
      liftIO $ log $ fromString $ "Dumping tables using " ++ (show threads) ++ " threads"
      tablesChan <- liftIO $ atomically newTQueue
      allTablesPublished <- liftIO $ atomically $ newTVar False
      workerThreads <- liftIO $ atomically $ newTVar 0
      -- the real dump file handle, shared (one writer at a time) via TMVar
      dumpFileHandle <- liftIO $ atomically $ newTMVar handle
      henv <- setupEnv
      allocHandleChan <- liftIO $ atomically $ newTQueue
      -- free the environment handle no matter how the dump ends
      finally (liftIO $ log (fromString $ "free environment handle " ++ (show henv)) >> freeHandle sql_handle_env henv) $ do
        withReaderT (\ cfg -> ThreadedDump tablesChan allTablesPublished workerThreads dumpFileHandle cfg henv allocHandleChan) $ do
          startWorkerThreads
          publishTables
          waitForWorkToEnd
-- | Producer: connect, enqueue every table name of the schema, then set
-- the published flag so waiting workers can finish.  The flag is written
-- even when the enumeration fails (the 'maybe' only affects logging),
-- otherwise workers would block forever in 'waitTableOrEnd'.
publishTables :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
publishTables = do
  env <- ask
  henv <- asks threads_HEnv
  publishEndVar <- asks threads_AllTablesPublishedVar
  hdbc <- connect' henv
  -- release the connection handle whether or not enumeration succeeds
  let freehdbc = liftIO $ freeHandle sql_handle_dbc hdbc
  liftIO $ do
    result <- runMaybeT $ runReaderT (finally freehdbc $ forAllTables hdbc 0 publishTable) env
    maybe (log $ fromString $ "publishTables failed") (\ n -> log $ fromString $ "all tables have been published: " ++ (show n)) result
    atomically $ writeTVar publishEndVar True
    return ()
-- | Push one table name onto the shared work queue and bump the running
-- count of published tables.
publishTable :: (MonadIO m, MonadFail m) => Int -> String -> ReaderT ThreadedDump m Int
publishTable published tableName = do
  liftIO $ log $ fromString $ "publish table: " ++ tableName
  queue <- asks threads_TablesChan
  liftIO $ atomically (writeTQueue queue tableName)
  return (published + 1)
-- | Fork as many worker threads as configured by 'dump_ParallelThreads'.
startWorkerThreads :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
startWorkerThreads =
  asks (dump_ParallelThreads . threads_Config) >>= \ n ->
    replicateM_ n startWorkerThread
-- | Fork one worker thread.  The live-worker counter is incremented here,
-- in the parent thread, before 'forkIO' — so 'waitForWorkToEnd' cannot
-- observe zero before the worker exists; the forked thread decrements it
-- when it finishes.  Each worker writes into its own temp file (binary,
-- block buffered) and frees its connection handle via 'finally'.
startWorkerThread :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
startWorkerThread = do
  env <- ask
  henv <- asks threads_HEnv
  -- the connection is made in the parent thread but used by the worker
  hdbc <- connect' henv
  let freehdbc = disconnect hdbc >> (liftIO $ freeHandle sql_handle_dbc hdbc)
  threadsCountVar <- asks threads_WorkerThreadsVar
  t <- liftIO $ atomically $ do
    crt <- readTVar threadsCountVar
    writeTVar threadsCountVar (crt + 1)
    return (crt + 1)
  liftIO $ log $ fromString $ "Starting thread " ++ (show t)
  _ <- liftIO $ forkIO $ withTempFile "." "transfer-db.dmp"
    (\ path handle -> do
      log $ fromString $ "started worker thread: " ++ path
      hSetBuffering handle fileBuffering
      hSetBinaryMode handle True
      result <- runMaybeT $ runReaderT (finally freehdbc $ dumpThread handle henv hdbc) env
      log $ fromString $ "worker thread ended " ++ (maybe "with failure: " (\ _ -> "with success: ") result) ++ path
      -- let waitForWorkToEnd know this worker is gone
      atomically $ modifyTVar threadsCountVar (subtract 1)
    )
  return ()
-- | Worker loop.  Repeatedly takes a table name from the queue and dumps
-- it into this worker's temp file.  When the queue is empty AND the
-- producer has set the published flag, the temp file is appended to the
-- real dump file (serialized through the handle 'TMVar') and the loop
-- ends.  'retry' in 'waitTableOrEnd' blocks until either a new table or
-- the flag shows up; 'orElse' gives draining the queue priority over
-- finishing.
dumpThread :: (MonadIO m, MonadFail m) => Handle -> SQLHENV -> SQLHDBC -> ReaderT ThreadedDump m ()
dumpThread htmpfile _ hdbc = do
  tablesChan <- asks threads_TablesChan
  allTablesPublishedVar <- asks threads_AllTablesPublishedVar
  handleVar <- asks threads_HandleVar
  env <- ask
  liftIO $ log $ fromString "entered dumpThread"
  let dumpNextTables :: STM (IO ())
      -- try to grab a table first; only consider finishing when the queue is empty
      dumpNextTables = orElse dumpNextTables' waitTableOrEnd
      dumpNextTables' :: STM (IO ())
      dumpNextTables' = do
        tableName <- readTQueue tablesChan
        return $ do
          log $ fromString $ "start dumping table: " ++ tableName
          result <- runStringErrorT $ runReaderT (dumpTable htmpfile hdbc (0, 0) tableName) (MultiThreaded env)
          -- a failed table is logged but does not stop the worker
          either (\s -> log $ fromString $ "dumping table " ++ tableName ++ " failed: " ++ s) (\ _ -> return ()) result
          join $ atomically $ dumpNextTables
      waitTableOrEnd :: STM (IO ())
      waitTableOrEnd = do
        allTablesPublished <- readTVar allTablesPublishedVar
        if allTablesPublished then return $ (log $ fromString $ "allTablesPublished = " ++ (show allTablesPublished)) >> finalizeDump else retry
      finalizeDump :: IO ()
      finalizeDump = do
        log $ fromString $ "finalizing dump thread"
        -- take the TMVar as a lock so only one worker appends at a time
        hdumpfile <- atomically $ takeTMVar handleVar
        copyTmpToDumpFile htmpfile hdumpfile
        atomically $ putTMVar handleVar hdumpfile
  liftIO $ join $ atomically dumpNextTables
-- | Append the whole contents of the temp file to the end of the dump
-- file, copying through a 'bufSize' scratch buffer.  The source is
-- flushed and rewound first; a short read (< 'bufSize') means EOF and
-- ends the loop.
copyTmpToDumpFile :: Handle -> Handle -> IO ()
copyTmpToDumpFile source target = do
  hFlush source
  hSeek source AbsoluteSeek 0
  hSeek target SeekFromEnd 0
  allocaBytes bufSize $ \ buffer ->
    let loop = do
          count <- hGetBuf source buffer bufSize
          if count <= 0
            then return ()
            else do
              hPutBuf target buffer count
              if count < bufSize then return () else loop
    in loop
-- | Block until the live-worker counter drops to zero.  While waiting,
-- service handle-allocation requests coming from the workers (ODBC handle
-- allocation is funneled through this thread via
-- 'threads_AllocHandleChan'); after servicing one request the wait
-- resumes.
waitForWorkToEnd :: (MonadIO m) => ReaderT ThreadedDump m ()
waitForWorkToEnd = do
  threadsCountVar <- asks threads_WorkerThreadsVar
  allocHandleChan <- asks threads_AllocHandleChan
  let waitIO = join $ atomically $ orElse (readTVar threadsCountVar >>= check . (<= 0) >> (return $ return ())) (allocHandleT allocHandleChan >>= \ io -> return (io >> waitIO))
  liftIO $ waitIO
  liftIO $ log $ fromString $ "all worker threads have finished"
-- | STM action that takes one pending allocation request off the queue
-- and yields the monadic action that performs the allocation and posts
-- the new handle back to the requester's reply 'TMVar'.
allocHandleT :: (MonadIO m, MonadFail m) => TQueue (SQLSMALLINT, SQLHANDLE, TMVar SQLHANDLE) -> STM (m ())
allocHandleT chan = do
  (handleType, parentHandle, replyVar) <- readTQueue chan
  return $ do
    h <- allocHandle handleType parentHandle
    liftIO $ atomically $ putTMVar replyVar h
-- | Ask the main thread to allocate an ODBC handle on our behalf (see
-- 'waitForWorkToEnd') and block until the reply arrives.
allocHandleReq :: (MonadIO m, MonadFail m) => SQLSMALLINT -> SQLHANDLE -> ReaderT ThreadedDump m SQLHANDLE
allocHandleReq htype hparent = do
  requests <- asks threads_AllocHandleChan
  reply <- liftIO $ atomically newEmptyTMVar
  liftIO $ atomically $ writeTQueue requests (htype, hparent, reply)
  liftIO $ atomically $ takeTMVar reply
-- | Discriminates the two execution modes a table dump can run in.
data SingleOrMulti = SingleThreaded DumpConfig | MultiThreaded ThreadedDump

-- | The underlying 'DumpConfig' in either mode.
dumpConfig :: SingleOrMulti -> DumpConfig
dumpConfig (SingleThreaded x) = x
dumpConfig (MultiThreaded x) = threads_Config x

-- | Delegate to the underlying 'DumpConfig'.
instance HasDBInfo SingleOrMulti where
  extractDBInfo = extractDBInfo . dumpConfig
-- | Allocate an ODBC handle: directly when single-threaded, by delegating
-- to the main thread ('allocHandleReq') when multi-threaded.
allocHandleSM :: (MonadIO m, MonadFail m) => SQLSMALLINT -> SQLHANDLE -> ReaderT SingleOrMulti m SQLHANDLE
allocHandleSM htype hparent = do
  mode <- ask
  case mode of
    SingleThreaded _       -> allocHandle htype hparent
    MultiThreaded threaded -> withReaderT (const threaded) (allocHandleReq htype hparent)
-- | Collect column metadata for a table.  Single-threaded: call
-- 'collectColumnsInfo' directly (it allocates its own statement handle).
-- Multi-threaded: the statement handle must come from the main thread
-- ('allocHandleReq'), so the COLUMNS result set is opened here and only
-- its consumption is delegated to 'collectColumnsInfo''.
collectColumnsInfoSM :: (MonadIO m, MonadFail m) => SQLHDBC
                     -> String
                     -> String
                     -> ReaderT SingleOrMulti m [ColumnInfo]
collectColumnsInfoSM hdbc schemaName tableName = do
  env <- ask
  case env of
    SingleThreaded _ -> withReaderT (const odbcImplementation) $ collectColumnsInfo hdbc schemaName tableName
    MultiThreaded x -> withReaderT (const x) $ do
      hstmt <- allocHandleReq sql_handle_stmt hdbc
      columns hstmt Nothing (Just schemaName) (Just tableName) Nothing
      withReaderT (const odbcImplementation) $ collectColumnsInfo' hstmt
-- | Dump one table: write its schema record, stream all of its data, then
-- the end-of-table marker.  Returns the (records, bytes) written for the
-- table; the incoming accumulator is ignored.
dumpTable :: (MonadIO m, MonadFail m) => Handle -> SQLHDBC -> (Int, Int) -> String -> ReaderT SingleOrMulti m (Int, Int)
dumpTable handle hdbc _ tableName = do
  schema <- extractSchema hdbc tableName
  liftIO $ debugS logsrc $ fromString $ "Schema " ++ (C.unpack $ schema_DBSchemaName schema) ++ "." ++ (C.unpack $ schema_TableName schema)
  liftIO $ mapM_ debugFieldInfo (schema_Fields schema)
  liftIO $ B.hPut handle (writeSchema schema)
  (!recs, !bytes) <- dumpTableData handle hdbc schema
  liftIO $ B.hPut handle writeEOT
  return (recs, bytes)
-- | Log every attribute of a field descriptor at debug level, one line
-- per attribute.  (Label strings are kept byte-identical to the original,
-- including the historical "fi_Nullabe" spelling.)
debugFieldInfo :: FieldInfoV1 -> IO ()
debugFieldInfo f = mapM_ (debugS logsrc . fromString)
  [ "\tfi_ColumnName: " ++ (C.unpack $ fi_ColumnName f)
  , "\tfi_DataType: " ++ (show $ fi_DataType f)
  , "\tfi_ColumnSize: " ++ (maybe "(null)" show $ fi_ColumnSize f)
  , "\tfi_BufferLength: " ++ (maybe "(null)" show $ fi_BufferLength f)
  , "\tfi_DecimalDigits: " ++ (maybe "(null)" show $ fi_DecimalDigits f)
  , "\tfi_NumPrecRadix: " ++ (maybe "(null)" show $ fi_NumPrecRadix f)
  , "\tfi_Nullabe: " ++ (show $ fi_Nullable f)
  ]
-- | Build the dump-file schema record for a table from the configured
-- schema name and the table's column metadata.
extractSchema :: (MonadIO m, MonadFail m) => SQLHDBC -> String -> ReaderT SingleOrMulti m SchemaV1
extractSchema hdbc tableName = do
  schemaName <- asks (dump_Schema . dumpConfig)
  fieldInfos <- extractSchemaFields hdbc tableName
  return (SchemaV1 (C.pack schemaName) (C.pack tableName) fieldInfos)
-- | Column metadata for a table, converted into dump-file field
-- descriptors.
extractSchemaFields :: (MonadIO m, MonadFail m) => SQLHDBC -> String -> ReaderT SingleOrMulti m [FieldInfoV1]
extractSchemaFields hdbc tableName = do
  schemaName <- asks (dump_Schema . dumpConfig)
  fmap (map makeFieldInfo) (collectColumnsInfoSM hdbc schemaName tableName)
-- | Convert ODBC column metadata into the dump file's field descriptor;
-- only the column name needs repacking into a strict 'ByteString'.
makeFieldInfo :: ColumnInfo -> FieldInfoV1
makeFieldInfo col = FieldInfoV1
  { fi_ColumnName      = C.pack (ci_ColumnName col)
  , fi_DataType        = ci_DataType col
  , fi_ColumnSize      = ci_ColumnSize col
  , fi_BufferLength    = ci_BufferLength col
  , fi_DecimalDigits   = ci_DecimalDigits col
  , fi_NumPrecRadix    = ci_NumPrecRadix col
  , fi_Nullable        = ci_Nullable col
  , fi_OrdinalPosition = ci_OrdinalPosition col
  }
-- | Stream every record of @schema@'s table through 'dumpRecord' into the
-- dump handle, logging final statistics on both success and failure.
-- Returns (records written, bytes written).
--
-- Fixes: the completion log message read "Finished dupming table"; the
-- local end/fail actions took a redundant @hstmt@ parameter that only
-- shadowed the outer statement handle and is removed.
dumpTableData :: (MonadIO m, MonadFail m) => Handle -> SQLHDBC -> SchemaV1 -> ReaderT SingleOrMulti m (Int, Int)
dumpTableData handle hdbc schema = do
  let tableName' = schema_QualifiedTableName schema
      tableName = C.unpack tableName'
  liftIO $ log $ fromString $ "dumping table: " ++ tableName
  select <- makeSelectSql schema
  hstmt <- allocHandleSM sql_handle_stmt hdbc
  env <- ask
  -- always release the statement handle, even when the transfer fails
  finally' ((log $ fromString $ "freeHandle for " ++ tableName) >> freeHandle sql_handle_stmt hstmt) $ do
    execDirect hstmt select (faillog $ "param data requested for select '" ++ select ++ "'")
    result <- liftIO $ allocaBytes (fromIntegral maxChunkSize)
      (\ p_transferBuf -> alloca
        (\ p_transferLenOrInd -> do
          let dumpAction = dumpRecord handle hstmt schema p_transferBuf maxChunkSize p_transferLenOrInd
              -- log final statistics on normal completion / on failure
              endAction x = logstats tableName' x >> return x
              failAction x s = logstats tableName' x >> fail s
          runStringErrorT $ runReaderT (forAllRecordsWithEndAndFail hstmt dumpAction endAction failAction (0,0)) env))
    (cnt, size) <- either (\ s -> faillog $ "transfer table " ++ (C.unpack $ schema_TableName schema) ++ " failed: " ++ s) return result
    liftIO $ log $ fromString $ "Finished dumping table " ++ (C.unpack $ schema_DBSchemaName schema)
      ++ "." ++ (C.unpack $ schema_TableName schema) ++ "; dumped " ++ (show cnt)
      ++ " records of " ++ (show size) ++ " bytes"
    size `seq` cnt `seq` return (cnt, size)
-- | Dump one record: write the record-introducer marker, then every field
-- in ordinal order (columns numbered from 1), threading the byte count
-- through 'dumpField'.  Logs running statistics every 100000 records.
dumpRecord :: (MonadIO m, MonadFail m) => Handle -> SQLHSTMT -> SchemaV1 -> SQLPOINTER -> SQLLEN -> Ptr SQLLEN -> (Int, Int) -> ReaderT SingleOrMulti m (Int, Int)
dumpRecord handle hstmt schema p_buf bufLen p_lenOrInd (cnt, sz) = do
  liftIO $ B.hPut handle writeRI
  let fieldList = schema_Fields schema
      numbered  = zip (map fromIntegral [1 .. length fieldList]) fieldList
  sz' <- foldM (dumpField handle hstmt p_buf bufLen p_lenOrInd) sz numbered
  let cnt' = cnt + 1
  if cnt' `mod` 100000 == 0
    then logStatistics (schema_QualifiedTableName schema) cnt' sz'
    else return ()
  cnt' `seq` sz' `seq` return (cnt', sz')
-- | Record the running (count, bytes) for @logkey@ in the shared stats
-- map and log the aggregate throughput since the dump started.
--
-- Fix: the bytes/sec rate is now guarded against a zero-nanosecond
-- elapsed duration, which would otherwise raise a divide-by-zero error
-- when statistics are logged immediately after start.
logStatistics :: (MonadIO m, MonadFail m) => C.ByteString -> Int -> Int -> ReaderT SingleOrMulti m ()
logStatistics logkey cnt sz = do
  sizesVar <- asks readSizesVar
  (totalCnt, totalSz) <- liftIO $ atomically $ do
    sizesMap <- readTVar sizesVar
    let updatedSizesMap = Map.insert logkey (cnt, sz) sizesMap
    writeTVar sizesVar updatedSizesMap
    -- aggregate over every table dumped so far
    return $ Map.foldr' (\ (c1, s1) (c2, s2) -> (c1 + c2, s1 + s2)) (0, 0) updatedSizesMap
  startTime <- asks readStartTime
  crtTime <- liftIO $ getTime Monotonic
  let duration = diffTimeSpec crtTime startTime
      elapsedNs = max 1 (toNanoSecs duration) -- avoid division by zero
      szRate = (fromIntegral totalSz) * 1000000000 `div` elapsedNs
  liftIO $ log $ fromString $ ">>>>>> Running for " ++ (show $ sec duration) ++ " seconds"
  liftIO $ log $ fromString $ ">>>>>> (" ++ (C.unpack logkey) ++ ") " ++ (show totalCnt) ++ " records / " ++ (show $ totalSz) ++ " bytes" ++ ", " ++ (show szRate) ++ " bytes/sec"
-- | Uncurried wrapper around 'logStatistics'.  The key is the qualified
-- table name used in the statistics map (the original parameter was
-- misleadingly named @hstmt@).
logstats :: (MonadIO m, MonadFail m) => C.ByteString -> (Int, Int) -> ReaderT SingleOrMulti m ()
logstats logkey (cnt, sz) = logStatistics logkey cnt sz
-- | The shared statistics map variable, in either execution mode.
readSizesVar :: SingleOrMulti -> TVar StatisticsMap
readSizesVar = dump_StatisticsVar . dumpConfig
-- | The dump's monotonic start time, in either execution mode.
readStartTime :: SingleOrMulti -> TimeSpec
readStartTime = dump_StartTime . dumpConfig
-- | Dump a single column value by packaging all the pieces into a
-- 'DumpFieldSpec' and running 'dumpField'' on it.
dumpField :: (MonadIO m, MonadFail m) => Handle -> SQLHSTMT -> SQLPOINTER -> SQLLEN -> Ptr SQLLEN -> Int -> (SQLSMALLINT, FieldInfoV1) -> m Int
dumpField handle hstmt p_buf buflen p_lenOrInd sz (colIdx, fieldInfo) =
  runReaderT dumpField' spec
  where
    spec = DumpFieldSpec handle hstmt p_buf buflen p_lenOrInd sz colIdx fieldInfo
-- | Everything 'dumpField'' and its helpers need to fetch and serialize
-- one column of the current record.
data DumpFieldSpec = DumpFieldSpec {
  dmpfld_Handle :: Handle,        -- ^ output file (or temp file) handle
  dmpfld_HStmt :: SQLHSTMT,       -- ^ the executed SELECT statement
  dmpfld_Buf :: SQLPOINTER,       -- ^ shared transfer buffer
  dmpfld_BufLen :: SQLLEN,        -- ^ size of the transfer buffer
  dmpfld_LenOrInd :: Ptr SQLLEN,  -- ^ length/indicator output of getData
  dmpfld_Size :: Int,             -- ^ running byte count for the record
  dmpfld_Crt :: SQLSMALLINT,      -- ^ 1-based column number
  dmpfld_Field :: FieldInfoV1     -- ^ schema descriptor of the column
  }
-- | Dispatch on the field's SQL data type: character, binary and
-- decimal/numeric types go through the chunked variable-length path
-- ('dumpVarLenField') with the matching C buffer type; all fixed-size
-- types (integers, floats, bit, date/time, intervals, GUID) go through
-- 'dumpFixedField'.  Any unrecognized type fails via
-- 'dumpUnknownFieldType'.
dumpField' :: (MonadIO m, MonadFail m) => ReaderT DumpFieldSpec m Int
dumpField' = do
  dataType <- asks $ fi_DataType . dmpfld_Field
  case dataType of
    _ | dataType == sql_char -> dumpVarLenField sql_c_char
      | dataType == sql_varchar -> dumpVarLenField sql_c_char
      | dataType == sql_longvarchar -> dumpVarLenField sql_c_char
      | dataType == sql_wchar -> dumpVarLenField sql_c_wchar
      | dataType == sql_wvarchar -> dumpVarLenField sql_c_wchar
      | dataType == sql_wlongvarchar -> dumpVarLenField sql_c_wchar
      | dataType == sql_decimal -> dumpVarLenField sql_c_char
      | dataType == sql_numeric -> dumpVarLenField sql_c_char
      | dataType == sql_bit -> dumpFixedField sql_c_bit
      | dataType == sql_tinyint -> dumpFixedField sql_c_tinyint
      | dataType == sql_smallint -> dumpFixedField sql_c_short
      | dataType == sql_integer -> dumpFixedField sql_c_long
      | dataType == sql_bigint -> dumpFixedField sql_c_sbigint
      | dataType == sql_real -> dumpFixedField sql_c_float
      | dataType == sql_float -> dumpFixedField sql_c_double
      | dataType == sql_double -> dumpFixedField sql_c_double
      | dataType == sql_binary -> dumpVarLenField sql_c_binary
      | dataType == sql_varbinary -> dumpVarLenField sql_c_binary
      | dataType == sql_longvarbinary -> dumpVarLenField sql_c_binary
      | dataType == sql_type_date -> dumpFixedField sql_c_type_date
      | dataType == sql_type_time -> dumpFixedField sql_c_type_time
      | dataType == sql_type_timestamp -> dumpFixedField sql_c_type_timestamp
      | dataType == sql_interval_year -> dumpFixedField sql_c_interval_year
      | dataType == sql_interval_month -> dumpFixedField sql_c_interval_month
      | dataType == sql_interval_day -> dumpFixedField sql_c_interval_day
      | dataType == sql_interval_hour -> dumpFixedField sql_c_interval_hour
      | dataType == sql_interval_minute -> dumpFixedField sql_c_interval_minute
      | dataType == sql_interval_second -> dumpFixedField sql_c_interval_second
      | dataType == sql_interval_year_to_month -> dumpFixedField sql_c_interval_year_to_month
      | dataType == sql_interval_day_to_hour -> dumpFixedField sql_c_interval_day_to_hour
      | dataType == sql_interval_day_to_minute -> dumpFixedField sql_c_interval_day_to_minute
      | dataType == sql_interval_day_to_second -> dumpFixedField sql_c_interval_day_to_second
      | dataType == sql_interval_hour_to_minute -> dumpFixedField sql_c_interval_hour_to_minute
      | dataType == sql_interval_hour_to_second -> dumpFixedField sql_c_interval_hour_to_second
      | dataType == sql_interval_minute_to_second -> dumpFixedField sql_c_interval_minute_to_second
      | dataType == sql_guid -> dumpFixedField sql_c_guid
      | otherwise -> dumpUnknownFieldType
-- | Dump a variable-length field by pulling it chunk-by-chunk with
-- 'forAllData' (as C type @bufferType@) and writing each chunk via
-- 'dumpChunk'.  Returns the updated running byte count.
dumpVarLenField :: (MonadIO m, MonadFail m) => SQLSMALLINT -> ReaderT DumpFieldSpec m Int
dumpVarLenField bufferType = do
  spec <- ask
  lenlen <- octetLengthOfChunkSize
  (_, finalSize) <- forAllData (dmpfld_HStmt spec) (dmpfld_Crt spec) bufferType
                               (dmpfld_Buf spec) (dmpfld_BufLen spec) (dmpfld_LenOrInd spec)
                               (dumpChunk lenlen) (0, dmpfld_Size spec)
  finalSize `seq` return finalSize
-- | Number of octets (1, 2 or 4) used to encode a chunk's length prefix
-- for a variable-length field.  When the schema reports a buffer length
-- smaller than one chunk, the prefix width is sized from the field;
-- otherwise from the chunk size itself.
--
-- Fix: the original read @(fromIntegral bufferSize) 1@ — the subtraction
-- operator had been lost, which does not type-check.  One byte of the
-- transfer buffer is excluded from the chunk size (matching the same
-- @- 1@ in 'dumpChunk'' — presumably room for a terminating NUL on
-- character data; NOTE(review): confirm).
octetLengthOfChunkSize :: (Num a, Monad m) => ReaderT DumpFieldSpec m a
octetLengthOfChunkSize = do
  bufferSize <- asks dmpfld_BufLen
  fieldSize <- asks $ fi_BufferLength . dmpfld_Field
  let
    chunkSize = (fromIntegral bufferSize) - 1
    -- NOTE(review): boundaries use <= 256 / <= 65536; a length of exactly
    -- 256 does not fit in one octet — confirm the intended prefix encoding.
    lenlen = maybe lenlen' (\sz -> if sz < chunkSize
                                     then if sz <= 256
                                            then 1
                                            else if sz <= 65536
                                                   then 2
                                                   else 4
                                     else lenlen') fieldSize
    lenlen' = if chunkSize <= 256
                then 1
                else if chunkSize <= 65536
                       then 2
                       else 4
  return lenlen
-- | Write one chunk of a variable-length field.  For the first chunk of a
-- nullable column a null indicator is emitted first (and a NULL value
-- produces only the indicator); subsequent chunks, and all chunks of NOT
-- NULL columns, carry data only.
dumpChunk :: (MonadIO m, MonadFail m) => Int -> (Int, Int) -> ReaderT DumpFieldSpec m (Int, Int)
dumpChunk lenlen (chunkNo, size) = do
  nullability <- asks $ fi_Nullable . dmpfld_Field
  newSize <-
    if chunkNo > 0 || nullability == sql_no_nulls
      then dumpChunk' lenlen size
      else do
        p_len <- asks dmpfld_LenOrInd
        indicator <- liftIO $ peek p_len
        if indicator == sql_null_data
          then dumpNullIndicator Null size
          else dumpNullIndicator NotNull size >>= dumpChunk' lenlen
  newSize `seq` return (chunkNo + 1, newSize)
-- | Write a serialized null indicator to the dump and return the byte
-- count advanced by its encoded length.
dumpNullIndicator :: (MonadIO m) => NullIndicator -> Int -> ReaderT DumpFieldSpec m Int
dumpNullIndicator indicator size = do
  out <- asks dmpfld_Handle
  let bytes = writeNullIndicator indicator
  liftIO $ B.hPut out bytes
  let size' = size + B.length bytes
  size' `seq` return size'
-- | Write one data chunk of a variable-length field: a @lenlen@-octet
-- length prefix followed by at most (buffer size - 1) bytes of data (the
-- last byte of the transfer buffer is reserved, matching
-- 'octetLengthOfChunkSize').
--
-- Fixes: @buffersize'@ read @(fromIntegral buffersize) 1@ — the
-- subtraction operator had been lost and the expression did not
-- type-check; and @datalen@ was computed from @lenlen@ (the 1/2/4-octet
-- prefix width) instead of the data length reported in the
-- length/indicator buffer, which would have truncated every chunk to at
-- most 4 bytes.
dumpChunk' :: (MonadIO m, MonadFail m) => Int -> Int -> ReaderT DumpFieldSpec m Int
dumpChunk' lenlen size = do
  columnName <- asks $ fi_ColumnName.dmpfld_Field
  p_buf <- asks dmpfld_Buf
  p_lenOrInd <- asks dmpfld_LenOrInd
  handle <- asks dmpfld_Handle
  lenOrInd <- liftIO $ peek p_lenOrInd
  buffersize <- asks dmpfld_BufLen
  -- NOTE(review): the guard rejects reported lengths 0 and 1 as well as
  -- the negative indicator values — confirm empty values cannot reach here.
  bs <- if lenOrInd > 1
          then let datalen = fromIntegral lenOrInd
                   buffersize' = (fromIntegral buffersize) - 1
                   chunklen = if datalen > buffersize' then buffersize' else datalen
               in return $ writeChunk (fromIntegral lenlen) chunklen (castPtr p_buf)
          else fail $ "dumpChunk' received unexpected null field (" ++ (show lenOrInd) ++ "); column " ++ (C.unpack columnName)
  liftIO $ B.hPut handle bs
  let size' = size + (B.length bs)
  size' `seq` return size'
-- | Fetch a fixed-size field with 'getData' and write it out, preceded by
-- a null indicator when the column is nullable.  The length/indicator
-- cell is zeroed before the call so a stale value cannot survive
-- 'getData'.  Reading a NULL from a NOT NULL column is a hard failure.
dumpFixedField :: (MonadIO m, MonadFail m) => SQLSMALLINT -> ReaderT DumpFieldSpec m Int
dumpFixedField bufferType = do
  bufferSize <- asks dmpfld_BufLen
  hstmt <- asks dmpfld_HStmt
  colnum <- asks dmpfld_Crt
  p_buffer <- asks dmpfld_Buf
  p_LenOrInd <- asks dmpfld_LenOrInd
  size <- asks dmpfld_Size
  -- clear the indicator cell before getData fills it
  liftIO $ poke p_LenOrInd 0
  getData hstmt colnum bufferType p_buffer bufferSize p_LenOrInd
  lenOrInd <- liftIO $ peek p_LenOrInd
  nullable <- asks $ fi_Nullable.dmpfld_Field
  columnName <- asks $ fi_ColumnName.dmpfld_Field
  if nullable == sql_no_nulls
    then if lenOrInd == sql_null_data
           then fail $ "null value read from not nullable field " ++ (C.unpack columnName)
           else dumpFixedField' bufferType size lenOrInd
    else if lenOrInd == sql_null_data
           then dumpNullIndicator Null size
           else do
             size' <- dumpNullIndicator NotNull size
             dumpFixedField' bufferType size' lenOrInd
-- | Write the raw bytes of a fixed-size value, first checking that the
-- length reported by the driver matches the well-known length of the C
-- buffer type (when one is known).
dumpFixedField' :: (MonadIO m, MonadFail m) => SQLSMALLINT -> Int -> SQLLEN -> ReaderT DumpFieldSpec m Int
dumpFixedField' bufferType size lenOrInd = do
  let wellKnownSize = odbcCTypeLen bufferType
  columnName <- asks $ fi_ColumnName . dmpfld_Field
  out <- asks dmpfld_Handle
  p_buffer <- asks dmpfld_Buf
  case wellKnownSize of
    Just expected | expected /= lenOrInd ->
      fail $ "the actual length of the field (" ++ (show lenOrInd) ++ ") is different from the schema size of the field (" ++ (show wellKnownSize) ++ "): " ++ (C.unpack columnName)
    _ -> do
      let bytes = writePlainBuf (castPtr p_buffer) (fromIntegral lenOrInd)
      liftIO $ B.hPut out bytes
      let size' = size + B.length bytes
      size' `seq` return size'
-- | Fallback for SQL data types not covered by 'dumpField''s dispatch:
-- always fails, aborting the table's transfer.
dumpUnknownFieldType :: (MonadIO m, MonadFail m) => ReaderT DumpFieldSpec m Int
dumpUnknownFieldType = fail "dumpUnknownFieldType not implemented"
-- | Build the SELECT statement that reads every schema field from the
-- (schema-qualified, when a schema name is present) table.
makeSelectSql :: (MonadIO m, MonadFail m) => SchemaV1 -> m String
makeSelectSql schema = return sql
  where
    sql = "select " ++ columnList ++ " from " ++ target
    columnList = intercalate ", " [ C.unpack (fi_ColumnName f) | f <- schema_Fields schema ]
    baseName = C.unpack (schema_TableName schema)
    target = case C.unpack (schema_DBSchemaName schema) of
               "" -> baseName
               s  -> s ++ "." ++ baseName
-- | Restore entry point — currently a stub: it only initializes stderr
-- logging and returns; nothing is read from 'restore_FilePath' yet.
restore :: (MonadIO m, MonadFail m, MonadBaseControl IO m) => ReaderT RestoreConfig m ()
restore = withStderrLogging $ return ()