module HSBencher.Backend.Fusion
(
defaultFusionPlugin
, getTableId, findTableId, makeTable, ensureColumns
, FusionConfig(..), stdRetry
, fusionSchema, resultToTuple
, PreppedTuple, Schema
, authenticate, prepBenchResult, uploadRows
, uploadBenchResult
, FusionPlug(), FusionCmdLnFlag(..),
)
where
import Control.Monad.Reader
import Control.Concurrent (threadDelay)
import qualified Control.Exception as E
import Data.Maybe (fromJust, fromMaybe)
import Data.Dynamic
import Data.Default (Default(..))
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.List as L
import qualified Data.ByteString.Char8 as B
import Data.Time.Clock
import Data.Time.Format ()
import Network.Google.OAuth2 (getCachedTokens, refreshTokens, OAuth2Client(..), OAuth2Tokens(..))
import Network.Google.FusionTables (createTable, createColumn, listTables, listColumns,
bulkImportRows,
TableId, CellType(..), TableMetadata(..), ColumnMetadata(..))
import Network.HTTP.Conduit (HttpException)
import HSBencher.Types
import HSBencher.Internal.Logging (log)
import HSBencher.Internal.Fusion
import Prelude hiding (log)
import System.Console.GetOpt (OptDescr(Option), ArgDescr(..))
defaultFusionPlugin :: FusionPlug
defaultFusionPlugin = FusionPlug
instance Default FusionPlug where
def = defaultFusionPlugin
stdRetry :: String -> OAuth2Client -> OAuth2Tokens -> IO a ->
BenchM (Maybe a)
stdRetry msg client toks action = do
conf <- ask
let retryHook num exn = runReaderT (do
datetime <- lift$ getDateTime
log$ " [fusiontable] Retry #"++show num++" during <"++msg++"> due to HTTPException: " ++ show exn
log$ " [fusiontable] ("++datetime++") Retrying, but first, attempt token refresh..."
stdRetry "refresh tokens" client toks (refreshTokens client toks)
return ()
) conf
liftIO$ retryIORequest action retryHook $
[1,2,4,4,4,4,4,4,8,16]
++ replicate 30 5
getDateTime :: IO String
getDateTime = do
utc <- getCurrentTime
return $ show utc
retryIORequest :: IO a -> (Int -> HttpException -> IO ()) -> [Double] -> IO (Maybe a)
retryIORequest req retryHook times = loop 0 times
where
loop _ [] = return Nothing
loop !num (delay:tl) =
E.catch (fmap Just req) $ \ (exn::HttpException) -> do
retryHook num exn
threadDelay (round$ delay * 1000 * 1000)
loop (num+1) tl
fromJustErr :: String -> Maybe t -> t
fromJustErr msg Nothing = error msg
fromJustErr _ (Just x) = x
getTableId :: OAuth2Client -> String -> BenchM (TableId, [String])
getTableId auth tablename = do
x <- findTableId auth tablename
tid <- case x of
Nothing -> makeTable auth tablename
Just iD -> return iD
order <- ensureColumns auth tid fusionSchema
return (tid, order)
findTableId :: OAuth2Client -> String -> BenchM (Maybe TableId)
findTableId auth tablename = do
log$ " [fusiontable] Fetching access tokens, client ID/secret: "++show (clientId auth, clientSecret auth)
toks <- liftIO$ getCachedTokens auth
log$ " [fusiontable] Retrieved: "++show toks
let atok = B.pack $ accessToken toks
allTables <- fmap (fromJustErr "[fusiontable] getTableId, API call to listTables failed.") $
stdRetry "listTables" auth toks $ listTables atok
log$ " [fusiontable] Retrieved metadata on "++show (length allTables)++" tables"
case filter (\ t -> tab_name t == tablename) allTables of
[] -> do log$ " [fusiontable] No table with name "++show tablename
return Nothing
[t] -> do let tid = (tab_tableId t)
log$ " [fusiontable] Found one table with name "++show tablename ++", ID: "++show tid
return (Just tid)
makeTable :: OAuth2Client -> String -> BenchM TableId
makeTable auth tablename = do
toks <- liftIO$ getCachedTokens auth
let atok = B.pack $ accessToken toks
log$ " [fusiontable] No table with name "++show tablename ++" found, creating..."
Just TableMetadata{tab_tableId} <- stdRetry "createTable" auth toks $
createTable atok tablename fusionSchema
log$ " [fusiontable] Table created with ID "++show tab_tableId
return tab_tableId
ensureColumns :: OAuth2Client -> TableId -> [(String, CellType)] -> BenchM [String]
ensureColumns auth tid ourSchema = do
log$ " [fusiontable] ensureColumns: Ensuring schema: "++show ourSchema
toks <- liftIO$ getCachedTokens auth
log$ " [fusiontable] ensureColumns: Retrieved: "++show toks
let ourColNames = map fst ourSchema
let atok = B.pack $ accessToken toks
let ourSet = S.fromList ourColNames
log$ " [fusiontable] ensureColumns: Checking columns... "
targetColNames <- fmap (map col_name) $ liftIO$ listColumns atok tid
let targetSet = S.fromList targetColNames
missing = S.difference ourSet targetSet
misslist = L.filter (`S.member` missing) ourColNames
extra = S.difference targetSet ourSet
unless (targetColNames == ourColNames) $
log$ "WARNING: HSBencher upload schema (1) did not match server side schema (2):\n (1) "++
show ourSchema ++"\n (2) " ++ show targetColNames
++ "\n HSBencher will try to make do..."
unless (S.null missing) $ do
log$ "WARNING: These fields are missing server-side, creating them: "++show misslist
forM_ misslist $ \ colname -> do
Just ColumnMetadata{col_name, col_columnId} <- stdRetry "createColumn" auth toks $
createColumn atok tid (colname, STRING)
log$ " -> Created column with name,id: "++show (col_name, col_columnId)
unless (S.null extra) $ do
log$ "WARNING: The fusion table has extra fields that HSBencher does not know about: "++
show (S.toList extra)
log$ " Expect null-string entries in these fields! "
return (targetColNames ++ misslist)
uploadBenchResult :: BenchmarkResult -> BenchM ()
uploadBenchResult br = do
(_toks,auth,tid) <- authenticate
let schema = benchmarkResultToSchema br
order <- ensureColumns auth tid schema
let row = prepBenchResult order br
flg <- uploadRows [row]
unless flg $ error "uploadBenchResult: failed to upload rows"
uploadRows :: [PreppedTuple] -> BenchM Bool
uploadRows rows = do
(toks,auth,tid) <- authenticate
let colss = map (map fst) rows
dats = map (map snd) rows
case colss of
[] -> return True
(schema:rst) -> do
unless (all (== schema) rst) $
error ("uploadRows: not all Schemas matched: "++ show (schema, filter (/= schema) rst))
res <- stdRetry "bulkImportRows" auth toks $ bulkImportRows
(B.pack$ accessToken toks) tid schema dats
case res of
Just _ -> do log$ " [fusiontable] Done uploading, run ID "++ (fromJust$ lookup "RUNID" (head rows))
++ " date "++ (fromJust$ lookup "DATETIME" (head rows))
return True
Nothing -> do log$ " [fusiontable] WARNING: Upload failed the maximum number of times. Continuing with benchmarks anyway"
return False
authenticate :: BenchM (OAuth2Tokens, OAuth2Client, TableId)
authenticate = do
conf <- ask
let fusionConfig = getMyConf FusionPlug conf
let FusionConfig{fusionClientID, fusionClientSecret, fusionTableID, serverColumns} = fusionConfig
let (Just cid, Just sec) = (fusionClientID, fusionClientSecret)
auth = OAuth2Client { clientId = cid, clientSecret = sec }
toks <- liftIO$ getCachedTokens auth
let atok = B.pack $ accessToken toks
let tid = fromJust fusionTableID
return (toks,auth,tid)
type PreppedTuple = [(String,String)]
type Schema = [String]
prepBenchResult :: Schema -> BenchmarkResult -> PreppedTuple
prepBenchResult serverColumns br@BenchmarkResult{..} =
let
ourData = M.fromList $ resultToTuple br
ourCols = M.keysSet ourData
targetSet = S.fromList serverColumns
missing = S.difference ourCols targetSet
tuple = [ (key, fromMaybe "" (M.lookup key ourData))
| key <- serverColumns ]
in if S.null missing
then tuple
else error $ "prepBenchResult: benchmark result contained columns absent on server: "++show missing
fusionSchema :: [(String, CellType)]
fusionSchema =
[ ("PROGNAME",STRING)
, ("VARIANT",STRING)
, ("ARGS",STRING)
, ("HOSTNAME",STRING)
, ("MINTIME", NUMBER)
, ("MEDIANTIME", NUMBER)
, ("MAXTIME", NUMBER)
, ("THREADS",NUMBER)
, ("RETRIES",NUMBER)
, ("RUNID",STRING)
, ("CI_BUILD_ID",STRING)
, ("DATETIME",DATETIME)
, ("MINTIME_PRODUCTIVITY", NUMBER)
, ("MEDIANTIME_PRODUCTIVITY", NUMBER)
, ("MAXTIME_PRODUCTIVITY", NUMBER)
, ("ALLTIMES", STRING)
, ("TRIALS", NUMBER)
, ("COMPILER",STRING)
, ("COMPILE_FLAGS",STRING)
, ("RUNTIME_FLAGS",STRING)
, ("ENV_VARS",STRING)
, ("BENCH_VERSION", STRING)
, ("BENCH_FILE", STRING)
, ("UNAME",STRING)
, ("PROCESSOR",STRING)
, ("TOPOLOGY",STRING)
, ("GIT_BRANCH",STRING)
, ("GIT_HASH",STRING)
, ("GIT_DEPTH",NUMBER)
, ("WHO",STRING)
, ("ETC_ISSUE",STRING)
, ("LSPCI",STRING)
, ("FULL_LOG",STRING)
, ("MEDIANTIME_ALLOCRATE", STRING)
, ("MEDIANTIME_MEMFOOTPRINT", STRING)
, ("ALLJITTIMES", STRING)
]
benchmarkResultToSchema :: BenchmarkResult -> [(String, CellType)]
benchmarkResultToSchema bm = fusionSchema ++ map custom (_CUSTOM bm)
where
custom (tag, IntResult _) = (tag,NUMBER)
custom (tag, DoubleResult _) = (tag,NUMBER)
custom (tag, StringResult _) = (tag, STRING)
data FusionPlug = FusionPlug
deriving (Eq,Show,Ord,Read)
instance Plugin FusionPlug where
type PlugConf FusionPlug = FusionConfig
type PlugFlag FusionPlug = FusionCmdLnFlag
plugName _ = "fusion"
plugCmdOpts _ = fusion_cli_options
plugUploadRow _ cfg row = runReaderT (uploadBenchResult row) cfg
plugInitialize p gconf = do
putStrLn " [fusiontable] Fusion table plugin initializing.. First, find config."
gc2 <- let fc@FusionConfig{fusionClientID, fusionClientSecret, fusionTableID} =
getMyConf p gconf in
case (benchsetName gconf, fusionTableID) of
(Nothing,Nothing) -> error "No way to find which fusion table to use! No name given and no explicit table ID."
(_, Just _tid) -> return gconf
(Just name,_) -> do
case (fusionClientID, fusionClientSecret) of
(Just cid, Just sec ) -> do
let auth = OAuth2Client { clientId=cid, clientSecret=sec }
(tid,cols) <- runReaderT (getTableId auth name) gconf
putStrLn$ " [fusiontable] -> Resolved name "++show name++" to table ID " ++show tid
return $! setMyConf p fc{ fusionTableID= Just tid, serverColumns= cols } gconf
(_,_) -> error "When --fusion-upload is activated --clientid and --clientsecret are required (or equiv ENV vars)"
let fc2 = getMyConf p gc2
let (Just cid, Just sec) = (fusionClientID fc2, fusionClientSecret fc2)
authclient = OAuth2Client { clientId = cid, clientSecret = sec }
putStrLn " [fusiontable] Second, lets retrieved cached auth tokens on the file system..."
_toks <- getCachedTokens authclient
return gc2
foldFlags _p flgs cnf0 =
foldr ($) cnf0 (map doFlag flgs)
where
doFlag FusionTest r = r
doFlag (ClientID cid) r = r { fusionClientID = Just cid }
doFlag (ClientSecret s) r = r { fusionClientSecret = Just s }
doFlag (FusionTables m) r =
case m of
Just tid -> r { fusionTableID = Just tid }
Nothing -> r
fusion_cli_options :: (String, [OptDescr FusionCmdLnFlag])
fusion_cli_options =
("Fusion Table Options:",
[ Option [] ["fusion-upload"] (OptArg FusionTables "TABLEID")
"enable fusion table upload. Optionally set TABLEID; otherwise create/discover it."
, Option [] ["clientid"] (ReqArg ClientID "ID")
("Use (and cache auth tokens for) Google client ID\n"++
"Alternatively set by env var HSBENCHER_GOOGLE_CLIENTID")
, Option [] ["clientsecret"] (ReqArg ClientSecret "STR")
("Use Google client secret\n"++
"Alternatively set by env var HSBENCHER_GOOGLE_CLIENTSECRET")
, Option [] ["fusion-test"] (NoArg FusionTest) "Test authentication and list tables if possible."
])
data FusionCmdLnFlag =
FusionTables (Maybe TableId)
| ClientID String
| ClientSecret String
| FusionTest
deriving (Show,Read,Ord,Eq, Typeable)