{-- |
Module      : Database.TransferDB.DumpDB
Description : Database agnostic dump
Copyright   : (c) Mihai Giurgeanu, 2017
License     : GPL-3
Maintainer  : mihai.giurgeanu@gmail.com
Stability   : experimental
Portability : Portable
--}

{-# LANGUAGE FlexibleContexts, BangPatterns #-}

module Database.TransferDB.DumpDB where

import Prelude hiding (fail, log)

import System.IO (Handle, withBinaryFile, IOMode(ReadMode, WriteMode), hSeek,
                  SeekMode(AbsoluteSeek, SeekFromEnd), hFlush, hGetBuf, hPutBuf,
                  BufferMode(BlockBuffering), hSetBuffering, hSetBinaryMode)
import System.IO.Temp (withTempFile)
import System.Clock (Clock(Monotonic), TimeSpec(sec), getTime, diffTimeSpec, toNanoSecs)

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (TVar, newTVar, modifyTVar, readTVar, writeTVar,
                               TQueue, newTQueue, readTQueue, writeTQueue,
                               TMVar, newTMVar, newEmptyTMVar, takeTMVar, putTMVar,
                               STM, atomically, check, orElse, retry)
-- qualified because 'finally' is also imported from "Database.TransferDB.Commons"
import qualified Control.Exception as E

import Control.Monad(foldM, replicateM_, join)
import Control.Monad.Trans.Reader (ReaderT, runReaderT, asks, ask, withReaderT)
import Control.Monad.Trans.Maybe (MaybeT, runMaybeT)
import Control.Monad.Trans.StringError (runStringErrorT)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Fail (MonadFail, fail)
import Control.Monad.Trans.Control (MonadBaseControl)

import Control.Logging (log, debugS, withStderrLogging)

import Data.Time.Clock (getCurrentTime)
import Data.String (fromString)
import Data.Text (Text)
import Data.List (intercalate)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C
import qualified Data.Map.Strict as Map

import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Ptr (Ptr, castPtr)
import Foreign.Storable (Storable(peek, poke))

import Database.TransferDB.DumpDB.Format
import Database.TransferDB.Commons (HasDBInfo(..), DBInfo(DBInfo), withConnection', connect',
                                    forAllTables, faillog, finally', finally)

import SQL.CLI (SQLHENV, SQLHDBC, SQLHSTMT, SQLINTEGER, SQLSMALLINT, SQLPOINTER, SQLHANDLE, SQLLEN,
                sql_handle_env, sql_handle_dbc, sql_handle_stmt, sql_no_nulls, sql_null_data)
import SQL.CLI.Utils (ColumnInfo(..), SQLConfig(..), collectColumnsInfo, collectColumnsInfo',
                      allocHandle, columns, freeHandle, execDirect, forAllRecordsWithEndAndFail,
                      forAllData, getData, disconnect)
import SQL.CLI.ODBC (odbcImplementation, setupEnv)
import SQL.ODBC (sql_char, sql_varchar, sql_longvarchar, sql_wchar, sql_wvarchar, sql_wlongvarchar,
                 sql_decimal, sql_numeric, sql_bit, sql_tinyint, sql_smallint, sql_integer, sql_bigint,
                 sql_real, sql_float, sql_double, sql_binary, sql_varbinary, sql_longvarbinary,
                 sql_type_date, sql_type_time, sql_type_timestamp,
                 sql_interval_year, sql_interval_month, sql_interval_day, sql_interval_hour,
                 sql_interval_minute, sql_interval_second, sql_interval_year_to_month,
                 sql_interval_day_to_hour, sql_interval_day_to_minute, sql_interval_day_to_second,
                 sql_interval_hour_to_minute, sql_interval_hour_to_second, sql_interval_minute_to_second,
                 sql_guid,
                 sql_c_char, sql_c_wchar, sql_c_bit, sql_c_tinyint, sql_c_short, sql_c_long, sql_c_sbigint,
                 sql_c_float, sql_c_double, sql_c_binary,
                 sql_c_type_date, sql_c_type_time, sql_c_type_timestamp,
                 sql_c_interval_year, sql_c_interval_month, sql_c_interval_day, sql_c_interval_hour,
                 sql_c_interval_minute, sql_c_interval_second, sql_c_interval_year_to_month,
                 sql_c_interval_day_to_hour, sql_c_interval_day_to_minute, sql_c_interval_day_to_second,
                 sql_c_interval_hour_to_minute, sql_c_interval_hour_to_second, sql_c_interval_minute_to_second,
                 sql_c_guid,
                 odbcCTypeLen)

-- | the log source name used for debug messages of this module
logsrc :: Text
logsrc = fromString "Database.TransferDB.DumpDB"

-- | the maximum size of a chunk of variable length value
maxChunkSize :: (Num a) => a
maxChunkSize = 2 * 1024 * 1024

-- | the buffer size used to copy files
bufSize :: Int
bufSize = fromIntegral maxChunkSize

-- | file buffering mode
fileBuffering :: BufferMode
fileBuffering = BlockBuffering (Just $ 10 * 1024 * 1024)

-- | keep statistics by statement handle; for each handle record the
-- number of records dumped so far and the total size dumped so far
type StatisticsMap = Map.Map C.ByteString (Int, Int)

-- | dump database options
data DumpConfig = DumpConfig {
  dump_DSN :: String,                       -- ^ ODBC data source name
  dump_UserName :: String,                  -- ^ user name
  dump_Password :: String,                  -- ^ password
  dump_Schema :: String,                    -- ^ schema to be dumped
  dump_Description :: String,               -- ^ dump description supplied by the user
  dump_FilePath :: FilePath,                -- ^ the dump file name
  dump_ParallelThreads :: Int,              -- ^ the number of threads to be run in parallel
  dump_StatisticsVar :: TVar StatisticsMap, -- ^ the global statistics map
  dump_StartTime :: TimeSpec                -- ^ the start time, used to compute the dump rate
  }

instance HasDBInfo DumpConfig where
  extractDBInfo cfg = DBInfo (dump_DSN cfg) (dump_UserName cfg) (dump_Password cfg) (dump_Schema cfg)

-- | restore database options
data RestoreConfig = RestoreConfig {
  restore_DSN :: String,       -- ^ ODBC data source name
  restore_UserName :: String,  -- ^ user name
  restore_Password :: String,  -- ^ password
  restore_Schema :: String,    -- ^ schema to be restored
  restore_FilePath :: FilePath -- ^ the dump file name
  }

instance HasDBInfo RestoreConfig where
  extractDBInfo cfg = DBInfo (restore_DSN cfg) (restore_UserName cfg) (restore_Password cfg) (restore_Schema cfg)

-- | dump a schema from an ODBC database to a binary file; the file named by
-- 'dump_FilePath' is opened in 'WriteMode' and the action fails with
-- "Database dump failed" if 'hDump' does not produce a result
dump :: (MonadIO m, MonadFail m, MonadBaseControl IO m) => ReaderT DumpConfig m () -- MonadBaseControl is required for logging
dump = withStderrLogging $ do
  filename <- asks dump_FilePath
  config <- ask
  liftIO $ withBinaryFile filename WriteMode (\handle -> do
    result <- runMaybeT $ runReaderT (hDump handle) config
    maybe (fail "Database dump failed") return result)

-- | dumps the database schema to the file represented by the given handle;
-- writes the format version, then the header, then all the tables
hDump :: (MonadIO m, MonadFail m) => Handle -> ReaderT DumpConfig (MaybeT m) ()
hDump handle = do
  liftIO $ B.hPut handle $ writeVersion V1
  header <- makeHeader
  liftIO $ B.hPut handle $ writeHeader header
  dumpTables handle

-- | create the header of the dump file from the maximum chunk size, the
-- current time and the user supplied description
makeHeader :: (MonadIO m) => ReaderT DumpConfig m HeaderV1
makeHeader = do
  timestamp <- liftIO getCurrentTime
  description <- asks dump_Description
  return $ HeaderV1 maxChunkSize timestamp (C.pack description)

-- | global environment for dumping tables in parallel
data ThreadedDump = ThreadedDump {
  threads_TablesChan :: TQueue String,            -- ^ the channel to publish the name of tables
  threads_AllTablesPublishedVar :: TVar Bool,     -- ^ when this is true, no more tables will be published on the tables channel
  threads_WorkerThreadsVar :: TVar Int,           -- ^ the number of worker threads that are running; worker threads are the threads that actually dump data
  threads_HandleVar :: TMVar Handle,              -- ^ provides synchronized access to the dump file handle
  threads_Config :: DumpConfig,                   -- ^ the dump configuration
  threads_HEnv :: SQLHENV,                        -- ^ shared environment allocated in the main thread; unixODBC has problem if the handle is allocated on another thread
  threads_AllocHandleChan :: TQueue (SQLSMALLINT, SQLHANDLE, TMVar SQLHANDLE) -- ^ channel to call allocHandle on the main thread
  }

instance HasDBInfo ThreadedDump where
  extractDBInfo = extractDBInfo . threads_Config

-- | dumps all tables in a schema, on a given connection; a thread count of
-- zero (or a negative one) dumps the tables sequentially on the calling
-- thread, any positive count starts that many worker threads
dumpTables :: (MonadIO m, MonadFail m) => Handle -> ReaderT DumpConfig (MaybeT m) ()
dumpTables handle = do
  threads <- asks dump_ParallelThreads
  -- a negative count used to take the threaded path with no workers at all,
  -- producing a dump without any table; treat it like 0
  if threads <= 0
    then do
      liftIO $ log $ fromString "Dumping tables without using threads"
      _ <- withConnection' (\ _ hdbc -> withReaderT SingleThreaded $ forAllTables hdbc (0, 0) (dumpTable handle hdbc))
      return ()
    else do
      liftIO $ log $ fromString $ "Dumping tables using " ++ (show threads) ++ " threads"
      tablesChan <- liftIO $ atomically newTQueue
      allTablesPublished <- liftIO $ atomically $ newTVar False
      workerThreads <- liftIO $ atomically $ newTVar 0
      dumpFileHandle <- liftIO $ atomically $ newTMVar handle
      henv <- setupEnv
      allocHandleChan <- liftIO $ atomically $ newTQueue
      finally (liftIO $ log (fromString $ "free environment handle " ++ (show henv)) >> freeHandle sql_handle_env henv) $ do
        withReaderT (\ cfg -> ThreadedDump tablesChan allTablesPublished workerThreads dumpFileHandle cfg henv allocHandleChan) $ do
          startWorkerThreads
          publishTables
          waitForWorkToEnd

-- | publish the tables to the channel read by the worker threads; when the
-- enumeration ends (successfully or not) the "all tables published" flag is
-- set, so the workers know that no more table names will arrive
publishTables :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
publishTables = do
  env <- ask
  henv <- asks threads_HEnv
  publishEndVar <- asks threads_AllTablesPublishedVar
  hdbc <- connect' henv
  -- disconnect before freeing the handle, the same way the worker threads do
  let freehdbc = disconnect hdbc >> (liftIO $ freeHandle sql_handle_dbc hdbc)
  liftIO $ do
    result <- runMaybeT $ runReaderT (finally freehdbc $ forAllTables hdbc 0 publishTable) env
    maybe (log $ fromString $ "publishTables failed") (\ n -> log $ fromString $ "all tables have been published: " ++ (show n)) result
    atomically $ writeTVar publishEndVar True
  return ()

-- | monadic action to publish the table name on the tables channel; returns
-- the received counter incremented by one
publishTable :: (MonadIO m, MonadFail m) => Int -> String -> ReaderT ThreadedDump m Int
publishTable crt tableName = do
  liftIO $ log $ fromString $ "publish table: " ++ tableName
  tablesChan <- asks threads_TablesChan
  liftIO $ atomically $ writeTQueue tablesChan tableName
  return (crt + 1)

-- | start the worker threads; each worker thread will dump data in a temporary file,
-- then will append the contents of the temporary file to the contents of the dump file; the
-- append is done synchronized with the other worker threads, so only one worker thread will
-- append to the main dump file
startWorkerThreads :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
startWorkerThreads = do
  count <- asks (dump_ParallelThreads . threads_Config)
  replicateM_ count startWorkerThread

-- | start one worker thread; the connection is opened on the calling thread,
-- the running workers counter is incremented and then the worker is forked.
-- The counter is decremented when the worker ends, even if it ends with an
-- exception, so 'waitForWorkToEnd' can not wait forever for a dead worker.
startWorkerThread :: (MonadIO m, MonadFail m) => ReaderT ThreadedDump m ()
startWorkerThread = do
  env <- ask
  henv <- asks threads_HEnv
  hdbc <- connect' henv
  let freehdbc = disconnect hdbc >> (liftIO $ freeHandle sql_handle_dbc hdbc)
  threadsCountVar <- asks threads_WorkerThreadsVar
  t <- liftIO $ atomically $ do
    crt <- readTVar threadsCountVar
    writeTVar threadsCountVar (crt + 1)
    return (crt + 1)
  liftIO $ log $ fromString $ "Starting thread " ++ (show t)
  _ <- liftIO $ forkIO $
    E.finally
      (withTempFile "." "transfer-db.dmp" (\ path handle -> do
          log $ fromString $ "started worker thread: " ++ path
          hSetBuffering handle fileBuffering
          hSetBinaryMode handle True
          result <- runMaybeT $ runReaderT (finally freehdbc $ dumpThread handle henv hdbc) env
          log $ fromString $ "worker thread ended " ++ (maybe "with failure: " (\ _ -> "with success: ") result) ++ path))
      -- must run on every exit path of the worker
      (atomically $ modifyTVar threadsCountVar (subtract 1))
  return ()

-- | the worker thread; gets a table name from the tables channel and dumps the data in the
-- temporary file; in the end, appends the temporary file contents to the end of the dump file contents
dumpThread :: (MonadIO m, MonadFail m) => Handle -> SQLHENV -> SQLHDBC -> ReaderT ThreadedDump m ()
dumpThread htmpfile _ hdbc = do
  tablesChan <- asks threads_TablesChan
  allTablesPublishedVar <- asks threads_AllTablesPublishedVar
  handleVar <- asks threads_HandleVar
  env <- ask
  liftIO $ log $ fromString "entered dumpThread"
  let -- choose the next step: dump a queued table if there is one, else
      -- either finalize (all tables published) or block until something changes
      dumpNextTables :: STM (IO ())
      dumpNextTables = orElse dumpNextTables' waitTableOrEnd

      -- take one table from the queue; the returned action dumps it into
      -- the temporary file, logs a failure if any, then looks for more work
      dumpNextTables' :: STM (IO ())
      dumpNextTables' = do
        tableName <- readTQueue tablesChan
        return $ do
          log $ fromString $ "start dumping table: " ++ tableName
          result <- runStringErrorT $ runReaderT (dumpTable htmpfile hdbc (0, 0) tableName) (MultiThreaded env)
          either (\s -> log $ fromString $ "dumping table " ++ tableName ++ " failed: " ++ s) (\ _ -> return ()) result
          join $ atomically $ dumpNextTables

      -- the queue is empty: finalize if publishing has ended, otherwise retry
      waitTableOrEnd :: STM (IO ())
      waitTableOrEnd = do
        allTablesPublished <- readTVar allTablesPublishedVar
        if allTablesPublished
          then return $ (log $ fromString $ "allTablesPublished = " ++ (show allTablesPublished)) >> finalizeDump
          else retry

      -- append the temporary file to the dump file while holding the dump
      -- file handle; 'E.bracket' puts the handle back even when the copy
      -- throws, otherwise the other workers would block forever on the TMVar
      finalizeDump :: IO ()
      finalizeDump = do
        log $ fromString $ "finalizing dump thread"
        E.bracket
          (atomically $ takeTMVar handleVar)
          (atomically . putTMVar handleVar)
          (copyTmpToDumpFile htmpfile)
  liftIO $ join $ atomically dumpNextTables

-- | finalizes the dump by copying the temporary file back into
-- the dump file
copyTmpToDumpFile :: Handle -> Handle -> IO ()
copyTmpToDumpFile htmp hdmp = do
  -- flush pending writes, rewind the temporary file and position the dump
  -- file at its end, then copy in blocks of 'bufSize' octets
  hFlush htmp
  hSeek htmp AbsoluteSeek 0
  hSeek hdmp SeekFromEnd 0
  allocaBytes bufSize (\ buf ->
    let copyFile = do
          sz <- hGetBuf htmp buf bufSize
          if sz > 0
            then do
              hPutBuf hdmp buf sz
              -- a short read means the end of the temporary file was reached
              if sz >= bufSize then copyFile else return ()
            else return ()
    in copyFile)

-- | wait for worker threads to complete the work; while waiting, serves the
-- handle allocation requests received on the alloc handle channel, so the
-- handles get allocated on the thread that runs this action
waitForWorkToEnd :: (MonadIO m) => ReaderT ThreadedDump m ()
waitForWorkToEnd = do
  threadsCountVar <- asks threads_WorkerThreadsVar
  allocHandleChan <- asks threads_AllocHandleChan
  let waitIO :: IO ()
      waitIO = join $ atomically $ orElse
        (readTVar threadsCountVar >>= check . (<= 0) >> (return $ return ()))
        (allocHandleT allocHandleChan >>= \ io -> return (io >> waitIO))
  liftIO $ waitIO
  liftIO $ log $ fromString $ "all worker threads have finished"

-- | creates an IO action inside a STM monad to allocate a new handle in the current thread;
-- the allocated handle is put in the 'TMVar' received with the request
allocHandleT :: (MonadIO m, MonadFail m) => TQueue (SQLSMALLINT, SQLHANDLE, TMVar SQLHANDLE) -> STM (m ())
allocHandleT chan = do
  (hType, hParent, retVar) <- readTQueue chan
  return $ allocHandle hType hParent >>= liftIO . atomically . (putTMVar retVar)

-- | make a handle alloc request to the main thread and wait for result
allocHandleReq :: (MonadIO m, MonadFail m) => SQLSMALLINT -> SQLHANDLE -> ReaderT ThreadedDump m SQLHANDLE
allocHandleReq htype hparent = do
  allocHandleChan <- asks threads_AllocHandleChan
  resultVar <- liftIO $ atomically $ newEmptyTMVar
  liftIO $ atomically $ writeTQueue allocHandleChan (htype, hparent, resultVar)
  liftIO $ atomically $ takeTMVar resultVar

-- | environment for either single threaded or multi threaded dump
data SingleOrMulti = SingleThreaded DumpConfig | MultiThreaded ThreadedDump

-- | extract dumpConfig from a 'SingleOrMulti' structure
dumpConfig :: SingleOrMulti -> DumpConfig
dumpConfig (SingleThreaded x) = x
dumpConfig (MultiThreaded x)  = threads_Config x

instance HasDBInfo SingleOrMulti where
  extractDBInfo = extractDBInfo . dumpConfig

-- | either directly alloc handle or call 'allocHandleReq' to alloc a handle depending
-- on it is run on a threaded or non threaded environment
allocHandleSM :: (MonadIO m, MonadFail m) => SQLSMALLINT -> SQLHANDLE -> ReaderT SingleOrMulti m SQLHANDLE
allocHandleSM htype hparent = do
  env <- ask
  case env of
    SingleThreaded _          -> allocHandle htype hparent
    MultiThreaded threadeEnv  -> withReaderT (const threadeEnv) $ allocHandleReq htype hparent

-- | workaround for unixODBC bug that requires that all handles should be allocated
-- on the main thread
collectColumnsInfoSM :: (MonadIO m, MonadFail m) =>
  SQLHDBC       -- ^ connection handle
  -> String     -- ^ schema name
  -> String     -- ^ table name
  -> ReaderT SingleOrMulti m [ColumnInfo]
collectColumnsInfoSM hdbc schemaName tableName = do
  env <- ask
  case env of
    SingleThreaded _ -> withReaderT (const odbcImplementation) $ collectColumnsInfo hdbc schemaName tableName
    MultiThreaded x  -> withReaderT (const x) $ do
      -- the statement handle is requested from the main thread
      hstmt <- allocHandleReq sql_handle_stmt hdbc
      columns hstmt Nothing (Just schemaName) (Just tableName) Nothing
      withReaderT (const odbcImplementation) $ collectColumnsInfo' hstmt

-- | dumps a single table: the schema, the records and the end of table marker;
-- returns the (number of records, size in bytes) dumped for this table. The
-- accumulator received as the third parameter is ignored.
dumpTable :: (MonadIO m, MonadFail m) => Handle -> SQLHDBC -> (Int, Int) -> String -> ReaderT SingleOrMulti m (Int, Int)
dumpTable handle hdbc _ tableName = do
  schema <- extractSchema hdbc tableName
  liftIO $ debugS logsrc $ fromString $ "Schema " ++ (C.unpack $ schema_DBSchemaName schema) ++ "." ++ (C.unpack $ schema_TableName schema)
  liftIO $ mapM_ debugFieldInfo $ schema_Fields schema
  liftIO $ B.hPut handle $ writeSchema schema
  (!recs, !bytes) <- dumpTableData handle hdbc schema
  liftIO $ B.hPut handle writeEOT
  return (recs, bytes)

-- | logs the content of a 'FieldInfoV1' structure
debugFieldInfo :: FieldInfoV1 -> IO ()
debugFieldInfo f = do
  debugS logsrc $ fromString $ "\tfi_ColumnName: "    ++ (C.unpack $ fi_ColumnName f)
  debugS logsrc $ fromString $ "\tfi_DataType: "      ++ (show $ fi_DataType f)
  debugS logsrc $ fromString $ "\tfi_ColumnSize: "    ++ (maybe "(null)" show $ fi_ColumnSize f)
  debugS logsrc $ fromString $ "\tfi_BufferLength: "  ++ (maybe "(null)" show $ fi_BufferLength f)
  debugS logsrc $ fromString $ "\tfi_DecimalDigits: " ++ (maybe "(null)" show $ fi_DecimalDigits f)
  debugS logsrc $ fromString $ "\tfi_NumPrecRadix: "  ++ (maybe "(null)" show $ fi_NumPrecRadix f)
  debugS logsrc $ fromString $ "\tfi_Nullabe: "       ++ (show $ fi_Nullable f)

-- | extract schema information from the database, using an existing db connection
extractSchema :: (MonadIO m, MonadFail m) => SQLHDBC -> String -> ReaderT SingleOrMulti m SchemaV1
extractSchema hdbc tableName = do
  schemaName <- asks (dump_Schema . dumpConfig)
  fields <- extractSchemaFields hdbc tableName
  return $ SchemaV1 (C.pack schemaName) (C.pack tableName) fields

-- | extract the fields information from the database
extractSchemaFields :: (MonadIO m, MonadFail m) => SQLHDBC -> String -> ReaderT SingleOrMulti m [FieldInfoV1]
extractSchemaFields hdbc tableName = do
  schemaName <- asks (dump_Schema . dumpConfig)
  cols <- collectColumnsInfoSM hdbc schemaName tableName
  return $ map makeFieldInfo cols

-- | transforms a 'ColumnInfo' structure into a 'FieldInfoV1' structure
makeFieldInfo :: ColumnInfo -> FieldInfoV1
makeFieldInfo ci = FieldInfoV1 {
  fi_ColumnName       = C.pack $ ci_ColumnName ci,
  fi_DataType         = ci_DataType ci,
  fi_ColumnSize       = ci_ColumnSize ci,
  fi_BufferLength     = ci_BufferLength ci,
  fi_DecimalDigits    = ci_DecimalDigits ci,
  fi_NumPrecRadix     = ci_NumPrecRadix ci,
  fi_Nullable         = ci_Nullable ci,
  fi_OrdinalPosition  = ci_OrdinalPosition ci}

-- | dump the table records to the file, one by one; returns the (number of records,
-- size in bytes) of dumped data
dumpTableData :: (MonadIO m, MonadFail m) => Handle -> SQLHDBC -> SchemaV1 -> ReaderT SingleOrMulti m (Int, Int)
dumpTableData handle hdbc schema = do
  let tableName' = schema_QualifiedTableName schema
      tableName  = C.unpack tableName'
  liftIO $ log $ fromString $ "dumping table: " ++ tableName
  select <- makeSelectSql schema
  hstmt <- allocHandleSM sql_handle_stmt hdbc
  env <- ask
  finally' ((log $ fromString $ "freeHandle for " ++ tableName) >> freeHandle sql_handle_stmt hstmt) $ do
    execDirect hstmt select (faillog $ "param data requested for select '" ++ select ++ "'")
    -- one transfer buffer and one length/indicator cell are shared by all
    -- the fields of all the records of the table
    result <- liftIO $ allocaBytes (fromIntegral maxChunkSize) (\ p_transferBuf ->
      alloca (\ p_transferLenOrInd -> do
        let dumpAction = dumpRecord handle hstmt schema p_transferBuf maxChunkSize p_transferLenOrInd
            -- both actions log the final statistics of the table
            endAction _ x    = logstats tableName' x >> return x
            failAction _ x s = logstats tableName' x >> fail s
        runStringErrorT $ runReaderT (forAllRecordsWithEndAndFail hstmt dumpAction (endAction hstmt) (failAction hstmt) (0,0)) env))
    (cnt, size) <- either (\ s -> faillog $ "transfer table " ++ (C.unpack $ schema_TableName schema) ++ " failed: " ++ s) return result
    liftIO $ log $ fromString $ "Finished dupming table " ++ (C.unpack $ schema_DBSchemaName schema) ++ "." ++ (C.unpack $ schema_TableName schema) ++ "; dumped " ++ (show cnt) ++ " records of " ++ (show size) ++ " bytes"
    size `seq` cnt `seq` return (cnt, size)

-- | dumps the data of one record into the file; it returns the (number of records, total bytes dumped)
dumpRecord :: (MonadIO m, MonadFail m) => Handle -> SQLHSTMT -> SchemaV1 -> SQLPOINTER -> SQLLEN -> Ptr SQLLEN -> (Int, Int) -> ReaderT SingleOrMulti m (Int, Int)
dumpRecord handle hstmt schema p_buf bufLen p_lenOrInd (cnt, sz) = do
  liftIO $ B.hPut handle writeRI
  -- TODO: if an error occurs writing any of the fields and one or more fields are not
  -- properly written in the file, the dump file will be corrupted
  let fields' = schema_Fields schema
      -- pair each field with its 1-based column number in the select
      fields  = zip (map fromIntegral [1 .. length fields']) fields'
  sz' <- foldM (dumpField handle hstmt p_buf bufLen p_lenOrInd) sz fields
  let cnt' = cnt + 1
  -- log the progress every 100000 records
  if cnt' `mod` 100000 == 0
    then do
      let tableName = schema_QualifiedTableName schema
      logStatistics tableName cnt' sz'
    else return ()
  cnt' `seq` sz' `seq` return (cnt', sz')

-- | monadic action that logs the data dumped until now; it uses a map that records the count of records
-- and the size dumped for each log key; the logged totals are the sums over all the keys of the map
logStatistics :: (MonadIO m, MonadFail m) => C.ByteString -> Int -> Int -> ReaderT SingleOrMulti m ()
logStatistics logkey cnt sz = do
  sizesVar <- asks readSizesVar
  (totalCnt, totalSz) <- liftIO $ atomically $ do
    sizesMap <- readTVar sizesVar
    let updatedSizesMap = Map.insert logkey (cnt, sz) sizesMap
    writeTVar sizesVar updatedSizesMap
    return $ Map.foldr' (\ (c1, s1) (c2, s2) -> (c1 + c2, s1 + s2)) (0, 0) updatedSizesMap
  startTime <- asks readStartTime
  crtTime <- liftIO $ getTime Monotonic
  let duration  = diffTimeSpec crtTime startTime
      -- never divide by zero, even if no time elapsed since the start
      elapsedNs = max 1 (toNanoSecs duration)
      szRate    = (fromIntegral totalSz) * 1000000000 `div` elapsedNs
  liftIO $ log $ fromString $ ">>>>>> Running for " ++ (show $ sec duration) ++ " seconds"
  liftIO $ log $ fromString $ ">>>>>> (" ++ (C.unpack logkey) ++ ") " ++ (show totalCnt) ++ " records / " ++ (show $ totalSz) ++ " bytes" ++ ", " ++ (show szRate) ++ " bytes/sec"

-- | uncurried form of ('logStatistics' logkey)
logstats :: (MonadIO m, MonadFail m) => C.ByteString -> (Int, Int) -> ReaderT SingleOrMulti m ()
logstats hstmt = uncurry $ logStatistics hstmt

-- | get the 'TVar' 'StatisticsMap' from the environment
readSizesVar :: SingleOrMulti -> TVar StatisticsMap
readSizesVar (SingleThreaded x) = dump_StatisticsVar x
readSizesVar (MultiThreaded x)  = (dump_StatisticsVar . threads_Config) x

-- | get the start time from the environment
readStartTime :: SingleOrMulti -> TimeSpec
readStartTime (SingleThreaded x) = dump_StartTime x
readStartTime (MultiThreaded x)  = (dump_StartTime . threads_Config) x

-- | dump data of a single field; adds the size of dumped data to the total size
-- received as parameter
dumpField :: (MonadIO m, MonadFail m) => Handle -> SQLHSTMT -> SQLPOINTER -> SQLLEN -> Ptr SQLLEN -> Int -> (SQLSMALLINT, FieldInfoV1) -> m Int
dumpField handle hstmt p_buf buflen p_lenOrInd sz (crt, fld) =
  runReaderT dumpField' $ DumpFieldSpec handle hstmt p_buf buflen p_lenOrInd sz crt fld

-- | dump field data parameters
data DumpFieldSpec = DumpFieldSpec {
  dmpfld_Handle :: Handle,        -- ^ dump file handle
  dmpfld_HStmt :: SQLHSTMT,       -- ^ table select statement
  dmpfld_Buf :: SQLPOINTER,       -- ^ data transfer buffer
  dmpfld_BufLen :: SQLLEN,        -- ^ the size of data transfer buffer
  dmpfld_LenOrInd :: Ptr SQLLEN,  -- ^ buffer to get null indicator or the actual size of transferred data
  dmpfld_Size :: Int,             -- ^ the total data transferred prior to this field
  dmpfld_Crt :: SQLSMALLINT,      -- ^ the number in the statement of the field to be dumped
  dmpfld_Field :: FieldInfoV1     -- ^ the description of the field
  }

-- | ReaderT monadic action that dumps a field into the dump file; returns the total dump size,
-- adding to the prior size the size in bytes of dumped data. The SQL data type of the field
-- selects the ODBC C type used for the transfer and whether the field is dumped in chunks
-- (character, decimal/numeric and binary types) or as a fixed size value.
dumpField' :: (MonadIO m, MonadFail m) => ReaderT DumpFieldSpec m Int
dumpField' = do
  dataType <- asks $ fi_DataType . dmpfld_Field
  case dataType of
    _ | dataType == sql_char                      -> dumpVarLenField sql_c_char
      | dataType == sql_varchar                   -> dumpVarLenField sql_c_char
      | dataType == sql_longvarchar               -> dumpVarLenField sql_c_char
      | dataType == sql_wchar                     -> dumpVarLenField sql_c_wchar
      | dataType == sql_wvarchar                  -> dumpVarLenField sql_c_wchar
      | dataType == sql_wlongvarchar              -> dumpVarLenField sql_c_wchar
      | dataType == sql_decimal                   -> dumpVarLenField sql_c_char
      | dataType == sql_numeric                   -> dumpVarLenField sql_c_char
      | dataType == sql_bit                       -> dumpFixedField sql_c_bit
      | dataType == sql_tinyint                   -> dumpFixedField sql_c_tinyint
      | dataType == sql_smallint                  -> dumpFixedField sql_c_short
      | dataType == sql_integer                   -> dumpFixedField sql_c_long
      | dataType == sql_bigint                    -> dumpFixedField sql_c_sbigint
      | dataType == sql_real                      -> dumpFixedField sql_c_float
      | dataType == sql_float                     -> dumpFixedField sql_c_double
      | dataType == sql_double                    -> dumpFixedField sql_c_double
      | dataType == sql_binary                    -> dumpVarLenField sql_c_binary
      | dataType == sql_varbinary                 -> dumpVarLenField sql_c_binary
      | dataType == sql_longvarbinary             -> dumpVarLenField sql_c_binary
      | dataType == sql_type_date                 -> dumpFixedField sql_c_type_date
      | dataType == sql_type_time                 -> dumpFixedField sql_c_type_time
      | dataType == sql_type_timestamp            -> dumpFixedField sql_c_type_timestamp
      | dataType == sql_interval_year             -> dumpFixedField sql_c_interval_year
      | dataType == sql_interval_month            -> dumpFixedField sql_c_interval_month
      | dataType == sql_interval_day              -> dumpFixedField sql_c_interval_day
      | dataType == sql_interval_hour             -> dumpFixedField sql_c_interval_hour
      | dataType == sql_interval_minute           -> dumpFixedField sql_c_interval_minute
      | dataType == sql_interval_second           -> dumpFixedField sql_c_interval_second
      | dataType == sql_interval_year_to_month    -> dumpFixedField sql_c_interval_year_to_month
      | dataType == sql_interval_day_to_hour      -> dumpFixedField sql_c_interval_day_to_hour
      | dataType == sql_interval_day_to_minute    -> dumpFixedField sql_c_interval_day_to_minute
      | dataType == sql_interval_day_to_second    -> dumpFixedField sql_c_interval_day_to_second
      | dataType == sql_interval_hour_to_minute   -> dumpFixedField sql_c_interval_hour_to_minute
      | dataType == sql_interval_hour_to_second   -> dumpFixedField sql_c_interval_hour_to_second
      | dataType == sql_interval_minute_to_second -> dumpFixedField sql_c_interval_minute_to_second
      | dataType == sql_guid                      -> dumpFixedField sql_c_guid
      | otherwise                                 -> dumpUnknownFieldType

-- | dumps a variable field length; the variable field length will be dumped in chunks
-- each chunk having two fields: a length of the chunk and the actual data of the chunk. The
-- length of the chunk will be encoded on one, two or four bytes, depending on the maximum
-- chunk length and the maximum length of the field taken from the table's schema.
dumpVarLenField :: (MonadIO m, MonadFail m) => SQLSMALLINT -> ReaderT DumpFieldSpec m Int
dumpVarLenField bufferType = do
  bufferSize <- asks dmpfld_BufLen
  hstmt <- asks dmpfld_HStmt
  colnum <- asks dmpfld_Crt
  p_buffer <- asks dmpfld_Buf
  p_LenOrInd <- asks dmpfld_LenOrInd
  size <- asks dmpfld_Size
  lenlen <- octetLengthOfChunkSize
  -- the accumulator is (chunk number, total size dumped so far)
  (_, size') <- forAllData hstmt colnum bufferType p_buffer bufferSize p_LenOrInd (dumpChunk lenlen) (0, size)
  size' `seq` return size'

-- | the octet length of the chunk size; it is calculated based on the transfer buffer size
-- and on the maximum size of the field.
octetLengthOfChunkSize :: (Num a, Monad m) => ReaderT DumpFieldSpec m a
octetLengthOfChunkSize = do
  bufferSize <- asks dmpfld_BufLen
  fieldSize <- asks $ fi_BufferLength . dmpfld_Field
  let chunkSize = (fromIntegral bufferSize) - 1 -- for \0 character
      -- NOTE(review): the limits are inclusive (256, 65536); this must agree
      -- with the reader of the dump format -- confirm against the Format module
      lenlen' = if chunkSize <= 256 then 1 else if chunkSize <= 65536 then 2 else 4
      -- a field known to be shorter than a chunk may use a narrower length
      lenlen  = maybe lenlen' (\sz -> if sz < chunkSize
                                      then if sz <= 256 then 1 else if sz <= 65536 then 2 else 4
                                      else lenlen') fieldSize
  return lenlen

-- | dumps a chunk of data; increments the current size with the number of octets
-- dumped and returns this value. For the first chunk of a nullable field a null
-- indicator is written before the data (or instead of it, for a null value).
dumpChunk :: (MonadIO m, MonadFail m) => Int -> (Int, Int) -> ReaderT DumpFieldSpec m (Int, Int)
dumpChunk lenlen (chunkNo, size) = do
  nullable <- asks $ fi_Nullable . dmpfld_Field
  size' <- if chunkNo > 0 || nullable == sql_no_nulls
           then dumpChunk' lenlen size
           else do
             p_lenOrInd <- asks dmpfld_LenOrInd
             lenOrInd <- liftIO $ peek p_lenOrInd
             if lenOrInd == sql_null_data
               then dumpNullIndicator Null size
               else do
                 size'' <- dumpNullIndicator NotNull size
                 dumpChunk' lenlen size''
  size' `seq` return (chunkNo+1, size')

-- | writes the null indicator into the dump file; returns the received size
-- incremented with the number of octets written
dumpNullIndicator :: (MonadIO m) => NullIndicator -> Int -> ReaderT DumpFieldSpec m Int
dumpNullIndicator indicator size = do
  let bs = writeNullIndicator indicator
  handle <- asks dmpfld_Handle
  liftIO $ B.hPut handle bs
  let size' = size + (B.length bs)
  size' `seq` return size'

-- | dumps a chunk of data for a non null field; increments the current size with the number of octets
-- dumped and returns this value. Fails if the length/indicator value is negative.
dumpChunk' :: (MonadIO m, MonadFail m) => Int -> Int -> ReaderT DumpFieldSpec m Int
dumpChunk' lenlen size = do
  columnName <- asks $ fi_ColumnName . dmpfld_Field
  p_buf <- asks dmpfld_Buf
  p_lenOrInd <- asks dmpfld_LenOrInd
  handle <- asks dmpfld_Handle
  lenOrInd <- liftIO $ peek p_lenOrInd
  buffersize <- asks dmpfld_BufLen
  bs <- if lenOrInd > -1
        then let -- the length of the data comes from the length/indicator
                 -- value, not from the width of the chunk length prefix
                 datalen     = fromIntegral lenOrInd
                 buffersize' = (fromIntegral buffersize) - 1 -- reserve space for null terminator
                 -- the data does not fit in the buffer: this chunk is a full buffer
                 chunklen    = if datalen > buffersize' then buffersize' else datalen
             in return $ writeChunk (fromIntegral lenlen) chunklen (castPtr p_buf)
        else fail $ "dumpChunk' received unexpected null field (" ++ (show lenOrInd) ++ "); column " ++ (C.unpack columnName)
  liftIO $ B.hPut handle bs
  let size' = size + (B.length bs)
  size' `seq` return size'

-- | dumps a fixed length field; the buffer length of the field will be taken from the description
-- of the field in the table's schema; the dumped data will only contain the data of the field;
-- the first parameter represents the ODBC C data type of the data to be read from the database
dumpFixedField :: (MonadIO m, MonadFail m) => SQLSMALLINT -> ReaderT DumpFieldSpec m Int
dumpFixedField bufferType = do
  bufferSize <- asks dmpfld_BufLen
  hstmt <- asks dmpfld_HStmt
  colnum <- asks dmpfld_Crt
  p_buffer <- asks dmpfld_Buf
  p_LenOrInd <- asks dmpfld_LenOrInd
  size <- asks dmpfld_Size
  -- reset the length/indicator cell, it is shared with the previous fields
  liftIO $ poke p_LenOrInd 0
  getData hstmt colnum bufferType p_buffer bufferSize p_LenOrInd
  lenOrInd <- liftIO $ peek p_LenOrInd
  nullable <- asks $ fi_Nullable . dmpfld_Field
  columnName <- asks $ fi_ColumnName . dmpfld_Field
  if nullable == sql_no_nulls
    then if lenOrInd == sql_null_data
         then fail $ "null value read from not nullable field " ++ (C.unpack columnName)
         else dumpFixedField' bufferType size lenOrInd
    else if lenOrInd == sql_null_data
         then dumpNullIndicator Null size
         else do
           -- nullable fields get a null indicator before the data
           size' <- dumpNullIndicator NotNull size
           dumpFixedField' bufferType size' lenOrInd

-- | dumps the data of a not null field; fails if the length of the C type is known
-- and it is different from the length of the data read from the database
dumpFixedField' :: (MonadIO m, MonadFail m) => SQLSMALLINT -> Int -> SQLLEN -> ReaderT DumpFieldSpec m Int
dumpFixedField' bufferType size lenOrInd = do
  let wellKnownSize = odbcCTypeLen bufferType
  columnName <- asks $ fi_ColumnName . dmpfld_Field
  handle <- asks $ dmpfld_Handle
  p_buffer <- asks $ dmpfld_Buf
  bs <- if maybe False (lenOrInd /=) wellKnownSize
        then fail $ "the actual length of the field (" ++ (show lenOrInd) ++ ") is different from the schema size of the field (" ++ (show wellKnownSize) ++ "): " ++ (C.unpack columnName)
        else return $ writePlainBuf (castPtr p_buffer) (fromIntegral lenOrInd)
  liftIO $ B.hPut handle bs
  let size' = size + (B.length bs)
  size' `seq` return size'

-- | dumps a field of an unknown type; the field will be converted to SQL_C_CHAR and dumped as
-- a string of characters (not implemented yet, the action always fails)
dumpUnknownFieldType :: (MonadIO m, MonadFail m) => ReaderT DumpFieldSpec m Int
dumpUnknownFieldType = fail "dumpUnknownFieldType not implemented"

-- | create the select for dumping table data; the table name is qualified with the
-- schema name only if the schema name is not empty
makeSelectSql :: (MonadIO m, MonadFail m) => SchemaV1 -> m String
makeSelectSql schema =
  let tableName  = C.unpack $ schema_TableName schema
      schemaName = C.unpack $ schema_DBSchemaName schema
      qualifiedTableName = case schemaName of
        [] -> tableName
        s  -> s ++ "." ++ tableName
      fields     = schema_Fields schema
      fieldsList = intercalate ", " $ map (C.unpack . fi_ColumnName) fields
  in return $ "select " ++ fieldsList ++ " from " ++ qualifiedTableName

-- | read the binary dump of a database and restores it in a destination ODBC data source
-- (not implemented yet, the action only sets up the logging)
restore :: (MonadIO m, MonadFail m, MonadBaseControl IO m) => ReaderT RestoreConfig m () -- MonadBaseControl is required for logging
restore = withStderrLogging $ return ()