{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
module Streamly.Internal.System.Process
(
Config
, ProcessFailure (..)
, toBytes
, toBytes'
, toChunks
, toChunks'
, processBytes
, processBytes'
, processChunksWith
, processChunks
, processChunks'With
, processChunks'
)
where
import Control.Monad.Catch (MonadCatch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Word (Word8)
import Streamly.Data.Array.Foreign (Array)
import Streamly.Prelude (MonadAsync, parallel, IsStream, adapt)
import System.Exit (ExitCode(..))
import System.IO (hClose, Handle)
#ifdef USE_NATIVE
import Control.Exception (Exception(..), catch, throwIO, SomeException)
import System.Posix.Process (ProcessStatus(..))
import Streamly.Internal.System.Process.Posix
#else
import Control.Concurrent (forkIO)
import Control.Exception (Exception(..), catch, throwIO)
import Control.Monad (void, unless)
import Foreign.C.Error (Errno(..), ePIPE)
import GHC.IO.Exception (IOException(..), IOErrorType(..))
import System.Process
( ProcessHandle
, CreateProcess(..)
, StdStream (..)
, createProcess
, waitForProcess
, CmdSpec(..)
, terminateProcess
)
#endif
import qualified Streamly.Data.Array.Foreign as Array
import qualified Streamly.Prelude as Stream
import Streamly.Internal.System.IO (defaultChunkSize)
import qualified Streamly.Internal.Data.Array.Stream.Foreign
as ArrayStream (arraysOf)
import qualified Streamly.Internal.Data.Stream.IsStream as Stream (bracket')
import qualified Streamly.Internal.Data.Unfold as Unfold (either)
import qualified Streamly.Internal.FileSystem.Handle
as Handle (toChunks, putChunks)
#ifdef USE_NATIVE
-- | In the native (USE_NATIVE) build the process handle is the 'Process' type
-- brought in from "Streamly.Internal.System.Process.Posix".
type ProcessHandle = Process

-- | Process configuration for the native build: a single flag.
-- NOTE(review): the flag is set by 'pipeStdErr' and handed to @mkStdioPipes@;
-- presumably it requests a pipe for the child's stderr - confirm against the
-- Posix module.
newtype Config = Config Bool

-- | Default configuration. The executable path and the arguments are ignored
-- in the native build; the flag starts out as 'False'.
mkConfig :: FilePath -> [String] -> Config
mkConfig _path _args = Config False

-- | Return a configuration whose flag is 'True', whatever it was before.
pipeStdErr :: Config -> Config
pipeStdErr _ = Config True
#else
-- | Process configuration for the non-native build: an opaque wrapper around
-- the "System.Process" 'CreateProcess' record. The constructor is not
-- exported by this module, only the type name is.
newtype Config = Config CreateProcess
-- | Build the default process configuration for running the executable at
-- @path@ with the argument list @args@.
--
-- The child's stdin and stdout are connected to newly created pipes, while
-- stderr is inherited from the parent. All remaining 'CreateProcess' fields
-- are spelled out explicitly so that the defaults used by this module are
-- visible in one place.
mkConfig :: FilePath -> [String] -> Config
mkConfig path args = Config $ CreateProcess
    { cmdspec = RawCommand path args -- run the executable directly, no shell
    , cwd = Nothing                  -- no working directory override
    , env = Nothing                  -- no environment override

    -- Standard streams
    , std_in = CreatePipe
    , std_out = CreatePipe
    , std_err = Inherit
    , close_fds = False

    -- Process group and credentials
    , create_group = False
    , child_user = Nothing
    , child_group = Nothing

    -- Signal handling
    , delegate_ctlc = False

    -- Session / console behaviour
    , new_session = False
    , detach_console = False
    , create_new_console = False
    , use_process_jobs = True
    }
-- | Modify a configuration so that the child's stderr is connected to a newly
-- created pipe ('CreatePipe') instead of whatever was configured before. All
-- other settings are left untouched.
pipeStdErr :: Config -> Config
pipeStdErr (Config cfg) = Config $ cfg { std_err = CreatePipe }
#endif
-- | Exception thrown when a process does not terminate successfully.
--
-- The payload is the process exit code. In the native (USE_NATIVE) build a
-- process that was terminated or stopped by a signal is reported with the
-- negated signal number instead.
newtype ProcessFailure = ProcessFailure Int
    deriving Show
-- | 'ProcessFailure' can be thrown and caught as an exception. The
-- human-readable rendering includes the numeric code carried by the value.
instance Exception ProcessFailure where

    displayException (ProcessFailure exitCode) =
        "Process failed with exit code: " ++ show exitCode
-- | Cleanup action for the case where the stream ran to completion.
--
-- Blocks until the process has exited and inspects how it ended. The three
-- handles in the tuple are ignored here.
--
-- Throws 'ProcessFailure' when the process did not exit successfully: with
-- the exit code for a failing exit, or (native build only) with the negated
-- signal number when the process was terminated or stopped by a signal.
cleanupNormal :: MonadIO m =>
    (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> m ()
cleanupNormal (_, _, _, procHandle) = liftIO $ do
#ifdef USE_NATIVE
    status <- wait procHandle
    case status of
        Exited ExitSuccess -> return ()
        Exited (ExitFailure code) -> throwM $ ProcessFailure code
        Terminated signal _ ->
            throwM $ ProcessFailure (negate $ fromIntegral signal)
        Stopped signal ->
            throwM $ ProcessFailure (negate $ fromIntegral signal)
#else
    exitCode <- waitForProcess procHandle
    case exitCode of
        ExitSuccess -> return ()
        ExitFailure code -> throwM $ ProcessFailure code
#endif
-- | Cleanup action for abnormal termination of the stream.
--
-- Asks the process to terminate, closes the handles that were opened for it
-- and reaps the process in the background so that this action itself does
-- not block on the process going away.
--
-- Both the stdin and the stdout handle must be present; any other shape of
-- the tuple is treated as an internal error and calls 'error'.
cleanupException :: MonadIO m =>
    (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> m ()
cleanupException (Just stdinH, Just stdoutH, stderrMaybe, ph) = liftIO $ do
    -- Request termination of the process first.
    terminateProcess ph

    -- Closing stdin may flush buffered data into a pipe whose reader is
    -- already gone; that particular failure (EPIPE) is ignored, anything else
    -- is rethrown by 'eatSIGPIPE'.
    hClose stdinH `catch` eatSIGPIPE
    hClose stdoutH
    -- stderr is only present when it was piped.
    mapM_ hClose stderrMaybe

    -- Wait for the process in a separate thread, discarding the exit code, so
    -- that the cleanup does not block.
    void $ forkIO (void $ waitForProcess ph)

    where

    -- True only for a 'ResourceVanished' I/O error whose errno is EPIPE.
    isSIGPIPE :: IOException -> Bool
    isSIGPIPE e =
        case e of
            IOError
                { ioe_type = ResourceVanished
                , ioe_errno = Just ioe
                } -> Errno ioe == ePIPE
            _ -> False

    -- Swallow broken-pipe errors, rethrow every other I/O exception.
    eatSIGPIPE :: IOException -> IO ()
    eatSIGPIPE e = unless (isSIGPIPE e) $ throwIO e
cleanupException _ = error "cleanupProcess: Not reachable"
-- | Create a process running the executable at @path@ with arguments @args@.
--
-- The configuration is obtained by applying the supplied modifier to the
-- default produced by 'mkConfig'. The result is the tuple
-- @(stdin handle, stdout handle, stderr handle, process handle)@; a handle is
-- 'Nothing' when the corresponding stream was not piped.
createProc' ::
       (Config -> Config) -- ^ Configuration modifier
    -> FilePath           -- ^ Path to the executable
    -> [String]           -- ^ Arguments
    -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
createProc' modCfg path args = do
#ifdef USE_NATIVE
    ((inp, out, err, _excParent, _excChild), parent, child, failure) <-
        mkStdioPipes cfg
    -- If creating the process fails, run the @failure@ action returned by
    -- mkStdioPipes before rethrowing the exception.
    proc <- newProcess child path args Nothing
                `catch` (\(e :: SomeException) -> failure >> throwIO e)
    -- Run the parent-side action returned by mkStdioPipes.
    parent
    return (Just inp, Just out, err, proc)
#else
    createProcess cfg
#endif

    where

    -- Unwrap the final configuration after applying the user's modifier.
    Config cfg = modCfg $ mkConfig path args
-- | A stream that yields no elements but, as a side effect, writes the whole
-- @input@ stream of chunks to the handle and then closes the handle.
--
-- The element type @a@ is unconstrained because the resulting stream is
-- 'Stream.nil' with the write-and-close action attached via 'Stream.before';
-- this lets it be combined with output streams of any element type.
{-# INLINE putChunksClose #-}
putChunksClose :: (MonadIO m, IsStream t) =>
    Handle -> t m (Array Word8) -> t m a
putChunksClose h input =
    Stream.before
        (Handle.putChunks h (adapt input) >> liftIO (hClose h))
        Stream.nil
-- | Stream of byte chunks read from the handle. Closing the handle is
-- attached to the stream with 'Stream.after', so it happens once the stream
-- has finished.
{-# INLINE toChunksClose #-}
toChunksClose :: (IsStream t, MonadAsync m) => Handle -> t m (Array Word8)
toChunksClose h = Stream.after (liftIO $ hClose h) (Handle.toChunks h)
-- | Common driver for all process-running streams in this module.
--
-- Starts the process (via 'createProc'' with the given configuration
-- modifier, executable path and arguments), hands the resulting handles to
-- @run@ to build the output stream, and brackets that stream with cleanup
-- actions: 'cleanupNormal' as the first handler and 'cleanupException' for
-- the remaining two.
-- NOTE(review): per the streamly documentation of @bracket'@ the first
-- handler runs on normal stop and the other two on exception and on garbage
-- collection of an abandoned stream - confirm against the streamly version
-- in use.
{-# INLINE processChunksWithAction #-}
processChunksWithAction ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> t m a)
    -> (Config -> Config)
    -> FilePath
    -> [String]
    -> t m a
processChunksWithAction run modCfg path args =
    Stream.bracket'
        alloc cleanupNormal cleanupException cleanupException run

    where

    -- Resource acquisition: spawn the process and return its handles.
    alloc = liftIO $ createProc' modCfg path args
-- | Like 'processChunks'' but with a configuration modifier applied to the
-- default process configuration.
--
-- Runs the executable with the given arguments, feeds @input@ to its stdin
-- and produces a stream in which chunks read from the process' stderr are
-- tagged 'Left' and chunks read from its stdout are tagged 'Right'. The
-- stdin writer and the two readers are combined with 'parallel'.
--
-- 'pipeStdErr' is applied to the default configuration first and the
-- caller's @modifier@ is applied on top of it.
{-# INLINE processChunks'With #-}
processChunks'With ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => (Config -> Config)   -- ^ Config modifier
    -> FilePath             -- ^ Path to the executable
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream (fed to stdin)
    -> t m (Either (Array Word8) (Array Word8)) -- ^ Left stderr, Right stdout
processChunks'With modifier path args input =
    processChunksWithAction run (modifier . pipeStdErr) path args

    where

    run (Just stdinH, Just stdoutH, Just stderrH, _) =
        putChunksClose stdinH input
            `parallel` Stream.map Left (toChunksClose stderrH)
            `parallel` Stream.map Right (toChunksClose stdoutH)
    -- All three handles are expected to be pipes at this point.
    run _ = error "processChunks': Not reachable"
-- | Run the executable with the given arguments, feeding the input stream of
-- chunks to its stdin. The output stream carries stderr chunks as 'Left' and
-- stdout chunks as 'Right'.
--
-- This is 'processChunks'With' with the identity configuration modifier.
{-# INLINE processChunks' #-}
processChunks' ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath             -- ^ Path to the executable
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream (fed to stdin)
    -> t m (Either (Array Word8) (Array Word8)) -- ^ Left stderr, Right stdout
processChunks' = processChunks'With id
-- | Byte-stream variant of 'processChunks''.
--
-- The input bytes are grouped into arrays of 'defaultChunkSize' elements
-- before being fed to the process; the chunked output is flattened back into
-- individual bytes, preserving the 'Left' (stderr) / 'Right' (stdout) tag of
-- the chunk each byte came from.
{-# INLINE processBytes' #-}
processBytes' ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath     -- ^ Path to the executable
    -> [String]     -- ^ Arguments
    -> t m Word8    -- ^ Input stream (fed to stdin)
    -> t m (Either Word8 Word8) -- ^ Left stderr, Right stdout
processBytes' path args input =
    let input1 = ArrayStream.arraysOf defaultChunkSize input
        output = processChunks' path args input1
     in Stream.unfoldMany (Unfold.either Array.read) output
-- | Like 'processChunks' but with a configuration modifier applied to the
-- default process configuration.
--
-- Runs the executable with the given arguments, feeds @input@ to its stdin
-- and produces the chunks read from its stdout. The stdin writer and the
-- stdout reader are combined with 'parallel'. The stderr handle, if any, is
-- not read here.
{-# INLINE processChunksWith #-}
processChunksWith ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => (Config -> Config)   -- ^ Config modifier
    -> FilePath             -- ^ Path to the executable
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream (fed to stdin)
    -> t m (Array Word8)    -- ^ Output stream (read from stdout)
processChunksWith modifier path args input =
    processChunksWithAction run modifier path args

    where

    run (Just stdinH, Just stdoutH, _, _) =
        putChunksClose stdinH input `parallel` toChunksClose stdoutH
    -- stdin and stdout are expected to be pipes at this point.
    run _ = error "processChunks: Not reachable"
-- | Run the executable with the given arguments, feeding the input stream of
-- chunks to its stdin and producing the chunks read from its stdout.
--
-- This is 'processChunksWith' with the identity configuration modifier.
{-# INLINE processChunks #-}
processChunks ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath             -- ^ Path to the executable
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream (fed to stdin)
    -> t m (Array Word8)    -- ^ Output stream (read from stdout)
processChunks = processChunksWith id
-- | Byte-stream variant of 'processChunks'.
--
-- The input bytes are grouped into arrays of 'defaultChunkSize' elements
-- before being fed to the process, and the chunks read from its stdout are
-- flattened back into a stream of individual bytes.
{-# INLINE processBytes #-}
processBytes ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath     -- ^ Path to the executable
    -> [String]     -- ^ Arguments
    -> t m Word8    -- ^ Input stream (fed to stdin)
    -> t m Word8    -- ^ Output stream (read from stdout)
processBytes path args input =
    let input1 = ArrayStream.arraysOf defaultChunkSize input
        output = processChunks path args input1
     in Stream.unfoldMany Array.read output
-- | Run the executable with the given arguments and an empty input stream,
-- producing its output as a stream of bytes: stderr bytes are tagged 'Left'
-- and stdout bytes are tagged 'Right'.
--
-- Defined as 'processBytes'' applied to 'Stream.nil'.
{-# INLINE toBytes' #-}
toBytes' ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath     -- ^ Path to the executable
    -> [String]     -- ^ Arguments
    -> t m (Either Word8 Word8) -- ^ Left stderr, Right stdout
toBytes' path args = processBytes' path args Stream.nil
-- | Run the executable with the given arguments and an empty input stream,
-- producing the bytes read from its stdout.
--
-- Defined as 'processBytes' applied to 'Stream.nil'.
{-# INLINE toBytes #-}
toBytes ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath     -- ^ Path to the executable
    -> [String]     -- ^ Arguments
    -> t m Word8    -- ^ Output stream (read from stdout)
toBytes path args = processBytes path args Stream.nil
-- | Run the executable with the given arguments and an empty input stream,
-- producing its output as a stream of chunks: stderr chunks are tagged
-- 'Left' and stdout chunks are tagged 'Right'.
--
-- Defined as 'processChunks'' applied to 'Stream.nil'.
{-# INLINE toChunks' #-}
toChunks' ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath     -- ^ Path to the executable
    -> [String]     -- ^ Arguments
    -> t m (Either (Array Word8) (Array Word8)) -- ^ Left stderr, Right stdout
toChunks' path args = processChunks' path args Stream.nil
-- | Run the executable with the given arguments and an empty input stream,
-- producing the chunks read from its stdout.
--
-- Defined as 'processChunks' applied to 'Stream.nil'.
{-# INLINE toChunks #-}
toChunks ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath             -- ^ Path to the executable
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Output stream (read from stdout)
toChunks path args = processChunks path args Stream.nil