-- | Lifted "Control.Concurrent".
--
-- For functions that spawn threads, the order of preference for their usage is
-- recommended as follows:
--
-- 1) High level functions from "Effectful.Concurrent.Async" such as
--    'Effectful.Concurrent.Async.withAsync',
--    'Effectful.Concurrent.Async.concurrently' or
--    'Effectful.Concurrent.Async.mapConcurrently'.
--
-- 2) Low level functions from "Effectful.Concurrent.Async" such as
--    'Effectful.Concurrent.Async.async'.
--
-- 3) Low level functions from "Effectful.Concurrent" such as 'forkIO'.
module Effectful.Concurrent
  ( -- * Effect
    Concurrent

    -- ** Handlers
  , runConcurrent

    -- * Basic concurrency operations
  , myThreadId
  , forkIO
  , forkFinally
  , forkIOWithUnmask
  , killThread
  , throwTo

    -- ** Threads with affinity
  , forkOn
  , forkOnWithUnmask
  , getNumCapabilities
  , setNumCapabilities
  , getNumProcessors
  , threadCapability

    -- * Scheduling
  , yield

    -- ** Waiting
  , threadDelay
  , threadWaitRead
  , threadWaitWrite
  , threadWaitReadSTM
  , threadWaitWriteSTM

    -- * Bound threads
  , forkOS
  , forkOSWithUnmask
  , isCurrentThreadBound
  , runInBoundThread
  , runInUnboundThread

    -- * Weak references to ThreadIds
  , mkWeakThreadId

    -- * Re-exports
  , C.rtsSupportsBoundThreads
  ) where

import Control.Exception (Exception, SomeException)
import Data.Bifunctor (second)
import System.Mem.Weak (Weak)
import System.Posix.Types (Fd)
import UnliftIO.STM (STM)
import qualified Control.Concurrent as C
import qualified GHC.Conc as GHC

import Effectful
import Effectful.Concurrent.Effect
import Effectful.Dispatch.Static
import Effectful.Dispatch.Static.Primitive
import Effectful.Dispatch.Static.Unsafe

----------------------------------------
-- Basic concurrency operations

-- | Lifted 'C.myThreadId'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
myThreadId :: Concurrent :> es => Eff es C.ThreadId
myThreadId = unsafeEff_ C.myThreadId

-- | Lifted 'C.forkIO'.
--
-- The forked computation is run against an environment obtained from
-- 'cloneEnv', not against the caller's environment itself.
forkIO :: Concurrent :> es => Eff es () -> Eff es C.ThreadId
forkIO k = unsafeEff $ \es -> do
  -- Clone before forking so the new thread gets its own environment value.
  esF <- cloneEnv es
  C.forkIO $ unEff k esF

-- | Lifted 'C.forkFinally'.
--
-- Both the forked computation and the cleanup handler are run against the
-- same environment, obtained once from 'cloneEnv' before forking.
forkFinally
  :: Concurrent :> es
  => Eff es a
  -> (Either SomeException a -> Eff es ())
  -> Eff es C.ThreadId
forkFinally k cleanup = unsafeEff $ \es -> do
  esF <- cloneEnv es
  -- The handler receives the outcome of @k@ and is unwrapped with the same
  -- cloned environment as @k@ itself.
  C.forkFinally (unEff k esF) ((`unEff` esF) . cleanup)

-- | Lifted 'C.forkIOWithUnmask'.
--
-- Implemented in terms of 'liftForkWithUnmask', which lifts the @unmask@
-- function passed to the forked computation from 'IO' to 'Eff'.
forkIOWithUnmask
  :: Concurrent :> es
  => ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkIOWithUnmask = liftForkWithUnmask C.forkIOWithUnmask

-- | Lifted 'C.killThread'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
killThread :: Concurrent :> es => C.ThreadId -> Eff es ()
killThread = unsafeEff_ . C.killThread

-- | Lifted 'C.throwTo'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
throwTo :: (Concurrent :> es, Exception e) => C.ThreadId -> e -> Eff es ()
throwTo tid = unsafeEff_ . C.throwTo tid

----------------------------------------
-- Threads with affinity

-- | Lifted 'C.forkOn'.
--
-- The 'Int' argument is passed through to 'C.forkOn' unchanged. The forked
-- computation is run against an environment obtained from 'cloneEnv'.
forkOn :: Concurrent :> es => Int -> Eff es () -> Eff es C.ThreadId
forkOn n k = unsafeEff $ \es -> do
  -- Clone before forking so the new thread gets its own environment value.
  esF <- cloneEnv es
  C.forkOn n (unEff k esF)

-- | Lifted 'C.forkOnWithUnmask'.
--
-- The 'Int' argument is passed through to 'C.forkOnWithUnmask' unchanged; the
-- rest is handled by 'liftForkWithUnmask'.
forkOnWithUnmask
  :: Concurrent :> es
  => Int
  -> ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkOnWithUnmask n = liftForkWithUnmask (C.forkOnWithUnmask n)

-- | Lifted 'C.getNumCapabilities'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
getNumCapabilities :: Concurrent :> es => Eff es Int
getNumCapabilities = unsafeEff_ C.getNumCapabilities

-- | Lifted 'C.setNumCapabilities'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
setNumCapabilities :: Concurrent :> es => Int -> Eff es ()
setNumCapabilities = unsafeEff_ . C.setNumCapabilities

