-- | Lifted "Control.Concurrent".
--
-- For functions that spawn threads, the recommended order of preference is as
-- follows:
--
-- 1) High level functions from "Effectful.Concurrent.Async" such as
--    'Effectful.Concurrent.Async.withAsync',
--    'Effectful.Concurrent.Async.concurrently' or
--    'Effectful.Concurrent.Async.mapConcurrently'.
--
-- 2) Low level functions from "Effectful.Concurrent.Async" such as
--    'Effectful.Concurrent.Async.async'.
--
-- 3) Low level functions from "Effectful.Concurrent" such as 'forkIO'.
module Effectful.Concurrent
  ( -- * Effect
    Concurrent

    -- ** Handlers
  , runConcurrent

    -- * Basic concurrency operations
  , myThreadId
  , forkIO
  , forkFinally
  , forkIOWithUnmask
  , killThread
  , throwTo

    -- ** Threads with affinity
  , forkOn
  , forkOnWithUnmask
  , getNumCapabilities
  , setNumCapabilities
  , getNumProcessors
  , threadCapability

    -- * Scheduling
  , yield

    -- ** Waiting
  , threadDelay
  , threadWaitRead
  , threadWaitWrite
  , threadWaitReadSTM
  , threadWaitWriteSTM

    -- * Bound threads
  , forkOS
  , forkOSWithUnmask
  , isCurrentThreadBound
  , runInBoundThread
  , runInUnboundThread

    -- * Weak references to ThreadIds
  , mkWeakThreadId

    -- * Re-exports
  , C.rtsSupportsBoundThreads
  ) where

import Control.Exception (Exception, SomeException)
import Data.Bifunctor (second)
import System.Mem.Weak (Weak)
import System.Posix.Types (Fd)
import UnliftIO.STM (STM)
import qualified Control.Concurrent as C
import qualified GHC.Conc as GHC

import Effectful
import Effectful.Concurrent.Effect
import Effectful.Dispatch.Static
import Effectful.Dispatch.Static.Primitive
import Effectful.Dispatch.Static.Unsafe

----------------------------------------
-- Basic concurrency operations

-- | Lifted 'C.myThreadId'.
--
-- Runs the underlying 'IO' action directly inside 'Eff' via 'unsafeEff_'; the
-- 'Concurrent' constraint in the signature gates access to it.
myThreadId :: Concurrent :> es => Eff es C.ThreadId
myThreadId = unsafeEff_ C.myThreadId

-- | Lifted 'C.forkIO'.
--
-- The forked computation does not run against the caller's environment
-- directly: the environment is first copied with 'cloneEnv' and the new thread
-- is handed that copy.
forkIO :: Concurrent :> es => Eff es () -> Eff es C.ThreadId
forkIO k = unsafeEff $ \es -> do
  -- Clone before forking so the child thread works on its own environment.
  esF <- cloneEnv es
  C.forkIO $ unEff k esF

-- | Lifted 'C.forkFinally'.
--
-- The environment is copied once with 'cloneEnv'; both the forked computation
-- and the cleanup handler are run against that same copy.
forkFinally
  :: Concurrent :> es
  => Eff es a
  -> (Either SomeException a -> Eff es ())
  -> Eff es C.ThreadId
forkFinally k cleanup = unsafeEff $ \es -> do
  esF <- cloneEnv es
  -- The handler receives the outcome of @k@ and is unwrapped with the same
  -- cloned environment the action itself used.
  C.forkFinally (unEff k esF) ((`unEff` esF) . cleanup)

-- | Lifted 'C.forkIOWithUnmask'.
--
-- Implemented by passing 'C.forkIOWithUnmask' to 'liftForkWithUnmask', which
-- takes care of lifting the unmasking function into 'Eff'.
forkIOWithUnmask
  :: Concurrent :> es
  => ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkIOWithUnmask = liftForkWithUnmask C.forkIOWithUnmask

-- | Lifted 'C.killThread'.
--
-- Runs the underlying 'IO' action for the given thread id inside 'Eff' via
-- 'unsafeEff_'.
killThread :: Concurrent :> es => C.ThreadId -> Eff es ()
killThread = unsafeEff_ . C.killThread

