{-- |
Module      : Generator
Description : Automatic creation of transfer plans from the source db
Copyright   : (c) Mihai Giurgeanu, 2017
License     : GPL-3
Maintainer  : mihai.giurgeanu@gmail.com
Stability   : experimental
Portability : Portable
--}
module Generator where

import Prelude hiding (fail, log)
import System.IO (hPutStrLn, hPutStr, stderr)
import Control.Logging (log, withStderrLogging)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Fail (MonadFail, fail)
import Control.Monad.Trans.Reader (ReaderT, withReaderT, asks)
import Control.Monad.Trans.Maybe (MaybeT)
import Data.String (IsString (fromString))
import SQL.CLI (SQLHDBC, sql_handle_stmt, sql_null_data, sql_char)
import SQL.CLI.Utils (tables, allocHandle, getData, forAllRecords, freeHandle, execDirect, forAllData)
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Storable (peek, poke)
import Foreign.C.String (peekCString)
import Foreign.Ptr (castPtr)
import Text.Printf (printf)
import qualified Data.ByteString as B
import Database.TransferDB.Commons (ProgramOptions, DBInfo, withConnection', po_Source, po_Dest,
                                    dbi_Datasource, dbi_User, dbi_Password, dbi_Schema)
import TransferPlan (TransferPlan (TransferPlan), DatabaseScope (Scope), Batch (Batch), BatchItem (BatchItem),
                     ColumnName, batch_Items, batch_Where, plan_Batches)
import Data.Yaml.Aeson (encode)
import Data.List (intercalate)

maxKeyBuffer :: Int
maxKeyBuffer = 1024

-- | Writes to standard output a simple transfer plan, including all the tables
-- in the source database
makeSimplePlan :: ReaderT ProgramOptions (MaybeT IO) ()
makeSimplePlan = withStderrLogging $ withConnection' (\ _ hdbc -> do
  let addTable :: (MonadIO m) => Batch -> String -> m Batch
      addTable batch tableName = do
        liftIO $ log $ fromString $ "add table " ++ tableName
        return batch { batch_Items = (batch_Items batch) ++ [BatchItem tableName Nothing Nothing] }
  plan <- initPlan
  liftIO $ log $ fromString "Reading tables from source ..."
  batch <- withTables hdbc (Batch "All tables" []) addTable
  let plan' = plan { plan_Batches = [batch] }
  liftIO $ B.putStr $ encode plan')
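
-- An illustration of the plan produced by 'makeSimplePlan' (the table names below are
-- hypothetical): for a source schema containing the tables CUSTOMERS and ORDERS, the
-- generated plan holds a single batch
--
-- > Batch "All tables" [ BatchItem "CUSTOMERS" Nothing Nothing
-- >                    , BatchItem "ORDERS"    Nothing Nothing ]
--
-- that is, every table is copied whole, with no key columns and no where clause attached.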
" (_, _, lastBatch, closeBatches) <- withTables hdbc (0, 1, Batch "0001" [], []) addTable plan <- initPlan let plan' = plan { plan_Batches = closeBatches ++ [lastBatch] } liftIO $ B.putStr $ encode plan' ) liftIO $ log $ fromString "Plan generated" -- | Writes to standard output a transfer plan, counting the number of records included in each batch, -- and making batches to hold the given number of records makePlanByRows :: Integer -> ReaderT ProgramOptions (MaybeT IO) () makePlanByRows n = withStderrLogging $ do liftIO $ log $ fromString $ "Making a plan with " ++ (show n) ++ " rows transferred in each batch" schemaName <- asks (dbi_Schema . po_Source) withConnection' (\ _ hdbc -> do let addTable :: (MonadIO m, MonadFail m) => (Integer, Int, Batch, [Batch]) -> String -> m (Integer, Int, Batch, [Batch]) addTable (crt, crtb, b, bs) tableName = do liftIO $ log $ fromString $ "add table " ++ tableName rowsstmt <- allocHandle sql_handle_stmt hdbc ks <- keyColumns hdbc tableName let addRow :: (MonadIO m, MonadFail m) => (Integer, Int, Batch, [Batch], [String]) -> m (Integer, Int, Batch, [Batch], [String]) addRow (crt', crtb', b', bs', vs') = do if crt' `mod` 500000 == 0 then liftIO $ hPutStr stderr "." else return () if crt' `mod` n == 0 && crt' > 0 then do newVs <- readKeys let newBatch :: Batch newBatch = Batch (printf "%08d" (crtb' + 1)) [] closedBatch :: Batch closedBatch = b' { batch_Items = (batch_Items b') ++ [closedItem] } closedItem :: BatchItem closedItem = BatchItem tableName (Just ks) (Just closeWhere) closeWhere = intercalate " and " ([ c ++ " >= '" ++ v ++ "'" | (c, v) <- zip ks vs'] ++ [ c ++ " < '" ++ v ++ "'" | (c, v) <- zip ks newVs]) return (crt' + 1, crtb' + 1, newBatch, bs' ++ [closedBatch], newVs) else return (crt' + 1, crtb', b', bs', vs') where readKeys :: (MonadIO m, MonadFail m) => m [String] readKeys = do liftIO $ allocaBytes maxKeyBuffer (\ p_value -> alloca ( \ p_value_len -> let readKey k = do forAllData rowsstmt k sql_char (castPtr p_value) (fromIntegral maxKeyBuffer) p_value_len readChunk "" readChunk :: String -> IO String readChunk v = do len <- peek p_value_len if len > 0 then peekCString p_value >>= return . (v ++) else return v in sequence $ map (readKey.fromIntegral) [1.. (length ks)])) selectKeys = "select " ++ colsList ++ " from " ++ (if length schemaName > 0 then schemaName ++ "." else "") ++ tableName ++ " order by " ++ colsList colsList = intercalate ", " ks execDirect rowsstmt selectKeys (fail $ "unknown parameter data wanted for sql: " ++ selectKeys) liftIO $ log $ fromString $ "reading records from table " ++ tableName (crt', crtb', b', bs', vs') <- forAllRecords rowsstmt addRow (crt, crtb, b, bs, []) liftIO $ freeHandle sql_handle_stmt rowsstmt return $ if crtb' /= crtb then -- if new batch has been created, then we add a new item for the current table -- to get all records having the key greater with the last keys used for that table let whereClause = intercalate " and " [c ++ " >= '" ++ v ++ "'" | (c, v) <- zip ks vs'] item = BatchItem tableName (Just ks) (Just whereClause) in (crt', crtb', b' {batch_Items = (batch_Items b') ++ [item]} , bs') else -- if no new batch was generated (crtb == crtb') then add this table to the current batch (crt', crtb', b' { batch_Items = (batch_Items b') ++ [BatchItem tableName Nothing Nothing] }, bs') liftIO $ log $ fromString "reading tables from source ... 
" (_, _, lastBatch, closedBatches) <- withTables hdbc (0, 1, Batch "00000001" [], []) addTable plan <- initPlan let plan' = plan { plan_Batches = closedBatches ++ [lastBatch] } liftIO $ B.putStr $ encode plan') liftIO $ log $ fromString "Plan generated" -- | extract the names of the columns making a unique key for a given table keyColumns :: (MonadIO m, MonadFail m) => SQLHDBC -> String -> m [ColumnName] keyColumns _ _ = return ["ROWID"] -- TODO give a real implementation -- | run an action in the current environment on each table name from the current schema, -- passing an accumulator value; returns the value of the accumulor withTables :: (MonadFail m, MonadIO m) => SQLHDBC -> a -> (a -> String -> ReaderT ProgramOptions m a) -> ReaderT ProgramOptions m a withTables hdbc arg f = do tables_stmt <- allocHandle sql_handle_stmt hdbc schema <- asks $ dbi_Schema.po_Source tables tables_stmt Nothing (Just schema) Nothing (Just "TABLE") let readTableName = liftIO $ allocaBytes 255 (\ p_tableName -> alloca (\ p_tableName_ind -> do poke p_tableName_ind 0 _ <- getData tables_stmt 3 sql_char p_tableName 255 p_tableName_ind tableName_ind <- liftIO $ peek p_tableName_ind tableName <- if tableName_ind == sql_null_data then return Nothing else (liftIO . peekCString . castPtr) p_tableName >>= (return.Just) return tableName)) withTableName arg' = do tableName <- readTableName maybe (return arg') (f arg') tableName result <- forAllRecords tables_stmt withTableName arg liftIO $ freeHandle sql_handle_stmt tables_stmt return result initPlan :: (Monad m) => ReaderT ProgramOptions m TransferPlan initPlan = TransferPlan <$> withReaderT po_Source initDatabaseScope <*> withReaderT po_Dest initDatabaseScope <*> return [] initDatabaseScope :: (Monad m) => ReaderT DBInfo m DatabaseScope initDatabaseScope = Scope <$> asks dbi_Datasource <*> asks dbi_User <*> asks dbi_Password <*> asks dbi_Schema