-- | Lifted 'GHC.getNumProcessors'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
getNumProcessors :: Concurrent :> es => Eff es Int
getNumProcessors = unsafeEff_ GHC.getNumProcessors

-- | Lifted 'C.threadCapability'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
threadCapability :: Concurrent :> es => C.ThreadId -> Eff es (Int, Bool)
threadCapability = unsafeEff_ . C.threadCapability

----------------------------------------
-- Scheduling

-- | Lifted 'C.yield'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
yield :: Concurrent :> es => Eff es ()
yield = unsafeEff_ C.yield

----------------------------------------
-- Waiting

-- | Lifted 'C.threadDelay'.
--
-- The 'Int' argument is passed to 'C.threadDelay' unchanged; see its
-- documentation for the unit and guarantees of the delay.
threadDelay :: Concurrent :> es => Int -> Eff es ()
threadDelay = unsafeEff_ . C.threadDelay

-- | Lifted 'C.threadWaitRead'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
threadWaitRead :: Concurrent :> es => Fd -> Eff es ()
threadWaitRead = unsafeEff_ . C.threadWaitRead

-- | Lifted 'C.threadWaitWrite'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
threadWaitWrite :: Concurrent :> es => Fd -> Eff es ()
threadWaitWrite = unsafeEff_ . C.threadWaitWrite

-- | Lifted 'C.threadWaitReadSTM'.
--
-- The 'STM' component of the result is returned as is; the accompanying 'IO'
-- action is lifted into 'Eff' with 'unsafeEff_'.
threadWaitReadSTM :: Concurrent :> es => Fd -> Eff es (STM (), Eff es ())
threadWaitReadSTM fd =
  -- 'second' maps only over the second element of the returned pair.
  unsafeEff_ $ second unsafeEff_ <$> C.threadWaitReadSTM fd

-- | Lifted 'C.threadWaitWriteSTM'.
--
-- The 'STM' component of the result is returned as is; the accompanying 'IO'
-- action is lifted into 'Eff' with 'unsafeEff_'.
threadWaitWriteSTM :: Concurrent :> es => Fd -> Eff es (STM (), Eff es ())
threadWaitWriteSTM fd =
  -- 'second' maps only over the second element of the returned pair.
  unsafeEff_ $ second unsafeEff_ <$> C.threadWaitWriteSTM fd

----------------------------------------
-- Bound threads

-- | Lifted 'C.forkOS'.
--
-- The forked computation is run against an environment obtained from
-- 'cloneEnv', not against the caller's environment itself.
forkOS :: Concurrent :> es => Eff es () -> Eff es C.ThreadId
forkOS k = unsafeEff $ \es -> do
  -- Clone before forking so the new thread gets its own environment value.
  esF <- cloneEnv es
  C.forkOS $ unEff k esF

-- | Lifted 'C.forkOSWithUnmask'.
--
-- Implemented in terms of 'liftForkWithUnmask', which lifts the @unmask@
-- function passed to the forked computation from 'IO' to 'Eff'.
forkOSWithUnmask
  :: Concurrent :> es
  => ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkOSWithUnmask = liftForkWithUnmask C.forkOSWithUnmask

-- | Lifted 'C.isCurrentThreadBound'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
isCurrentThreadBound :: Concurrent :> es => Eff es Bool
isCurrentThreadBound = unsafeEff_ C.isCurrentThreadBound

-- | Lifted 'C.runInBoundThread'.
--
-- The computation is run against an environment obtained from 'cloneEnv',
-- not against the caller's environment itself.
runInBoundThread :: Concurrent :> es => Eff es a -> Eff es a
runInBoundThread k = unsafeEff $ \es -> do
  -- Clone first: the action is handed to 'C.runInBoundThread' together with
  -- the cloned environment rather than the original one.
  esF <- cloneEnv es
  C.runInBoundThread $ unEff k esF

-- | Lifted 'C.runInUnboundThread'.
--
-- The computation is run against an environment obtained from 'cloneEnv',
-- not against the caller's environment itself.
runInUnboundThread :: Concurrent :> es => Eff es a -> Eff es a
runInUnboundThread k = unsafeEff $ \es -> do
  -- Clone first: the action is handed to 'C.runInUnboundThread' together
  -- with the cloned environment rather than the original one.
  esF <- cloneEnv es
  C.runInUnboundThread $ unEff k esF

----------------------------------------
-- Weak references to ThreadIds

-- | Lifted 'C.mkWeakThreadId'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
mkWeakThreadId :: Concurrent :> es => C.ThreadId -> Eff es (Weak C.ThreadId)
mkWeakThreadId = unsafeEff_ . C.mkWeakThreadId

----------------------------------------
-- Helpers

-- | Lift a @fork*WithUnmask@-style function from 'IO' to 'Eff'.
--
-- The first argument is the 'IO'-level forking primitive; the second is the
-- 'Eff'-level computation to fork, which expects an 'Eff'-level @unmask@
-- function. The forked computation is run against an environment obtained
-- from 'cloneEnv'.
liftForkWithUnmask
  :: (((forall c. IO c -> IO c) -> IO a) -> IO C.ThreadId)
  -> ((forall c. Eff es c -> Eff es c) -> Eff es a)
  -> Eff es C.ThreadId
liftForkWithUnmask fork action = unsafeEff $ \es -> do
  esF <- cloneEnv es
  -- Unmask never runs its argument in a different thread.
  fork $ \unmask -> unEff (action $ reallyUnsafeLiftMapIO unmask) esF