-- | Lifted 'C.throwTo'.
--
-- Runs @'C.throwTo' tid e@ inside 'Eff' via 'unsafeEff_'.
throwTo :: (Concurrent :> es, Exception e) => C.ThreadId -> e -> Eff es ()
throwTo tid = unsafeEff_ . C.throwTo tid

----------------------------------------
-- Threads with affinity

-- | Lifted 'C.forkOn'.
--
-- The 'Int' argument is passed through to 'C.forkOn' unchanged. The forked
-- computation runs against a copy of the caller's environment obtained with
-- 'cloneEnv'.
forkOn :: Concurrent :> es => Int -> Eff es () -> Eff es C.ThreadId
forkOn n k = unsafeEff $ \es -> do
  -- Clone before forking so the child thread works on its own environment.
  esF <- cloneEnv es
  C.forkOn n (unEff k esF)

-- | Lifted 'C.forkOnWithUnmask'.
--
-- Implemented by passing @'C.forkOnWithUnmask' n@ to 'liftForkWithUnmask',
-- which takes care of lifting the unmasking function into 'Eff'.
forkOnWithUnmask
  :: Concurrent :> es
  => Int
  -> ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkOnWithUnmask n = liftForkWithUnmask (C.forkOnWithUnmask n)

-- | Lifted 'C.getNumCapabilities'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
getNumCapabilities :: Concurrent :> es => Eff es Int
getNumCapabilities = unsafeEff_ C.getNumCapabilities

-- | Lifted 'C.setNumCapabilities'.
--
-- Runs the underlying 'IO' action with the given 'Int' inside 'Eff' via
-- 'unsafeEff_'.
setNumCapabilities :: Concurrent :> es => Int -> Eff es ()
setNumCapabilities = unsafeEff_ . C.setNumCapabilities

-- | Lifted 'GHC.getNumProcessors'.
--
-- Unlike most operations in this module, the underlying action comes from
-- "GHC.Conc" rather than "Control.Concurrent". It is run inside 'Eff' via
-- 'unsafeEff_'.
getNumProcessors :: Concurrent :> es => Eff es Int
getNumProcessors = unsafeEff_ GHC.getNumProcessors

-- | Lifted 'C.threadCapability'.
--
-- Runs the underlying 'IO' action for the given thread id inside 'Eff' via
-- 'unsafeEff_' and returns its @('Int', 'Bool')@ result unchanged.
threadCapability :: Concurrent :> es => C.ThreadId -> Eff es (Int, Bool)
threadCapability = unsafeEff_ . C.threadCapability

----------------------------------------
-- Scheduling

-- | Lifted 'C.yield'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
yield :: Concurrent :> es => Eff es ()
yield = unsafeEff_ C.yield

----------------------------------------
-- Waiting

-- | Lifted 'C.threadDelay'.
--
-- The 'Int' argument is passed to 'C.threadDelay' unchanged; the resulting
-- 'IO' action is run inside 'Eff' via 'unsafeEff_'.
threadDelay :: Concurrent :> es => Int -> Eff es ()
threadDelay = unsafeEff_ . C.threadDelay

-- | Lifted 'C.threadWaitRead'.
--
-- Runs the underlying 'IO' action for the given file descriptor inside 'Eff'
-- via 'unsafeEff_'.
threadWaitRead :: Concurrent :> es => Fd -> Eff es ()
threadWaitRead = unsafeEff_ . C.threadWaitRead

-- | Lifted 'C.threadWaitWrite'.
--
-- Runs the underlying 'IO' action for the given file descriptor inside 'Eff'
-- via 'unsafeEff_'.
threadWaitWrite :: Concurrent :> es => Fd -> Eff es ()
threadWaitWrite = unsafeEff_ . C.threadWaitWrite

-- | Lifted 'C.threadWaitReadSTM'.
--
-- 'C.threadWaitReadSTM' returns a pair of an 'STM' action and an 'IO' action.
-- The 'STM' component is returned as is, while the 'IO' component is lifted
-- into 'Eff' with 'unsafeEff_'.
threadWaitReadSTM :: Concurrent :> es => Fd -> Eff es (STM (), Eff es ())
threadWaitReadSTM fd =
  -- 'second' touches only the second element of the pair (the 'IO' action).
  unsafeEff_ $ second unsafeEff_ <$> C.threadWaitReadSTM fd

