{-# LANGUAGE CPP #-}
module Control.Shell.Concurrent (
Future, ThreadId,
Control.Shell.Concurrent.forkIO, Control.Shell.Concurrent.killThread,
fork2, fork3,
future, await, check,
parallel, parallel_,
chunks
) where
import Control.Concurrent as CC
import Control.Monad
import Control.Shell
import Control.Shell.Internal (inEnv)
import Data.IORef
import System.Process
type FinalizerHandle = IORef ThreadId
data Future a = Future !FinalizerHandle !(MVar (Either ExitReason a))
future :: Shell a -> Shell (Future a)
future m = do
env <- getShellEnv
unsafeLiftIO $ do
v <- newEmptyMVar
tid <- CC.forkIO $ runSh env m >>= putMVar v
r <- newIORef tid
_ <- mkWeakIORef r (CC.killThread tid)
return $ Future r v
await :: Future a -> Shell a
await (Future h v) = joinResult $ unsafeLiftIO (readMVar v <* readIORef h)
check :: Future a -> Shell (Maybe a)
check (Future h v) = do
mx <- unsafeLiftIO $ tryReadMVar v <* readIORef h
case h `seq` mx of
Just x -> Just <$> joinResult (pure x)
_ -> pure Nothing
parallel :: [Shell a] -> Shell [a]
parallel = mapM future >=> mapM await
parallel_ :: [Shell a] -> Shell ()
parallel_ = mapM future >=> mapM_ await
chunks :: Int -> [a] -> [[a]]
chunks _ [] = []
chunks n xs | length xs > n = take n xs : chunks n (drop n xs)
| otherwise = [xs]
forkIO :: Shell () -> Shell ThreadId
forkIO m = do
env <- getShellEnv
unsafeLiftIO $ do
CC.forkIO $ void $ runSh env m
fork2 :: Shell () -> Shell (Handle, Handle, ThreadId)
fork2 m = do
env <- getShellEnv
(ri, wi) <- unsafeLiftIO createPipe
(ro, wo) <- unsafeLiftIO createPipe
mapM_ (flip hSetBuffering LineBuffering) [wi,wo]
tid <- Control.Shell.Concurrent.forkIO $ do
inEnv (env {envStdOut = wo, envStdIn = ri}) m
return (wi, ro, tid)
fork3 :: Shell () -> Shell (Handle, Handle, Handle, ThreadId)
fork3 m = do
env <- getShellEnv
(ri, wi) <- unsafeLiftIO createPipe
(ro, wo) <- unsafeLiftIO createPipe
(re, we) <- unsafeLiftIO createPipe
mapM_ (flip hSetBuffering LineBuffering) [wi,wo,we]
tid <- inEnv (env {envStdOut = wo, envStdErr = we, envStdIn = ri}) $ do
Control.Shell.Concurrent.forkIO m
return (wi, ro, re, tid)
killThread :: ThreadId -> Shell ()
killThread = liftIO . CC.killThread