{- | This module contains the @@run@@ function that can be used to launch the website. It also contains the router and the background runner running the tasks. -} {-# LANGUAGE OverloadedStrings, DeriveAnyClass, TemplateHaskell #-} module Web.JobsUi.Run (run) where import Web.JobsUi.Actions import Web.JobsUi.Internal.Types import Data.Time import Data.Maybe import Data.Foldable import Control.Monad import Control.Monad.Trans import Web.Spock import Web.Spock.Config import Control.Concurrent.STM import qualified Data.Sequence as Seq import qualified Data.Text as T import qualified Data.ByteString as BS import qualified Data.Map as M import Control.Exception import Control.Concurrent import Network.HTTP.Types.Status import Data.FileEmbed import System.IO.Unsafe import Control.DeepSeq -- | This function launches the website. -- It get's a Map of named job types which are used -- to define different kinds of jobs a user can select to run. run :: [JobType] -- ^ The jiType should be unique -> IO () run jobtypeList = do let jobtypes = M.fromList $ map (\job@JobType{..} -> (jiType getJobInfo, job)) jobtypeList defaultJobs = noJobs initial <- ServerState <$> newTVarIO defaultJobs <*> newTVarIO (length defaultJobs + 1) spockCfg <- defaultSpockCfg () PCNoDatabase initial _ <- forkIO $ runner initial runSpock 1337 (spock spockCfg{spc_errorHandler=myError} $ app jobtypes) ------------ -- Router -- ------------ -- | We use file-embed to embed the css and other static files into the executable, -- so we can easily ship an executable only. myStaticDir :: M.Map T.Text BS.ByteString myStaticDir = M.mapKeys T.pack $ M.fromList $ $(embedDir "static") -- | Defines the app an routing. Is called from @@run@@. app :: M.Map T.Text JobType -> SpockM () () ServerState () app jobtypes = do get root $ showHistory get ("job" var) $ \i -> showJob $ JobId i -- will either move a waiting job to the done list or -- throw an exeception to the executing thread if the job is currently running, -- to be taken care in the @@runner@@ when cancelled. -- Has no effect on done jobs. get ("job" var "cancel") $ \(JobId -> i) -> do jobsvar <- myjobsVar <$> getState toCancel <- liftIO $ atomically $ do jobs@Jobs{..} <- readTVar jobsvar if fmap jobId running == Just i then do pure $ jobThread =<< running else do case find ((==) i . jobId) $ toList waiting of Nothing -> pure Nothing Just job -> do writeTVar jobsvar $ jobs { waiting = flip Seq.deleteAt waiting (fromJust $ Seq.findIndexL ((==) i . jobId) waiting) , done = job : done } pure Nothing liftIO $ maybe (pure ()) (flip throwTo Job_Cancelled) toCancel redirect "/" getpost ("job" "create") $ do jobsMenu (M.keysSet jobtypes) getpost ("job" "create" var) $ \jobtype -> case M.lookup jobtype jobtypes of Nothing -> do setStatus status404 myError $ Status 404 "No such job type available." Just JobType{ .. } -> createJob getJobInfo -- lookup static things in our embeded @@myStaticDir@@ get wildcard $ \route -> do case M.lookup route myStaticDir of Just fileContent -> bytes fileContent Nothing -> do setStatus status404 myError $ Status 404 "could not find route to url" -- | May be thrown from @@app@@ to forked @@runner@@ thread executing a job data MyException = Job_Cancelled deriving (Show, Exception) -- | Handles running tasks. The only function that moves tasks to running status and from it runner :: ServerState -> IO () runner ServerState{myjobsVar} = forever $ do -- move a running job to the done list (if there is one) endTime <- getZonedTime endedJob <- atomically $ do jobs <- readTVar myjobsVar writeTVar myjobsVar $ case running jobs of Nothing -> jobs Just job -> jobs { running = Nothing , done = job{jobTimeEnded = Just endTime} : done jobs } pure $ running jobs -- notify result forM_ endedJob $ \Job{..} -> case jobFinished of Nothing -> pure () Just result -> void $ forkIO $ jiNotify jobInfo jobPayload result -- move a waiting job to the running job Job{..} <- atomically $ do jobs <- readTVar myjobsVar case Seq.viewr $ waiting jobs of Seq.EmptyR -> retry rest Seq.:> job -> do startTime <- pure $! unsafePerformIO getZonedTime writeTVar myjobsVar $ jobs { waiting = rest , running = Just $ job { jobTimeStarted = Just startTime } } pure job -- execute the new running job mvar <- newEmptyMVar tid <- forkOS $ do catch ( flip finally (putMVar mvar ()) $ do result <- jiExec jobInfo jobPayload -- success deepseq result $ atomically $ modifyTVar myjobsVar $ \jobs -> jobs { running = fmap ( \job -> job { jobFinished = Just $ Success result , jobThread = Nothing } ) (running jobs) } ) -- failure cleanup ( \e -> do atomically $ modifyTVar myjobsVar $ \jobs -> jobs { running = fmap (\job -> job { jobFinished = Just (Error $ T.pack $ displayException (e :: SomeException)) , jobThread = Nothing } ) (running jobs) } ) -- we may use the job thread to cancel a job atomically $ modifyTVar myjobsVar $ \jobs -> jobs { running = fmap (\job -> job { jobThread = Just tid } ) (running jobs) } takeMVar mvar putStrLn $ "Done: #" <> show (getJobId jobId)