-- | Lifted 'C.threadWaitWriteSTM'.
--
-- 'C.threadWaitWriteSTM' returns a pair of an 'STM' action and an 'IO' action.
-- The 'STM' component is returned as is, while the 'IO' component is lifted
-- into 'Eff' with 'unsafeEff_'.
threadWaitWriteSTM :: Concurrent :> es => Fd -> Eff es (STM (), Eff es ())
threadWaitWriteSTM fd =
  -- 'second' touches only the second element of the pair (the 'IO' action).
  unsafeEff_ $ second unsafeEff_ <$> C.threadWaitWriteSTM fd

----------------------------------------
-- Bound threads

-- | Lifted 'C.forkOS'.
--
-- The forked computation runs against a copy of the caller's environment
-- obtained with 'cloneEnv'.
forkOS :: Concurrent :> es => Eff es () -> Eff es C.ThreadId
forkOS k = unsafeEff $ \es -> do
  -- Clone before forking so the child thread works on its own environment.
  esF <- cloneEnv es
  C.forkOS $ unEff k esF

-- | Lifted 'C.forkOSWithUnmask'.
--
-- Implemented by passing 'C.forkOSWithUnmask' to 'liftForkWithUnmask', which
-- takes care of lifting the unmasking function into 'Eff'.
forkOSWithUnmask
  :: Concurrent :> es
  => ((forall a. Eff es a -> Eff es a) -> Eff es ())
  -> Eff es C.ThreadId
forkOSWithUnmask = liftForkWithUnmask C.forkOSWithUnmask

-- | Lifted 'C.isCurrentThreadBound'.
--
-- Runs the underlying 'IO' action inside 'Eff' via 'unsafeEff_'.
isCurrentThreadBound :: Concurrent :> es => Eff es Bool
isCurrentThreadBound = unsafeEff_ C.isCurrentThreadBound

-- | Lifted 'C.runInBoundThread'.
--
-- The computation is run against a copy of the caller's environment obtained
-- with 'cloneEnv', not against the caller's environment itself.
runInBoundThread :: Concurrent :> es => Eff es a -> Eff es a
runInBoundThread k = unsafeEff $ \es -> do
  -- NOTE(review): the clone is presumably needed because the action may end up
  -- running in a different thread; confirm against 'C.runInBoundThread'.
  esF <- cloneEnv es
  C.runInBoundThread $ unEff k esF

-- | Lifted 'C.runInUnboundThread'.
--
-- The computation is run against a copy of the caller's environment obtained
-- with 'cloneEnv', not against the caller's environment itself.
runInUnboundThread :: Concurrent :> es => Eff es a -> Eff es a
runInUnboundThread k = unsafeEff $ \es -> do
  -- NOTE(review): the clone is presumably needed because the action may end up
  -- running in a different thread; confirm against 'C.runInUnboundThread'.
  esF <- cloneEnv es
  C.runInUnboundThread $ unEff k esF

----------------------------------------
-- Weak references to ThreadIds

-- | Lifted 'C.mkWeakThreadId'.
--
-- Runs the underlying 'IO' action for the given thread id inside 'Eff' via
-- 'unsafeEff_'.
mkWeakThreadId :: Concurrent :> es => C.ThreadId -> Eff es (Weak C.ThreadId)
mkWeakThreadId = unsafeEff_ . C.mkWeakThreadId

----------------------------------------
-- Helpers

-- | Lift a @fork...WithUnmask@-style function from 'IO' to 'Eff'.
--
-- The first argument is the 'IO'-level forking function. The second is the
-- 'Eff' computation to fork, which is given an unmasking function that
-- operates on 'Eff' computations.
--
-- The environment is copied with 'cloneEnv' and the forked computation runs
-- against that copy.
liftForkWithUnmask
  :: (((forall c. IO c -> IO c) -> IO a) -> IO C.ThreadId)
  -> ((forall c. Eff es c -> Eff es c) -> Eff es a)
  -> Eff es C.ThreadId
liftForkWithUnmask fork action = unsafeEff $ \es -> do
  esF <- cloneEnv es
  -- Unmask never runs its argument in a different thread.
  fork $ \unmask -> unEff (action $ reallyUnsafeLiftMapIO unmask) esF