{-|
Module      : CorrectionPlan
Description : Given the plan and the execution log, it generates a new plan and an SQL script meant to correct eventual failures
License     : GPL-3
Maintainer  : mihai.giurgeanu@gmail.com
Stability   : experimental
Portability : GHC
-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE DeriveGeneric #-}
module CorrectionPlan where

import Prelude hiding (fail, log)

import Control.Logging (log, withStderrLogging)
import Control.Monad.Fail (MonadFail, fail)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Reader (ReaderT, asks, ask)
import Control.Monad.Trans.State (StateT, gets, modify', execStateT)
import qualified Data.ByteString.Lazy as B
import qualified Data.ByteString as B1 (hPutStr)
import Data.Csv (FromRecord)
import Data.Csv.Streaming (decode, Records(Cons, Nil), HasHeader(NoHeader))
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.List (foldl')
import Data.String (IsString(fromString))
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Yaml.Aeson (encode)
import GHC.Generics (Generic)
import TransferPlan
  ( TransferPlan(TransferPlan, plan_Source, plan_Destination)
  , Batch(Batch)
  , BatchItem(BatchItem)
  )
import System.IO (Handle, hSetBinaryMode, hPutStrLn)

-- | Everything the correction generator needs: the plan that was executed
-- and the three file handles it reads from or writes to.
data CorrectionConfig = CorrectionConfig
  { correction_Plan         :: TransferPlan -- ^ the original plan
  , correction_LogHandle    :: Handle       -- ^ the input file handle to the analyzed execution log
  , correction_ScriptHandle :: Handle       -- ^ the output file handle to the generated correction sql script
  , correction_PlanHandle   :: Handle       -- ^ the output file handle to the generated correction plan
  }

-- | generates the contents of 2 files: a sql script and a new yaml transfer plan
-- to be run to correct the possible errors in running a previous plan, preventing duplicated
-- data or missing data.
--
-- The 2 files are meant to correct the following errors:
--
-- 1. some items in the original transfer plan were loaded 2 or more times (so the data is duplicated in the destination table)
-- 2. some items were skipped in the previous run, so data is missing in the destination table.
--
-- In the first case, the table containing duplicated records is truncated (with a statement in
-- the sql script) and one or more batches are generated to load again the data in that table.
--
-- In the second case, one or more batches are generated in the new plan to load the
-- items that were missed at the first run.
--
-- Please note, that the original plan should not contain duplicated data transfers, that is, items with
-- the same table source name and same where condition.
--
-- The execution log is read lazily from 'correction_LogHandle' (switched to binary mode)
-- and decoded as header-less CSV; the new plan is written as YAML to 'correction_PlanHandle'.
generateCorrections :: (MonadIO m, MonadFail m, MonadBaseControl IO m) => ReaderT CorrectionConfig m ()
generateCorrections = withStderrLogging $ do
  logh <- asks correction_LogHandle
  liftIO $ hSetBinaryMode logh True
  logstream <- liftIO $ B.hGetContents logh
  cfg <- ask
  let plan      = correction_Plan cfg
      itemsMap' = itemsMap plan
      -- Every item of the original plan starts out as "skipped"; items are
      -- removed from that set as they are found in the log.
      initialCorrectionState =
        CorrectionState cfg 0 Set.empty Set.empty Set.empty (Map.keysSet itemsMap') (tablesMap plan)
  s <- lift $ execStateT (processLogRecords $ decode NoHeader logstream) initialCorrectionState
  batches <- fromItems (correctionState_GeneratedItems s) itemsMap'
  let newPlan = TransferPlan (plan_Source plan) (plan_Destination plan) batches
  liftIO $ B1.hPutStr (correction_PlanHandle cfg) $ encode newPlan

-- | generate new plan batches list using original batch names and original items
--
-- Each item key is looked up in the map built by 'itemsMap'. Found items are
-- grouped under their original batch name, in ascending key order within each
-- batch; the resulting batches are returned in ascending batch-name order.
-- A key with no entry in the map is logged as an error and left out.
fromItems :: (MonadIO m) => Set (String, String) -> Map (String, String) (String, BatchItem) -> m [Batch]
fromItems items m = do
  mapM_ reportMissing [key | (key, Nothing) <- resolved]
  return $ Map.elems $ Map.mapWithKey toBatch grouped
  where
    -- every requested key paired with what the original plan knows about it
    resolved = [(key, Map.lookup key m) | key <- Set.toAscList items]

    -- 'Map.fromListWith' applies the function as (new ++ old), so each
    -- singleton is prepended in O(1) and the per-batch lists come out
    -- reversed; 'toBatch' restores the ascending order.
    grouped = Map.fromListWith (++)
      [(batchName, [batchItem]) | (_, Just (batchName, batchItem)) <- resolved]

    toBatch batchName reversedItems = Batch batchName (reverse reversedItems)

    reportMissing key =
      liftIO $ log $ fromString $
        "Error: Item " ++ (show key) ++ " not found in original plan. The item will not be included in the correction plan. This is a bug, please report it!"

-- | keeps the state of generating the correction scripts and batches during the processing of the log records
--
-- All fields are strict so that per-record updates (in particular the record
-- counter) are evaluated as they happen instead of piling up as thunks.
data CorrectionState = CorrectionState
  { correctionState_Config         :: !CorrectionConfig
    -- ^ the current configuration
  , correctionState_Rec            :: !Int
    -- ^ the current record number
  , correctionState_Truncated      :: !(Set String)
    -- ^ the 'Set' of tables for which truncate script have been generated (due to duplication)
  , correctionState_UniqueRecords  :: !(Set (String, String))
    -- ^ the 'Set' of (source table, condition) found in processing the logs
  , correctionState_GeneratedItems :: !(Set (String, String))
    -- ^ the 'Set' of (table name, where condition) representing items already added to the new correction plan
  , correctionState_SkippedItems   :: !(Set (String, String))
    -- ^ the 'Set' of (table name, where condition) representing items not found in the transfer log
  , correctionState_TableItems     :: !(Map String (Set (String, String)))
    -- ^ the 'Map' from table name to the 'Set' of (table name, where condition) indicating the items needed to reload the table
  }

-- | create a map from the items in the batch to the (batch name, 'BatchItem') pair;
-- for each item, it will be considered
-- only the table name and the where condition
itemsMap :: TransferPlan -> Map (String, String) (String, BatchItem)
itemsMap (TransferPlan _ _ bs) = foldl' addBatchToItemsMap Map.empty bs

-- | adds the items inside a batch to an items map in the
-- sense of 'itemsMap'
--
-- Items are keyed with 'makeItemKey'; an item whose key is already in the map
-- replaces the earlier entry.
addBatchToItemsMap :: Map (String, String) (String, BatchItem) -> Batch -> Map (String, String) (String, BatchItem)
addBatchToItemsMap m (Batch name items) = foldl' insertItem m items
  where
    insertItem acc item@(BatchItem t _ w) = Map.insert (makeItemKey t w) (name, item) acc

-- | create a map from the table names to the list of (table name, where condition) pairs from
-- all 'BatchItem' elements in the original plan used to load the given table
tablesMap :: TransferPlan -> Map String (Set (String, String))
tablesMap (TransferPlan _ _ bs) = foldl' addBatchToTablesMap Map.empty bs

-- | add the items inside the batch to the 'tablesMap' result
addBatchToTablesMap :: Map String (Set (String, String)) -> Batch -> Map String (Set (String, String))
addBatchToTablesMap m (Batch _ items) = foldl' insertItem m items
  where
    insertItem acc (BatchItem t _ w) =
      Map.insertWith Set.union t (Set.singleton $ makeItemKey t w) acc

-- | make an item key, that is a (table name, where condition) pair, from the table name and
-- the where condition as found in a 'BatchItem'
--
-- A missing condition becomes @"(-)"@; a present one is wrapped in parentheses.
makeItemKey :: String -> Maybe String -> (String, String)
makeItemKey t w = (t, maybe "(-)" (\ w' -> "(" ++ w' ++ ")") w)

-- | it sequentially traverses the log records and generates the sql correction script and
-- the correction plan
--
-- When the stream ends cleanly, the items still marked as skipped are added to
-- the generated items. When it ends with a parse error, the skipped items are
-- left out and a warning is logged instead.
processLogRecords :: (MonadIO m, MonadFail m) => Records LogRecord -> StateT CorrectionState m ()
processLogRecords (Cons r rs) = do
  -- Force the counter on every step: a lazy @n + 1@ would otherwise grow into
  -- a chain of thunks as long as the log.
  modify' (\ s -> let n = correctionState_Rec s + 1
                  in n `seq` s { correctionState_Rec = n })
  processLogRecord r
  processLogRecords rs
processLogRecords (Nil err rest) = do
  n <- gets correctionState_Rec
  -- first 50 bytes of the unconsumed input, shown for diagnostics
  let restStr = map (toEnum . fromIntegral) $ B.unpack $ B.take 50 rest
  case err of
    Nothing -> do
      liftIO $ log $ fromString $
        "All log records have been processed at record " ++ (show n) ++ ", at: '" ++ restStr ++ "'"
      modify' (\ s -> s { correctionState_GeneratedItems =
                            correctionState_GeneratedItems s `Set.union` correctionState_SkippedItems s })
    Just msg -> do
      liftIO $ log $ fromString $
        "Parsing log records failed with error: " ++ msg ++ " when reading record " ++ (show $ n + 1) ++ " at: '" ++ restStr ++ "'"
      liftIO $ log $ fromString
        "Warning: because of the error parsing the tranfer log, no skipped items are included in the generated corrections plan."

-- | process a single log record; it checks if the record is duplicated and, if it is, it
-- generates the sql script and the plan to correct the duplicate processing of the record
--
-- A record seen for the first time is remembered and removed from the skipped
-- items; a warning is logged if it was not among them. A record that failed
-- to decode is only logged.
processLogRecord :: (MonadIO m) => Either String LogRecord -> StateT CorrectionState m ()
processLogRecord (Left msg) = do
  n <- gets correctionState_Rec
  liftIO $ log $ fromString $ "Error processing log record " ++ (show n) ++ ": " ++ msg
processLogRecord (Right r@(LogRecord _ t _ _ _ w)) = do
  seen <- gets correctionState_UniqueRecords
  if Set.member key seen
    then processDuplicatedRecord r
    else do
      modify' (\ s -> s { correctionState_UniqueRecords = Set.insert key seen })
      skipped <- gets correctionState_SkippedItems
      if Set.member key skipped
        then modify' (\ s -> s { correctionState_SkippedItems = Set.delete key skipped })
        else do
          n <- gets correctionState_Rec
          liftIO $ log $ fromString $
            "WARN at record " ++ (show n) ++ ": item not duplicated but not found in the skipped items (" ++ (show r) ++ ")"
  where
    key = (t, w)

-- | processes a duplicated record; generates the sql script to truncate the table
-- containing duplicated records and generates the items to reimport the table, based
-- on the original plan; updates the 'CorrectionState' structure
--
-- The @TRUNCATE@ statement is written only when the original plan has items to
-- reload the table. Otherwise the error is logged and nothing is truncated, so
-- the script never empties a table the new plan cannot refill.
--
-- NOTE(review): the table name used here is the second field of 'LogRecord'
-- ('log_SourceTable'), while the duplicated rows live in the destination
-- table. Confirm that source and destination table names always coincide.
processDuplicatedRecord :: (MonadIO m) => LogRecord -> StateT CorrectionState m ()
processDuplicatedRecord r@(LogRecord _ t _ _ _ _) = do
  liftIO $ log $ fromString $ "Item <" ++ (show r) ++ "> is duplicated"
  truncated  <- gets correctionState_Truncated
  reloadable <- gets (Map.member t . correctionState_TableItems)
  if Set.member t truncated
    then liftIO $ log $ fromString $
           "Table " ++ t ++ " already reloaded in the new plan by a previous duplicated item. Skip the table this time."
    else if reloadable
      then do
        sqlh <- gets (correction_ScriptHandle . correctionState_Config)
        liftIO $ hPutStrLn sqlh $ "TRUNCATE TABLE " ++ t ++ ";"
        generateItemsForDuplicatedRecord r
        modify' (\ s -> s { correctionState_Truncated = Set.insert t truncated })
      else
        -- No reload items exist: this call only logs the error.
        generateItemsForDuplicatedRecord r

-- | updates the 'CorrectionState' state by generating new plan items to reload
-- data for the table where duplicated records were found
--
-- If the table has no entry in 'correctionState_TableItems', an error is logged
-- and the state is left unchanged.
generateItemsForDuplicatedRecord :: (MonadIO m) => LogRecord -> StateT CorrectionState m ()
generateItemsForDuplicatedRecord (LogRecord _ t _ _ _ _) = do
  tableItems <- gets correctionState_TableItems
  case Map.lookup t tableItems of
    Nothing ->
      liftIO $ log $ fromString $
        "Error: duplicated items found in the log for table " ++ t ++ " but no items for this table could be found in the original plan. This seems to be a bug. Please report it!"
    Just items ->
      modify' (\ s -> s { correctionState_GeneratedItems =
                            Set.union (correctionState_GeneratedItems s) items })

-- | a transfer log record; a transfer log record is generated for each transferred item
-- in a batch, after the transfer succeeded; analyzing the log we can spot the errors, that is
-- items that have not been transferred successfully or items that have been transferred twice
-- because the plan was run multiple times
data LogRecord = LogRecord
  { log_Batch       :: !String
  , log_SourceTable :: !String
  , log_DestTable   :: !String
  , log_Count       :: !Int
  , log_Size        :: !Int
  , log_Where       :: !String
  } deriving (Show, Generic)

instance FromRecord LogRecord