{-# LANGUAGE BlockArguments #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE ScopedTypeVariables #-} module Hercules.Agent.Worker ( main, ) where import Conduit import Control.Concurrent.STM hiding (check) import Control.Exception.Lifted qualified as EL import Control.Monad.Except import Control.Monad.IO.Unlift import Data.Conduit qualified import Data.Conduit.Extras (sinkChan, sinkChanTerminate, sourceChan) import Data.Conduit.Katip.Orphans () import Data.Conduit.Serialization.Binary ( conduitDecode, conduitEncode, ) import Data.IORef import Data.Map qualified as M import Data.Vector (Vector) import Data.Vector qualified as V import Hercules.API.Agent.LifeCycle.ServiceInfo qualified import Hercules.API.Logs.LogEntry (LogEntry) import Hercules.API.Logs.LogEntry qualified as LogEntry import Hercules.API.Logs.LogHello (LogHello (LogHello, clientProtocolVersion, storeProtocolVersion)) import Hercules.API.Logs.LogMessage (LogMessage) import Hercules.API.Logs.LogMessage qualified as LogMessage import Hercules.Agent.Sensitive import Hercules.Agent.Socket qualified as Socket import Hercules.Agent.Worker.Build (runBuild) import Hercules.Agent.Worker.Build.Logger qualified as Logger import Hercules.Agent.Worker.Conduit (takeCWhileStopEarly, withMessageLimit) import Hercules.Agent.Worker.Effect (runEffect) import Hercules.Agent.Worker.Env (HerculesState (..)) import Hercules.Agent.Worker.Error (ExceptionText (exceptionTextMessage), exceptionTextMessage, renderException) import Hercules.Agent.Worker.Evaluate (runEval) import Hercules.Agent.Worker.HerculesStore (setBuilderCallback, withHerculesStore) import Hercules.Agent.Worker.Logging (withKatip) import Hercules.Agent.WorkerProtocol.Command ( Command, ) import Hercules.Agent.WorkerProtocol.Command qualified as Command import Hercules.Agent.WorkerProtocol.Command.Build qualified as Build import Hercules.Agent.WorkerProtocol.Command.BuildResult qualified as BuildResult import Hercules.Agent.WorkerProtocol.Command.Effect qualified as Effect import Hercules.Agent.WorkerProtocol.Command.Eval ( Eval, ) import Hercules.Agent.WorkerProtocol.Command.Eval qualified as Eval import Hercules.Agent.WorkerProtocol.Event ( Event (Exception), ) import Hercules.Agent.WorkerProtocol.Event qualified as Event import Hercules.Agent.WorkerProtocol.LogSettings qualified as LogSettings import Hercules.CNix as CNix import Hercules.CNix.Expr (init, setExtraStackOverflowHandlerToSleep) import Hercules.CNix.Util (installDefaultSigINTHandler) import Hercules.CNix.Verbosity (setShowTrace) import Hercules.Error import Katip import Network.URI qualified import Protolude hiding (bracket, catch, check, evalState, wait, withAsync, yield) import System.Environment qualified as Environment import System.IO (BufferMode (LineBuffering), hSetBuffering) import System.Posix.Signals (Handler (Catch), installHandler, raiseSignal, sigINT, sigTERM) import System.Timeout (timeout) import UnliftIO.Async (wait, withAsync) import Prelude () import Prelude qualified main :: IO () main = do hSetBuffering stderr LineBuffering Hercules.CNix.Expr.init setShowTrace True _ <- installHandler sigTERM (Catch $ raiseSignal sigINT) Nothing installDefaultSigINTHandler Logger.initLogger setExtraStackOverflowHandlerToSleep args <- Environment.getArgs case args of [options] -> taskWorker options _ -> throwIO $ FatalError "worker: Unrecognized command line arguments" -- TODO Make this part of the worker protocol instead parseOptions :: (Read a, Read b, IsString a, IsString b) => Prelude.String -> [(a, b)] parseOptions options = Prelude.read options ++ [ -- narinfo-cache-negative-ttl: Always try requesting narinfos because it may have been built in the meanwhile ("narinfo-cache-negative-ttl", "0"), -- Build concurrency is controlled by hercules-ci-agent, so set it -- to 1 to avoid accidentally consuming too many resources at once. ("max-jobs", "1") ] setOptions :: [Char] -> IO () setOptions options = do for_ (parseOptions options) $ \(k, v) -> do setGlobalOption k v setOption k v taskWorker :: [Char] -> IO () taskWorker options = do setOptions options drvsCompleted_ <- newTVarIO mempty drvBuildAsyncs_ <- newTVarIO mempty drvRebuildAsyncs_ <- newTVarIO mempty drvOutputSubstituteAsyncs_ <- newTVarIO mempty drvsInProgress_ <- newIORef mempty withStore $ \wrappedStore_ -> withHerculesStore wrappedStore_ $ \herculesStore_ -> withKatip $ do liftIO $ setBuilderCallback herculesStore_ mempty ch <- liftIO newChan let st = HerculesState { drvsCompleted = drvsCompleted_, drvBuildAsyncs = drvBuildAsyncs_, drvRebuildAsyncs = drvRebuildAsyncs_, drvOutputSubstituteAsyncs = drvOutputSubstituteAsyncs_, drvsInProgress = drvsInProgress_, herculesStore = herculesStore_, wrappedStore = wrappedStore_, shortcutChannel = ch, extraNixOptions = parseOptions options } let runner :: KatipContextT IO () runner = ( ( do command <- runConduitRes -- Res shouldn't be necessary ( transPipe liftIO (sourceHandle stdin) .| conduitDecode .| printCommands .| await ) >>= \case Just c -> pure c Nothing -> panic "Not a valid starting command" runCommand st ch command ) `safeLiftedCatch` ( \e -> liftIO $ do textual <- renderException e writeChan ch (Just $ Exception $ exceptionTextMessage textual) exitFailure ) ) `EL.finally` ( do liftIO $ writeChan ch Nothing logLocM DebugS "runner done" ) writer = runConduitRes ( sourceChan ch .| conduitEncode .| concatMapC (\x -> [Chunk x, Flush]) .| transPipe liftIO (sinkHandleFlush stdout) ) void $ withAsync runner $ \runnerAsync -> do writer -- runner can stop writer only by passing Nothing in channel (finally) logLocM DebugS "Writer done" wait runnerAsync -- include the potential exception printCommands :: KatipContext m => ConduitT Command Command m () printCommands = mapMC ( \x -> do katipAddContext (sl "command" (show x :: Text)) $ logLocM DebugS "Received command" pure x ) connectCommand :: (MonadUnliftIO m, KatipContext m, MonadThrow m) => Chan (Maybe Event) -> ConduitM Command Event (ResourceT m) () -> m () connectCommand ch conduit = runConduitRes ( sourceHandle stdin .| conduitDecode .| printCommands .| conduit .| sinkChan ch ) runCommand :: (MonadUnliftIO m, KatipContext m, MonadThrow m) => HerculesState -> Chan (Maybe Event) -> Command -> m () -- runCommand' :: HerculesState -> Command -> ConduitM Command Event (ResourceT IO) () runCommand herculesState ch command = do -- TODO don't do this mainThread <- liftIO myThreadId UnliftIO unlift <- askUnliftIO protocolVersion <- liftIO do getStoreProtocolVersion (wrappedStore herculesState) case command of Command.Eval eval -> Logger.withLoggerConduit (logger (Eval.logSettings eval) protocolVersion) $ Logger.withTappedStderr Logger.tapper $ connectCommand ch $ do liftIO $ restrictEval eval void $ liftIO $ flip forkFinally (escalateAs \e -> FatalError $ "Failed to fork: " <> show e) $ unlift $ runConduitRes ( Data.Conduit.handleC ( \e -> do yield . Event.Error . exceptionTextMessage =<< liftIO (renderException e) liftIO $ throwTo mainThread e ) ( do runEval herculesState eval liftIO $ throwTo mainThread ExitSuccess ) .| sinkChanTerminate (shortcutChannel herculesState) ) awaitForever $ \case Command.BuildResult (BuildResult.BuildResult path attempt result) -> do katipAddContext (sl "path" path <> sl "result" (show result :: Text)) $ logLocM DebugS "Received remote build result" storePath <- liftIO $ CNix.parseStorePath (wrappedStore herculesState) (encodeUtf8 path) liftIO $ atomically $ modifyTVar (drvsCompleted herculesState) (M.insert storePath (attempt, result)) _ -> pass Command.Build build -> katipAddNamespace "Build" $ Logger.withLoggerConduit (logger (Build.logSettings build) protocolVersion) $ Logger.withTappedStderr Logger.tapper $ connectCommand ch $ runBuild (wrappedStore herculesState) build Command.Effect effect -> katipAddNamespace "Effect" $ Logger.withLoggerConduit (logger (Effect.logSettings effect) protocolVersion) $ Logger.withTappedStderr Logger.tapper $ connectCommand ch $ do runEffect (extraNixOptions herculesState) (wrappedStore herculesState) effect >>= \case ExitSuccess -> yield $ Event.EffectResult 0 ExitFailure n -> yield $ Event.EffectResult n _ -> panic "Not a valid starting command" restrictEval :: Eval -> IO () restrictEval eval = do setGlobalOption "restrict-eval" "true" setGlobalOption "allowed-uris" $ if Eval.allowInsecureBuiltinFetchers eval then allSchemes else safeSchemes where safeSchemes = "ssh:// https://" allSchemes = safeSchemes <> " http:// git://" logger :: (MonadUnliftIO m, KatipContext m) => LogSettings.LogSettings -> Int -> ConduitM () (Vector LogEntry) m () -> m () logger logSettings_ storeProtocolVersionValue entriesSource = do socketConfig <- liftIO $ makeSocketConfig logSettings_ storeProtocolVersionValue let withPings socket m = withAsync ( liftIO $ forever do -- TODO add ping constructor to Frame or use websocket pings let ping = LogMessage.LogEntries mempty threadDelay 30_000_000 atomically $ Socket.write socket ping ) (const m) Socket.withReliableSocket socketConfig $ \socket -> withPings socket do let conduit = entriesSource .| Logger.unbatch .| Logger.filterProgress .| dropMiddle .| renumber 0 .| batchAndEnd .| socketSink socket batch = Logger.batch .| mapC (LogMessage.LogEntries . V.fromList) batchAndEnd = (foldMapTap (Last . ims) `fuseUpstream` batch) >>= \case Last (Just (i, ms)) -> yield $ LogMessage.End {i = i + 1, ms = ms} Last Nothing -> yield $ LogMessage.End 0 0 where ims (Chunk logEntry) = Just (LogEntry.i logEntry, LogEntry.ms logEntry) ims _ = Nothing renumber i = await >>= traverse_ \case Flush -> yield Flush >> renumber i Chunk e -> do yield $ Chunk e {LogEntry.i = i} renumber (i + 1) runConduit conduit logLocM DebugS "Syncing" liftIO (timeout 600_000_000 $ Socket.syncIO socket) >>= \case Just _ -> pass Nothing -> panic "Could not push logs within 10 minutes after completion" logLocM DebugS "Logger done" dropMiddle :: MonadIO m => ConduitM (Flush LogEntry) (Flush LogEntry) m () dropMiddle = do -- rich logging _ <- takeCWhileStopEarly isChunk richLogLimit -- degrade to text logging (in case rich logging produces excessive non-textual data) visibleLinesOnly .| withMessageLimit isChunk textOnlyLogLimit tailLimit snipStart snip snipped -- Sum must be < 100_000 richLogLimit, textOnlyLogLimit, tailLimit :: Int richLogLimit = 40_000 textOnlyLogLimit = 49_900 tailLimit = 10_000 snipStart :: Monad m => ConduitT (Flush LogEntry) (Flush LogEntry) m () snipStart = yield $ Chunk $ LogEntry.Msg { i = 0, ms = 0, level = 0 {- error -}, msg = "hercules-ci-agent: Soft log limit has been reached. Final log lines will appear when done." } snipped :: Monad m => Int -> ConduitT (Flush LogEntry) (Flush LogEntry) m () snipped n = yield $ Chunk $ LogEntry.Msg { i = 0, ms = 0, level = 0 {- error -}, msg = "hercules-ci-agent: " <> show n <> " log lines were omitted before the last " <> show tailLimit <> "." } snip :: Monad m => Int -> ConduitT (Flush LogEntry) (Flush LogEntry) m () snip n = yield $ Chunk $ LogEntry.Msg { i = 0, ms = 0, level = 0 {- error -}, msg = "hercules-ci-agent: skipping " <> show n <> " log lines." } visibleLinesOnly :: Monad m => ConduitM (Flush LogEntry) (Flush LogEntry) m () visibleLinesOnly = filterC isVisible isVisible :: Flush LogEntry -> Bool isVisible Flush = True isVisible (Chunk LogEntry.Msg {msg = msg}) | msg /= "" = True isVisible (Chunk LogEntry.Start {text = msg}) | msg /= "" = True isVisible (Chunk LogEntry.Result {rtype = LogEntry.ResultTypeBuildLogLine}) = True isVisible _ = False isChunk :: Flush LogEntry -> Bool isChunk Chunk {} = True isChunk _ = False socketSink :: MonadIO m => Socket.Socket r w -> ConduitT w o m () socketSink socket = awaitForever $ liftIO . atomically . Socket.write socket -- | Perform a foldMap while yielding the original values ("tap"). -- -- '<>' is invoked with the new b on the right. foldMapTap :: (Monoid b, Monad m) => (a -> b) -> ConduitT a a m b foldMapTap f = go mempty where go b = await >>= \case Nothing -> pure b Just a -> do yield a go (b <> f a) makeSocketConfig :: MonadIO m => LogSettings.LogSettings -> Int -> IO (Socket.SocketConfig LogMessage Hercules.API.Agent.LifeCycle.ServiceInfo.ServiceInfo m) makeSocketConfig l storeProtocolVersionValue = do clientProtocolVersionValue <- liftIO getClientProtocolVersion baseURL <- case Network.URI.parseURI $ toS $ LogSettings.baseURL l of Just x -> pure x Nothing -> panic "LogSettings: invalid base url" pure Socket.SocketConfig { makeHello = pure $ LogMessage.Hello LogHello { clientProtocolVersion = clientProtocolVersionValue, storeProtocolVersion = storeProtocolVersionValue }, checkVersion = Socket.checkVersion', baseURL = baseURL, path = LogSettings.path l, token = encodeUtf8 $ reveal $ LogSettings.token l }