-- | Types and functions for running a worker thread that explores a
-- 'Workload' of a tree while servicing the abort, progress-update and
-- workload-steal requests posted to its request queue.
module LogicGrowsOnTrees.Parallel.Common.Worker
    (
    -- * Types
      ProgressUpdate(..)
    , ProgressUpdateFor
    , StolenWorkload(..)
    , StolenWorkloadFor
    , WorkerRequestQueue
    , WorkerRequestQueueFor
    , WorkerEnvironment(..)
    , WorkerEnvironmentFor
    , WorkerTerminationReason(..)
    , WorkerTerminationReasonFor
    , WorkerPushActionFor
    -- * Functions
    , forkWorkerThread
    , sendAbortRequest
    , sendProgressUpdateRequest
    , sendWorkloadStealRequest
    , exploreTreeGeneric
    ) where
import Prelude hiding (catch)
import Control.Arrow ((&&&))
import Control.Concurrent (forkIO,ThreadId,yield)
import Control.Concurrent.MVar (newEmptyMVar,putMVar,takeMVar)
import Control.Exception (AsyncException(ThreadKilled,UserInterrupt),catch,fromException)
import Control.Monad (liftM)
import Control.Monad.IO.Class
import Data.Composition
import Data.Derive.Serialize
import Data.DeriveTH
import Data.Functor ((<$>))
import Data.IORef (atomicModifyIORef,IORef,newIORef,readIORef)
import qualified Data.IVar as IVar
import Data.IVar (IVar)
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>),Monoid(..))
import Data.Sequence ((|>),(><))
import Data.Serialize
import qualified Data.Sequence as Seq
import Data.Void (Void,absurd)
import qualified System.Log.Logger as Logger
import System.Log.Logger (Priority(DEBUG))
import System.Log.Logger.TH
import LogicGrowsOnTrees hiding (exploreTree,exploreTreeT,exploreTreeUntilFirst,exploreTreeTUntilFirst)
import LogicGrowsOnTrees.Checkpoint
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Parallel.Purity
import LogicGrowsOnTrees.Path
import LogicGrowsOnTrees.Workload
-- | A progress report produced by a worker, pairing the reported progress
-- with a 'Workload'.  In this module the workload is always built by
-- 'workloadFromSetting' from the worker's position at the time of the report.
data ProgressUpdate progress = ProgressUpdate
    {   progressUpdateProgress :: progress -- ^ the progress being reported
    ,   progressUpdateRemainingWorkload :: Workload -- ^ the workload that accompanies the reported progress
    } deriving (Eq,Show)
-- Template Haskell splice that derives the Serialize instance.
$( derive makeSerialize ''ProgressUpdate )

-- | 'ProgressUpdate' specialized to the progress type of an exploration mode.
type ProgressUpdateFor exploration_mode = ProgressUpdate (ProgressFor exploration_mode)
-- | The answer to a successful workload-steal request: the workload that was
-- split off, together with a progress update computed for the worker's state
-- after the split.
data StolenWorkload progress = StolenWorkload
    {   stolenWorkloadProgressUpdate :: ProgressUpdate progress -- ^ the progress update computed with the post-steal cursor and context
    ,   stolenWorkload :: Workload -- ^ the workload handed over to the requester
    } deriving (Eq,Show)
-- Template Haskell splice that derives the Serialize instance.
$( derive makeSerialize ''StolenWorkload )

-- | 'StolenWorkload' specialized to the progress type of an exploration mode.
type StolenWorkloadFor exploration_mode = StolenWorkload (ProgressFor exploration_mode)
-- | A request posted to a running worker.  The two request kinds that expect
-- an answer carry the callback the worker invokes with that answer.
data WorkerRequest progress =
    -- | ask the worker to stop; it terminates with 'WorkerAborted'
    AbortRequested
    -- | ask for a progress update, delivered through the given callback
  | ProgressUpdateRequested (ProgressUpdate progress → IO ())
    -- | ask for part of the workload; the callback receives 'Nothing' when
    -- nothing could be split off
  | WorkloadStealRequested (Maybe (StolenWorkload progress) → IO ())

