-- | A coroutine based, single threaded scheduler for 'Process'es.
module Control.Eff.Concurrent.Process.SingleThreadedScheduler
  ( schedule
  , scheduleWithYield
  , defaultMain
  , singleThreadedIoScheduler
  , LoggingAndIo
  )
where

import           Control.Concurrent             ( yield )
import           Control.Eff
import           Control.Eff.Lift
import           Control.Eff.Log
import           Control.Eff.Concurrent.Process
import           Control.Lens            hiding ( (|>)
                                                , Empty
                                                )
import qualified Data.Sequence                 as Seq
import           Data.Sequence                  ( Seq(..) )
import qualified Data.Map.Strict               as Map
import           GHC.Stack
import           Data.Kind                      ( )
import           Data.Dynamic
import           Data.Maybe

-- | Run a 'Process' inside the calling thread. Every child process created
-- via 'spawn' is interleaved with it by a co-routine based, round-robin
-- scheduler.
--
-- This is 'scheduleWithYield' with a yield action that does nothing.
schedule :: forall r . Eff (Process r ': r) () -> Eff r ()
schedule mainProcessAction = scheduleWithYield (pure ()) mainProcessAction

-- | Like 'schedule', but with a caller supplied @yieldEff@ action that the
-- scheduler runs in the underlying effects @r@:
--
--   * every time /any/ process executes 'YieldProcess', and
--
--   * every time the main process (the one with 'ProcessId' @0@) has been
--     resumed after asking for its own pid, sending or receiving a message,
--     spawning a child, or sending a shutdown request to another process.
--
-- The main process is started with 'ProcessId' @0@; spawned processes get
-- consecutive ids starting at @1@. Processes are run round-robin. Scheduling
-- stops as soon as the main process is done or has exited with an error (any
-- remaining child processes are dropped), or when no process is left.
--
-- A process that waits for a message while its own queue is empty and no
-- other process is able to make progress (because there is none, or because
-- all of them are waiting on empty queues as well) is resumed with an
-- 'OnError' telling it that it is deadlocked, instead of being re-queued
-- forever.
scheduleWithYield :: forall r . Eff r () -> Eff (Process r ': r) () -> Eff r ()
scheduleWithYield yieldEff mainProcessAction = do
  y <- runAsCoroutine mainProcessAction
  -- The main process gets pid 0 and an empty message queue; 1 is the next
  -- pid to hand out.
  go 1 (Map.singleton 0 Seq.empty) (Seq.singleton (y, 0))
 where

  -- The scheduler loop. Arguments: the pid for the next spawned process, the
  -- message queue of every live process, and the run queue of suspended
  -- processes (the head is the process to run next).
  go
    :: ProcessId
    -> Map.Map ProcessId (Seq Dynamic)
    -> Seq (OnYield r, ProcessId)
    -> Eff r ()
  go _newPid _msgQs Empty = return ()

  go newPid msgQs allProcs@((processState, pid) :<| rest)
    = let
        -- When the main process ends, the whole schedule ends; any other
        -- process is removed together with its message queue.
        handleExit = if pid == 0
          then return ()
          else go newPid (msgQs & at pid .~ Nothing) rest
        -- Only the main process triggers the yield action implicitly.
        maybeYield = if pid == 0 then yieldEff else return ()
        -- A process is blocked if it waits for a message and its queue
        -- exists but is empty. A receiver without a queue is not blocked:
        -- it will be resumed with an error on its next turn.
        isBlockedOnRecv (otherState, otherPid) = case otherState of
          OnRecv _ -> maybe False Seq.null (Map.lookup otherPid msgQs)
          _        -> False
        -- True if no other process exists or all of them are blocked. Queues
        -- are only ever filled by processes handled in this loop, so in that
        -- case nobody can ever deliver a message to the current process.
        noOneElseCanRun = all isBlockedOnRecv rest
      in
        case processState of
          OnDone                     -> handleExit

          OnRaiseError _             -> handleExit

          OnExitError  _             -> handleExit

          OnSendShutdown targetPid k -> do
            let allButTarget =
                  Seq.filter (\(_, e) -> e /= pid && e /= targetPid) allProcs
                targets     = Seq.filter (\(_, e) -> e == targetPid) allProcs
                suicide     = targetPid == pid
                targetFound = suicide || not (Seq.null targets)
            if suicide
              then do
                -- A process shutting down itself is simply resumed with the
                -- shutdown request.
                nextK <- k ShutdownRequested
                go newPid msgQs (rest :|> (nextK, pid))
              else do
                -- Resume the target with 'ShutdownRequested' in place of
                -- the answer to whatever request it was suspended on.
                let deliverTheGoodNews (targetState, tPid) = do
                      nextTargetState <- case targetState of
                        OnSendShutdown _ tk -> tk ShutdownRequested
                        OnYield tk          -> tk ShutdownRequested
                        OnSelf  tk          -> tk ShutdownRequested
                        OnSend _ _ tk       -> tk ShutdownRequested
                        OnRecv tk           -> tk ShutdownRequested
                        OnSpawn _ tk        -> tk ShutdownRequested
                        OnDone              -> return OnDone
                        OnExitError  er     -> return (OnExitError er)
                        OnRaiseError er     -> return (OnExitError er) -- return (error ("TODO write test "++er))
                      return (nextTargetState, tPid)
                nextTargets <- traverse deliverTheGoodNews targets
                nextK       <- k (ResumeWith targetFound)
                maybeYield
                go newPid
                   msgQs
                   (allButTarget Seq.>< (nextTargets :|> (nextK, pid)))


          OnSelf k -> do
            nextK <- k (ResumeWith pid)
            maybeYield
            go newPid msgQs (rest :|> (nextK, pid))

          OnYield k -> do
            yieldEff
            nextK <- k (ResumeWith ())
            go newPid msgQs (rest :|> (nextK, pid))

          OnSend toPid msg k -> do
            -- The sender learns whether the receiver has a message queue;
            -- the message is only enqueued if that queue exists.
            nextK <- k (ResumeWith (msgQs ^. at toPid . to isJust))
            maybeYield
            go newPid
               (msgQs & at toPid . _Just %~ (:|> msg))
               (rest :|> (nextK, pid))

          recv@(OnRecv k) -> case msgQs ^. at pid of
            Nothing -> do
              nextK <- k (OnError (show pid ++ " has no message queue!"))
              maybeYield
              go newPid msgQs (rest :|> (nextK, pid))
            Just Empty -> if noOneElseCanRun
              then do
                nextK <- k (OnError ("Process " ++ show pid ++ " deadlocked!"))
                maybeYield
                go newPid msgQs (rest :|> (nextK, pid))
              -- Somebody else can still run and might send a message: retry
              -- the receive after everyone else had a turn.
              else go newPid msgQs (rest :|> (recv, pid))

            Just (nextMessage :<| restMessages) -> do
              nextK <- k (ResumeWith nextMessage)
              maybeYield
              go newPid
                 (msgQs & at pid . _Just .~ restMessages)
                 (rest :|> (nextK, pid))

          OnSpawn f k -> do
            -- The parent is resumed with the child's pid first, then the
            -- child is run up to its first request; both are re-queued.
            nextK <- k (ResumeWith newPid)
            fk    <- runAsCoroutine f
            maybeYield
            go (newPid + 1)
               (msgQs & at newPid .~ Just Seq.empty)
               (rest :|> (nextK, pid) :|> (fk, newPid))

-- | The state of a process that was run by 'runAsCoroutine' until its first
-- 'Process' request or until it finished. Constructors for requests that
-- expect an answer carry the continuation used to resume the process.
data OnYield r
  = -- | The process executed 'YieldProcess'.
    OnYield (ResumeProcess () -> Eff r (OnYield r))
  | -- | The process executed 'SelfPid'; the continuation takes its
    -- 'ProcessId'.
    OnSelf (ResumeProcess ProcessId -> Eff r (OnYield r))
  | -- | The process executed 'Spawn' with the given child action; the
    -- continuation takes the 'ProcessId' of the new child.
    OnSpawn (Eff (Process r ': r) ())
            (ResumeProcess ProcessId -> Eff r (OnYield r))
  | -- | The process returned or executed 'Shutdown'.
    OnDone
  | -- | The process executed 'ExitWithError' with the given message.
    OnExitError String
  | -- | The process executed 'RaiseError' with the given message.
    OnRaiseError String
  | -- | The process executed 'SendMessage' with the given receiver and
    -- message; the continuation takes a 'Bool' result.
    OnSend ProcessId
           Dynamic
           (ResumeProcess Bool -> Eff r (OnYield r))
  | -- | The process executed 'ReceiveMessage'; the continuation takes the
    -- received message.
    OnRecv (ResumeProcess Dynamic -> Eff r (OnYield r))
  | -- | The process executed 'SendShutdown' for the given target; the
    -- continuation takes a 'Bool' result.
    OnSendShutdown ProcessId (ResumeProcess Bool -> Eff r (OnYield r))

-- | Run an action until it either finishes or issues its first 'Process'
-- request, and describe the outcome as an 'OnYield' value. The result of a
-- finished action is discarded.
runAsCoroutine :: forall r v . Eff (Process r ': r) v -> Eff r (OnYield r)
runAsCoroutine action = handle_relay finished suspend action
 where
  -- The action ran to completion without any further request.
  finished :: v -> Eff r (OnYield r)
  finished _ = pure OnDone

  -- Turn a request and the continuation awaiting its answer into the
  -- matching 'OnYield' constructor. The continuation is dropped for requests
  -- that end the process.
  suspend :: Process r x -> (x -> Eff r (OnYield r)) -> Eff r (OnYield r)
  suspend request resume = case request of
    YieldProcess              -> pure (OnYield resume)
    SelfPid                   -> pure (OnSelf resume)
    Spawn child               -> pure (OnSpawn child resume)
    Shutdown                  -> pure OnDone
    ExitWithError reason      -> pure (OnExitError reason)
    RaiseError    reason      -> pure (OnRaiseError reason)
    SendMessage receiver body -> pure (OnSend receiver body resume)
    ReceiveMessage            -> pure (OnRecv resume)
    SendShutdown target       -> pure (OnSendShutdown target resume)

-- | The concrete effect list underneath the 'Process' effect when this
-- scheduler runs in @IO@ with 'String' log messages: 'Logs' @String@ on top
-- of 'Lift' @IO@.
type LoggingAndIo = '[Logs String, Lift IO]

-- | A 'SchedulerProxy' whose type parameter is fixed to 'LoggingAndIo'.
--
-- The value is just the bare 'SchedulerProxy' constructor, so only its type
-- carries information. Presumably it is handed to functions that take a
-- 'SchedulerProxy' to select the effect list of this scheduler - confirm
-- against the definition of 'SchedulerProxy'.
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
singleThreadedIoScheduler = SchedulerProxy

-- | Run a main 'Process' in @IO@.
--
-- The process is scheduled by 'scheduleWithYield', with
-- 'Control.Concurrent.yield' (lifted into the effect list) as the yield
-- action. The 'Logs' @String@ effect is handled by 'handleLogsWith' using
-- 'putStrLn', and the remaining 'Lift' @IO@ effect is run by 'runLift'.
defaultMain
  :: HasCallStack
  => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] ()
  -> IO ()
defaultMain mainProcess = runLift (handleLogsWith scheduled ($ putStrLn))
 where
  -- The main process and all processes it spawns, interpreted down to
  -- logging and IO.
  scheduled = scheduleWithYield (lift yield) mainProcess