-- | The queue of pending requests for a worker.  'sendRequest' prepends new
-- requests, so the list is in newest-first order; the worker reverses it when
-- it drains the queue.
type WorkerRequestQueue progress = IORef [WorkerRequest progress]

-- | 'WorkerRequestQueue' specialized to the progress type of an exploration mode.
type WorkerRequestQueueFor exploration_mode = WorkerRequestQueue (ProgressFor exploration_mode)
-- | The handle to a forked worker, as returned by 'forkWorkerThread'.
data WorkerEnvironment progress = WorkerEnvironment
    {   workerInitialPath :: Path -- ^ the initial path of the workload the worker was given
    ,   workerThreadId :: ThreadId -- ^ the id of the forked worker thread
    ,   workerPendingRequests :: WorkerRequestQueue progress -- ^ the queue through which requests are sent to the worker
    ,   workerTerminationFlag :: IVar () -- ^ written once the worker thread has terminated, just before the finished callback runs
    }

-- | 'WorkerEnvironment' specialized to the progress type of an exploration mode.
type WorkerEnvironmentFor exploration_mode = WorkerEnvironment (ProgressFor exploration_mode)
-- | The reason why a worker stopped running.
data WorkerTerminationReason worker_final_progress
    = WorkerFinished worker_final_progress
      -- ^ the worker ran to completion and produced this final progress
    | WorkerFailed String
      -- ^ the worker failed; the payload is a message describing the failure
    | WorkerAborted
      -- ^ the worker was aborted before it finished
    deriving (Eq,Show)

-- | Transforms the final progress carried by 'WorkerFinished'; the other two
-- reasons carry no progress and are passed through unchanged.
instance Functor WorkerTerminationReason where
    fmap transform reason =
        case reason of
            WorkerFinished final_progress -> WorkerFinished (transform final_progress)
            WorkerFailed message -> WorkerFailed message
            WorkerAborted -> WorkerAborted
-- | 'WorkerTerminationReason' specialized to the final progress type that a
-- worker produces in the given exploration mode.
type WorkerTerminationReasonFor exploration_mode = WorkerTerminationReason (WorkerFinishedProgressFor exploration_mode)
-- | The type of the action a worker uses to push a result out while it is
-- still running.  Only 'FoundModeUsingPush' has a real action, which receives
-- a 'ProgressUpdate' in 'IO'.  Every other mode uses @Void → ()@, for which
-- 'exploreTreeGeneric' supplies 'absurd'.
type family WorkerPushActionFor exploration_mode :: *
-- NOTE(review): kept as separate instances rather than a closed family;
-- presumably for compatibility with the GHC versions this file targets.
type instance WorkerPushActionFor (AllMode result) = Void → ()
type instance WorkerPushActionFor (FirstMode result) = Void → ()
type instance WorkerPushActionFor (FoundModeUsingPull result) = Void → ()
type instance WorkerPushActionFor (FoundModeUsingPush result) = ProgressUpdate (Progress result) → IO ()
-- The operations the worker loop needs from a tree, packaged per purity so
-- that 'forkWorkerThread' can be written once for both pure and impure trees.
-- Here m is the monad of the tree and n is the monad the worker loop runs in;
-- the constructor also carries the MonadIO instance for n.
data TreeFunctionsForPurity (m :: * → *) (n :: * → *) (α :: *) =
    MonadIO n ⇒ TreeFunctionsForPurity
    {   walk :: Path → TreeT m α → n (TreeT m α) -- send the tree down a path
    ,   step :: ExplorationTState m α → n (Maybe α,Maybe (ExplorationTState m α)) -- take one exploration step: an optional solution and, unless finished, the next state
    ,   run :: (∀ β. n β → IO β) -- run an action of the worker-loop monad in IO
    }
-- Template Haskell splice from System.Log.Logger.TH, invoked for the
-- qualified module alias "Logger" at DEBUG priority.
-- NOTE(review): presumably this is what brings the debugM used below into
-- scope — confirm against the System.Log.Logger.TH documentation.
deriveLoggers "Logger" [DEBUG]
{-| Forks a thread that explores the given workload of the given tree and
    returns immediately with the 'WorkerEnvironment' used to talk to it.

    The worker alternates between three phases: checking its request queue
    (@loop1@), answering every pending request (@loop2@), and taking one step
    through the tree (@loop3@).  When the thread stops, whether because it
    finished, was aborted, or threw an exception, the termination flag in the
    environment is written and then the finished callback is called with the
    'WorkerTerminationReason'.
 -}
forkWorkerThread ::
    ExplorationMode exploration_mode {-^ the mode in which to explore the tree -} →
    Purity m n {-^ the purity of the tree -} →
    (WorkerTerminationReasonFor exploration_mode → IO ()) {-^ the action called (on the worker thread) when the worker terminates -} →
    TreeT m (ResultFor exploration_mode) {-^ the tree -} →
    Workload {-^ the workload for the worker -} →
    WorkerPushActionFor exploration_mode
        {-^ the action used to push a found result out of the worker; it is
            only called when the exploration mode is 'FoundModeUsingPush'
         -}
        →
    IO (WorkerEnvironmentFor exploration_mode) {-^ the environment for the worker -}
forkWorkerThread
    exploration_mode
    purity
    finishedCallback
    tree
    (Workload initial_path initial_checkpoint)
    push
  = do
    -- NOTE(review): the record is unpacked through a monadic bind on a trivial
    -- case expression rather than with a plain let; presumably this is needed
    -- because the constructor carries a class context.  Confirm before
    -- simplifying this line.
    TreeFunctionsForPurity{..} ← case getTreeFunctionsForPurity purity of x → return x
    pending_requests_ref ← newIORef []
    let
        -- PHASE 1:  Check whether any requests are pending.  If so, atomically
        --           take the whole queue (restoring arrival order, since new
        --           requests are prepended) and go to phase 2; otherwise go
        --           straight to phase 3.
        loop1 (!result) cursor exploration_state =
            liftIO (readIORef pending_requests_ref) >>= \pending_requests →
            case pending_requests of
                [] → loop3 result cursor exploration_state
                _ → debugM "Worker thread's request queue is non-empty."
                    >> (liftM reverse . liftIO $ atomicModifyIORef pending_requests_ref (const [] &&& id))
                    >>= loop2 result cursor exploration_state

        -- PHASE 2:  Answer the drained requests one at a time.
        loop2 result cursor exploration_state@(ExplorationTState context checkpoint tree) requests =
          case requests of
            -- All requests have been answered:  yield and then take a step.
            [] → liftIO yield >> loop3 result cursor exploration_state
            -- Abort:  stop immediately; any requests after it are dropped.
            AbortRequested:_ → do
                debugM "Worker thread received abort request."
                return WorkerAborted
            -- Progress update:  the accumulated result is handed to the
            -- requester, so the loop continues from the initial intermediate
            -- value.
            ProgressUpdateRequested submitProgress:rest_requests → do
                debugM "Worker thread received progress update request."
                liftIO . submitProgress $ computeProgressUpdate exploration_mode result initial_path cursor context checkpoint
                loop2 initial_intermediate_value cursor exploration_state rest_requests
            -- Workload steal:  try to split off part of the remaining work.
            WorkloadStealRequested submitMaybeWorkload:rest_requests → do
                debugM "Worker thread received workload steal."
                case tryStealWorkload initial_path cursor context of
                    -- Nothing to steal:  say so and keep the result as it is.
                    Nothing → do
                        liftIO $ submitMaybeWorkload Nothing
                        loop2 result cursor exploration_state rest_requests
                    -- Something was stolen:  the answer includes a progress
                    -- update carrying the accumulated result, so (as above)
                    -- the loop continues from the initial intermediate value,
                    -- now with the new cursor and context.
                    Just (new_cursor,new_context,workload) → do
                        liftIO . submitMaybeWorkload . Just $
                            StolenWorkload
                                (computeProgressUpdate exploration_mode result initial_path new_cursor new_context checkpoint)
                                workload
                        loop2 initial_intermediate_value new_cursor (ExplorationTState new_context checkpoint tree) rest_requests

        -- PHASE 3:  Take one step through the tree and then, unless the worker
        --           is done, return to phase 1.
        loop3 result cursor exploration_state
          = do
            (maybe_solution,maybe_new_exploration_state) ← step exploration_state
            case maybe_new_exploration_state of
                -- There is no next state:  the workload is fully explored.
                Nothing →
                    let explored_checkpoint =
                            checkpointFromInitialPath initial_path
                            .
                            checkpointFromCursor cursor
                            $
                            Explored
                    in return . WorkerFinished $
                        -- NOTE(review): the branches look alike but each is
                        -- typed through its own match on the exploration
                        -- mode; merging them is likely to break type checking.
                        case exploration_mode of
                            AllMode →
                                Progress
                                    explored_checkpoint
                                    (maybe result (result <>) maybe_solution)
                            FirstMode →
                                Progress
                                    explored_checkpoint
                                    maybe_solution
                            FoundModeUsingPull _ →
                                Progress
                                    explored_checkpoint
                                    (maybe result (result <>) maybe_solution)
                            FoundModeUsingPush _ →
                                Progress
                                    explored_checkpoint
                                    (fromMaybe mempty maybe_solution)
                -- There is more to explore.
                Just new_exploration_state@(ExplorationTState context checkpoint _) →
                    let new_checkpoint = checkpointFromSetting initial_path cursor context checkpoint
                    in case maybe_solution of
                        Nothing → loop1 result cursor new_exploration_state
                        Just (!solution) →
                            case exploration_mode of
                                -- accumulate and keep going
                                AllMode → loop1 (result <> solution) cursor new_exploration_state
                                -- the first solution ends the exploration
                                FirstMode → return . WorkerFinished $ Progress new_checkpoint (Just solution)
                                -- accumulate, and stop once the condition holds
                                FoundModeUsingPull f →
                                    let new_result = result <> solution
                                    in if f new_result
                                        then return . WorkerFinished $ Progress new_checkpoint new_result
                                        else loop1 new_result cursor new_exploration_state
                                -- push each solution out immediately; nothing
                                -- is accumulated locally
                                FoundModeUsingPush _ → do
                                    liftIO . push $
                                        ProgressUpdate
                                            (Progress new_checkpoint solution)
                                            (workloadFromSetting initial_path cursor context checkpoint)
                                    loop1 () cursor new_exploration_state

    finished_flag ← IVar.new
    thread_id ← forkIO $ do
        termination_reason ←
            debugM "Worker thread has been forked." >>
            -- Walk the tree down the initial path and then enter the loop,
            -- starting from the initial checkpoint with an empty cursor.
            (run $
                walk initial_path tree
                >>=
                loop1 initial_intermediate_value Seq.empty
                .
                initialExplorationState initial_checkpoint
            )
            `catch`
            -- Being killed or interrupted counts as an abort; any other
            -- exception is reported as a failure with its shown form.
            (\e → case fromException e of
                Just ThreadKilled → return WorkerAborted
                Just UserInterrupt → return WorkerAborted
                _ → return $ WorkerFailed (show e)
            )
        debugM $ "Worker thread has terminated with reason " ++
            case termination_reason of
                WorkerFinished _ → "finished."
                WorkerFailed message → "failed: " ++ show message
                WorkerAborted → "aborted."
        -- Signal termination before handing the reason to the callback.
        IVar.write finished_flag ()
        finishedCallback termination_reason
    return $
        WorkerEnvironment
            initial_path
            thread_id
            pending_requests_ref
            finished_flag
  where
    initial_intermediate_value = initialWorkerIntermediateValue exploration_mode
-- | Posts an abort request to the given queue.  When the worker processes it,
-- it terminates with 'WorkerAborted'.
sendAbortRequest :: WorkerRequestQueue progress → IO ()
sendAbortRequest queue = sendRequest queue AbortRequested
-- | Posts a progress-update request to the given queue.  When the worker
-- processes it, it calls the given action with its current progress update.
sendProgressUpdateRequest ::
    WorkerRequestQueue progress →
    (ProgressUpdate progress → IO ()) →
    IO ()
sendProgressUpdateRequest queue receiveProgressUpdate =
    sendRequest queue (ProgressUpdateRequested receiveProgressUpdate)
-- | Posts a workload-steal request to the given queue.  When the worker
-- processes it, it calls the given action with 'Just' the stolen workload, or
-- with 'Nothing' if there was nothing to steal.
sendWorkloadStealRequest ::
    WorkerRequestQueue progress →
    (Maybe (StolenWorkload progress) → IO ()) →
    IO ()
sendWorkloadStealRequest queue receiveStolenWorkload =
    sendRequest queue (WorkloadStealRequested receiveStolenWorkload)
-- | Atomically prepends a request to the front of the queue.  The queue is
-- therefore held in newest-first order.
sendRequest :: WorkerRequestQueue progress → WorkerRequest progress → IO ()
sendRequest queue request =
    atomicModifyIORef queue $ \pending_requests → (request:pending_requests,())
-- | Explores the whole tree with a single worker and blocks until it
-- terminates.
--
-- A worker is forked over 'entire_workload' with a finished callback that
-- stores the termination reason in an 'MVar'; the calling thread waits on that
-- 'MVar'.  If the worker finished, its final progress is converted to the
-- final result for the mode:
--
-- * 'AllMode': the accumulated result alone;
--
-- * 'FirstMode': the optional solution, paired with the final checkpoint when
--   present;
--
-- * 'FoundModeUsingPull': 'Right' with the whole progress if the condition
--   holds for the result, otherwise 'Left' with the result.
--
-- The constraint on 'WorkerPushActionFor' excludes 'FoundModeUsingPush' (the
-- only mode whose push action is not @Void → ()@), which is why 'absurd' can
-- be passed as the push action and why the final case branch is unreachable.
exploreTreeGeneric ::
    ( WorkerPushActionFor exploration_mode ~ (Void → ())
    , ResultFor exploration_mode ~ α
    ) ⇒
    ExplorationMode exploration_mode →
    Purity m n →
    TreeT m α →
    IO (WorkerTerminationReason (FinalResultFor exploration_mode))
exploreTreeGeneric exploration_mode purity tree = do
    final_progress_mvar ← newEmptyMVar
    -- The returned worker environment is not needed: no requests are ever
    -- sent to this worker.
    _ ← forkWorkerThread
            exploration_mode
            purity
            (putMVar final_progress_mvar)
            tree
            entire_workload
            absurd
    -- Block until the worker's finished callback fills the MVar.
    final_progress ← takeMVar final_progress_mvar
    -- NOTE(review): the conversion is written as a lambda applied through
    -- `return . flip fmap … $` so that the expected result type reaches the
    -- case expression; moving it into an unannotated local binding may stop it
    -- from type checking.
    return . flip fmap final_progress $ \progress →
        case exploration_mode of
            AllMode → progressResult progress
            FirstMode → Progress (progressCheckpoint progress) <$> progressResult progress
            FoundModeUsingPull f →
                if f (progressResult progress)
                    then Right progress
                    else Left (progressResult progress)
            _ → error "should never reach here due to incompatible types"
-- | Builds the checkpoint for the worker's current position by wrapping the
-- given checkpoint, from the inside out, with the context, then the cursor,
-- and finally the initial path.
checkpointFromSetting ::
    Path →
    CheckpointCursor →
    Context m α →
    Checkpoint →
    Checkpoint
checkpointFromSetting initial_path cursor context unexplored_checkpoint =
    checkpointFromInitialPath initial_path
        (checkpointFromCursor cursor
            (checkpointFromContext context unexplored_checkpoint)
        )
-- | Computes the progress update for the worker's current position.
--
-- The progress part depends on the exploration mode: 'AllMode' and
-- 'FoundModeUsingPull' report the checkpoint together with the accumulated
-- result, 'FirstMode' reports only the checkpoint, and 'FoundModeUsingPush'
-- reports the checkpoint with an empty result.  The workload part is always
-- built by 'workloadFromSetting' from the same position.
computeProgressUpdate ::
    ResultFor exploration_mode ~ α ⇒
    ExplorationMode exploration_mode →
    WorkerIntermediateValueFor exploration_mode →
    Path →
    CheckpointCursor →
    Context m α →
    Checkpoint →
    ProgressUpdate (ProgressFor exploration_mode)
computeProgressUpdate exploration_mode result initial_path cursor context checkpoint =
    let current_checkpoint = checkpointFromSetting initial_path cursor context checkpoint
        remaining_workload = workloadFromSetting initial_path cursor context checkpoint
    in ProgressUpdate
        -- The case must stay in argument position so that its result type is
        -- known from the signature when matching on the mode.
        (case exploration_mode of
            FirstMode → current_checkpoint
            FoundModeUsingPush _ → Progress current_checkpoint mempty
            AllMode → Progress current_checkpoint result
            FoundModeUsingPull _ → Progress current_checkpoint result
        )
        remaining_workload
-- | Selects the tree operations used by the worker loop for the given purity.
--
-- For 'Pure' trees the pure walking and stepping functions are lifted with
-- 'return' and actions are run as they are; for 'ImpureAtopIO' the monadic
-- variants are used directly and actions are run with the supplied runner.
getTreeFunctionsForPurity :: Purity m n → TreeFunctionsForPurity m n α
getTreeFunctionsForPurity Pure =
    TreeFunctionsForPurity
        { walk = \path tree → return (sendTreeDownPath path tree)
        , step = \exploration_state → return (stepThroughTreeStartingFromCheckpoint exploration_state)
        , run = id
        }
getTreeFunctionsForPurity (ImpureAtopIO runInIO) =
    TreeFunctionsForPurity
        { walk = sendTreeTDownPath
        , step = stepThroughTreeTStartingFromCheckpoint
        , run = runInIO
        }
-- | Tries to split off part of the worker's remaining workload.
--
-- The context is scanned in reverse order.  Cache steps and right-branch
-- steps are moved onto the end of the cursor.  At the first left-branch step
-- the scan stops: that step is recorded on the cursor as a left choice whose
-- other branch is 'Unexplored', the steps not yet scanned become the new
-- context (restored to their original order), and the stolen workload is the
-- path to this point followed by the right branch, with the checkpoint that
-- was stored for the other branch.
--
-- Returns 'Nothing' when the context contains no left-branch step.
tryStealWorkload ::
    Path →
    CheckpointCursor →
    Context m α →
    Maybe (CheckpointCursor,Context m α,Workload)
tryStealWorkload initial_path cursor context = scan cursor (reverse context)
  where
    scan _ [] = Nothing
    scan current_cursor (context_step:remaining_steps) =
        case context_step of
            CacheContextStep cache →
                scan (current_cursor |> CachePointD cache) remaining_steps
            RightBranchContextStep →
                scan (current_cursor |> ChoicePointD RightBranch Explored) remaining_steps
            LeftBranchContextStep other_checkpoint _ →
                let stolen_path =
                        (initial_path >< pathFromCursor current_cursor) |> ChoiceStep RightBranch
                in Just
                    (current_cursor |> ChoicePointD LeftBranch Unexplored
                    ,reverse remaining_steps
                    ,Workload stolen_path other_checkpoint
                    )
-- | Builds the workload for the worker's current position: its path is the
-- initial path followed by the path of the cursor, and its checkpoint is the
-- given checkpoint wrapped with the context.
workloadFromSetting ::
    Path →
    CheckpointCursor →
    Context m α →
    Checkpoint →
    Workload
workloadFromSetting initial_path cursor context unexplored_checkpoint =
    Workload
        (initial_path >< pathFromCursor cursor)
        (checkpointFromContext context unexplored_checkpoint)