{-# LANGUAGE CPP #-}
module Ribosome.Data.Conduit.Composition where
import Conduit
import qualified Control.Concurrent.Async.Lifted as A
import Control.Concurrent.Async.Lifted hiding (link2)
import Control.Concurrent.STM hiding (atomically, newTVarIO)
import Control.Exception.Lifted (finally)
import Control.Monad hiding (forM_)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Loops
import Control.Monad.Trans.Resource
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Cereal as C
import qualified Data.Conduit.List as CL
import Data.Serialize
import Prelude hiding (get, put)
import System.Directory (removeFile)
import System.IO
buffer :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m)
=> Natural
-> c1 () x m ()
-> c2 x Void m r
-> m r
buffer :: Natural -> c1 () x m () -> c2 x Void m r -> m r
buffer Natural
i c1 () x m ()
c1 c2 x Void m r
c2 = c3 () Void m r -> m r
forall (c :: * -> * -> (* -> *) -> * -> *) (m :: * -> *) r.
(CRunnable c, RunConstraints c m) =>
c () Void m r -> m r
runCConduit (Natural -> c1 () x m () -> c2 x Void m r -> c3 () Void m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i c1 () x m ()
c1 c2 x Void m r
c2)
($$&) :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m) => c1 () x m () -> c2 x Void m r -> m r
c1 () x m ()
a $$& :: c1 () x m () -> c2 x Void m r -> m r
$$& c2 x Void m r
b = c3 () Void m r -> m r
forall (c :: * -> * -> (* -> *) -> * -> *) (m :: * -> *) r.
(CRunnable c, RunConstraints c m) =>
c () Void m r -> m r
runCConduit (c1 () x m ()
a c1 () x m () -> c2 x Void m r -> c3 () Void m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
c1 i x m () -> c2 x o m r -> c3 i o m r
=$=& c2 x Void m r
b)
infixr 0 $$&
(=$=&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
c1 i x m ()
a =$=& :: c1 i x m () -> c2 x o m r -> c3 i o m r
=$=& c2 x o m r
b = Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
64 c1 i x m ()
a c2 x o m r
b
infixr 2 =$=&
($=&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
$=& :: c1 i x m () -> c2 x o m r -> c3 i o m r
($=&) = c1 i x m () -> c2 x o m r -> c3 i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
c1 i x m () -> c2 x o m r -> c3 i o m r
(=$=&)
infixl 1 $=&
(=$&) :: (CCatable c1 c2 c3) => c1 i x m () -> c2 x o m r -> c3 i o m r
=$& :: c1 i x m () -> c2 x o m r -> c3 i o m r
(=$&) = c1 i x m () -> c2 x o m r -> c3 i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
c1 i x m () -> c2 x o m r -> c3 i o m r
(=$=&)
infixr 2 =$&
class CCatable c1 c2 (c3 :: * -> * -> (* -> *) -> * -> *) | c1 c2 -> c3 where
buffer' :: Natural
-> c1 i x m ()
-> c2 x o m r
-> c3 i o m r
bufferToFile :: (CFConduitLike c1, CFConduitLike c2, Serialize x, MonadBaseControl IO m, MonadResource m, MonadThrow m)
=> Natural
-> Maybe Int
-> FilePath
-> c1 () x m ()
-> c2 x Void m r
-> m r
bufferToFile :: Natural
-> Maybe Int -> FilePath -> c1 () x m () -> c2 x Void m r -> m r
bufferToFile Natural
bufsz Maybe Int
dsksz FilePath
tmpDir c1 () x m ()
c1 c2 x Void m r
c2 = CFConduit () Void m r -> m r
forall (c :: * -> * -> (* -> *) -> * -> *) (m :: * -> *) r.
(CRunnable c, RunConstraints c m) =>
c () Void m r -> m r
runCConduit (Natural
-> Maybe Int
-> FilePath
-> c1 () x m ()
-> c2 x Void m r
-> CFConduit () Void m r
forall (c1 :: * -> * -> (* -> *) -> * -> *)
(c2 :: * -> * -> (* -> *) -> * -> *) x i (m :: * -> *) o r.
(CFConduitLike c1, CFConduitLike c2, Serialize x) =>
Natural
-> Maybe Int
-> FilePath
-> c1 i x m ()
-> c2 x o m r
-> CFConduit i o m r
bufferToFile' Natural
bufsz Maybe Int
dsksz FilePath
tmpDir c1 () x m ()
c1 c2 x Void m r
c2)
bufferToFile' :: (CFConduitLike c1, CFConduitLike c2, Serialize x)
=> Natural
-> Maybe Int
-> FilePath
-> c1 i x m ()
-> c2 x o m r
-> CFConduit i o m r
bufferToFile' :: Natural
-> Maybe Int
-> FilePath
-> c1 i x m ()
-> c2 x o m r
-> CFConduit i o m r
bufferToFile' Natural
bufsz Maybe Int
dsksz FilePath
tmpDir c1 i x m ()
c1 c2 x o m r
c2 = CFConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
combine (c1 i x m () -> CFConduit i x m ()
forall (a :: * -> * -> (* -> *) -> * -> *) i o (m :: * -> *) r.
CFConduitLike a =>
a i o m r -> CFConduit i o m r
asCFConduit c1 i x m ()
c1) (c2 x o m r -> CFConduit x o m r
forall (a :: * -> * -> (* -> *) -> * -> *) i o (m :: * -> *) r.
CFConduitLike a =>
a i o m r -> CFConduit i o m r
asCFConduit c2 x o m r
c2)
where combine :: CFConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
combine (FSingle ConduitT i x m ()
a) CFConduit x o m r
b = Natural
-> Maybe Int
-> FilePath
-> ConduitT i x m ()
-> CFConduit x o m r
-> CFConduit i o m r
forall x i (m :: * -> *) o r.
Serialize x =>
Natural
-> Maybe Int
-> FilePath
-> ConduitT i x m ()
-> CFConduit x o m r
-> CFConduit i o m r
FMultipleF Natural
bufsz Maybe Int
dsksz FilePath
tmpDir ConduitT i x m ()
a CFConduit x o m r
b
combine (FMultiple Natural
i ConduitT i x m ()
a CFConduit x x m ()
as) CFConduit x o m r
b = Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
forall i x (m :: * -> *) o r.
Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
FMultiple Natural
i ConduitT i x m ()
a (Natural
-> Maybe Int
-> FilePath
-> CFConduit x x m ()
-> CFConduit x o m r
-> CFConduit x o m r
forall (c1 :: * -> * -> (* -> *) -> * -> *)
(c2 :: * -> * -> (* -> *) -> * -> *) x i (m :: * -> *) o r.
(CFConduitLike c1, CFConduitLike c2, Serialize x) =>
Natural
-> Maybe Int
-> FilePath
-> c1 i x m ()
-> c2 x o m r
-> CFConduit i o m r
bufferToFile' Natural
bufsz Maybe Int
dsksz FilePath
tmpDir CFConduit x x m ()
as CFConduit x o m r
b)
combine (FMultipleF Natural
bufsz' Maybe Int
dsksz' FilePath
tmpDir' ConduitT i x m ()
a CFConduit x x m ()
as) CFConduit x o m r
b = Natural
-> Maybe Int
-> FilePath
-> ConduitT i x m ()
-> CFConduit x o m r
-> CFConduit i o m r
forall x i (m :: * -> *) o r.
Serialize x =>
Natural
-> Maybe Int
-> FilePath
-> ConduitT i x m ()
-> CFConduit x o m r
-> CFConduit i o m r
FMultipleF Natural
bufsz' Maybe Int
dsksz' FilePath
tmpDir' ConduitT i x m ()
a (Natural
-> Maybe Int
-> FilePath
-> CFConduit x x m ()
-> CFConduit x o m r
-> CFConduit x o m r
forall (c1 :: * -> * -> (* -> *) -> * -> *)
(c2 :: * -> * -> (* -> *) -> * -> *) x i (m :: * -> *) o r.
(CFConduitLike c1, CFConduitLike c2, Serialize x) =>
Natural
-> Maybe Int
-> FilePath
-> c1 i x m ()
-> c2 x o m r
-> CFConduit i o m r
bufferToFile' Natural
bufsz Maybe Int
dsksz FilePath
tmpDir CFConduit x x m ()
as CFConduit x o m r
b)
class CRunnable c where
type RunConstraints c (m :: * -> *) :: Constraint
runCConduit :: (RunConstraints c m) => c () Void m r -> m r
instance CCatable ConduitT ConduitT CConduit where
buffer' :: Natural
-> ConduitT i x m () -> ConduitT x o m r -> CConduit i o m r
buffer' Natural
i ConduitT i x m ()
a ConduitT x o m r
b = Natural
-> CConduit i x m () -> CConduit x o m r -> CConduit i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i (ConduitT i x m () -> CConduit i x m ()
forall i o (m :: * -> *) r. ConduitT i o m r -> CConduit i o m r
Single ConduitT i x m ()
a) (ConduitT x o m r -> CConduit x o m r
forall i o (m :: * -> *) r. ConduitT i o m r -> CConduit i o m r
Single ConduitT x o m r
b)
instance CCatable ConduitT CConduit CConduit where
buffer' :: Natural
-> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
buffer' Natural
i ConduitT i x m ()
a CConduit x o m r
b = Natural
-> CConduit i x m () -> CConduit x o m r -> CConduit i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i (ConduitT i x m () -> CConduit i x m ()
forall i o (m :: * -> *) r. ConduitT i o m r -> CConduit i o m r
Single ConduitT i x m ()
a) CConduit x o m r
b
instance CCatable ConduitT CFConduit CFConduit where
buffer' :: Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
buffer' Natural
i ConduitT i x m ()
a CFConduit x o m r
b = Natural
-> CFConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i (ConduitT i x m () -> CFConduit i x m ()
forall (a :: * -> * -> (* -> *) -> * -> *) i o (m :: * -> *) r.
CFConduitLike a =>
a i o m r -> CFConduit i o m r
asCFConduit ConduitT i x m ()
a) CFConduit x o m r
b
instance CCatable CConduit ConduitT CConduit where
buffer' :: Natural
-> CConduit i x m () -> ConduitT x o m r -> CConduit i o m r
buffer' Natural
i CConduit i x m ()
a ConduitT x o m r
b = Natural
-> CConduit i x m () -> CConduit x o m r -> CConduit i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i CConduit i x m ()
a (ConduitT x o m r -> CConduit x o m r
forall i o (m :: * -> *) r. ConduitT i o m r -> CConduit i o m r
Single ConduitT x o m r
b)
instance CCatable CConduit CConduit CConduit where
buffer' :: Natural
-> CConduit i x m () -> CConduit x o m r -> CConduit i o m r
buffer' Natural
i (Single ConduitT i x m ()
a) CConduit x o m r
b = Natural
-> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
forall i x (m :: * -> *) o r.
Natural
-> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
Multiple Natural
i ConduitT i x m ()
a CConduit x o m r
b
buffer' Natural
i (Multiple Natural
i' ConduitT i x m ()
a CConduit x x m ()
as) CConduit x o m r
b = Natural
-> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
forall i x (m :: * -> *) o r.
Natural
-> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
Multiple Natural
i' ConduitT i x m ()
a (Natural
-> CConduit x x m () -> CConduit x o m r -> CConduit x o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i CConduit x x m ()
as CConduit x o m r
b)
instance CCatable CConduit CFConduit CFConduit where
buffer' :: Natural
-> CConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
buffer' Natural
i CConduit i x m ()
a CFConduit x o m r
b = Natural
-> CFConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i (CConduit i x m () -> CFConduit i x m ()
forall (a :: * -> * -> (* -> *) -> * -> *) i o (m :: * -> *) r.
CFConduitLike a =>
a i o m r -> CFConduit i o m r
asCFConduit CConduit i x m ()
a) CFConduit x o m r
b
instance CCatable CFConduit ConduitT CFConduit where
buffer' :: Natural
-> CFConduit i x m () -> ConduitT x o m r -> CFConduit i o m r
buffer' Natural
i CFConduit i x m ()
a ConduitT x o m r
b = Natural
-> CFConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i CFConduit i x m ()
a (ConduitT x o m r -> CFConduit x o m r
forall (a :: * -> * -> (* -> *) -> * -> *) i o (m :: * -> *) r.
CFConduitLike a =>
a i o m r -> CFConduit i o m r
asCFConduit ConduitT x o m r
b)
instance CCatable CFConduit CConduit CFConduit where
buffer' :: Natural
-> CFConduit i x m () -> CConduit x o m r -> CFConduit i o m r
buffer' Natural
i CFConduit i x m ()
a CConduit x o m r
b = Natural
-> CFConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i CFConduit i x m ()
a (CConduit x o m r -> CFConduit x o m r
forall (a :: * -> * -> (* -> *) -> * -> *) i o (m :: * -> *) r.
CFConduitLike a =>
a i o m r -> CFConduit i o m r
asCFConduit CConduit x o m r
b)
instance CCatable CFConduit CFConduit CFConduit where
buffer' :: Natural
-> CFConduit i x m () -> CFConduit x o m r -> CFConduit i o m r
buffer' Natural
i (FSingle ConduitT i x m ()
a) CFConduit x o m r
b = Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
forall i x (m :: * -> *) o r.
Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
FMultiple Natural
i ConduitT i x m ()
a CFConduit x o m r
b
buffer' Natural
i (FMultiple Natural
i' ConduitT i x m ()
a CFConduit x x m ()
as) CFConduit x o m r
b = Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
forall i x (m :: * -> *) o r.
Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
FMultiple Natural
i' ConduitT i x m ()
a (Natural
-> CFConduit x x m () -> CFConduit x o m r -> CFConduit x o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i CFConduit x x m ()
as CFConduit x o m r
b)
buffer' Natural
i (FMultipleF Natural
bufsz Maybe Int
dsksz FilePath
tmpDir ConduitT i x m ()
a CFConduit x x m ()
as) CFConduit x o m r
b = Natural
-> Maybe Int
-> FilePath
-> ConduitT i x m ()
-> CFConduit x o m r
-> CFConduit i o m r
forall x i (m :: * -> *) o r.
Serialize x =>
Natural
-> Maybe Int
-> FilePath
-> ConduitT i x m ()
-> CFConduit x o m r
-> CFConduit i o m r
FMultipleF Natural
bufsz Maybe Int
dsksz FilePath
tmpDir ConduitT i x m ()
a (Natural
-> CFConduit x x m () -> CFConduit x o m r -> CFConduit x o m r
forall k (c1 :: * -> k -> (* -> *) -> * -> *)
(c2 :: k -> * -> (* -> *) -> * -> *)
(c3 :: * -> * -> (* -> *) -> * -> *) i (x :: k) (m :: * -> *) o r.
CCatable c1 c2 c3 =>
Natural -> c1 i x m () -> c2 x o m r -> c3 i o m r
buffer' Natural
i CFConduit x x m ()
as CFConduit x o m r
b)
instance CRunnable ConduitT where
type RunConstraints ConduitT m = (Monad m)
runCConduit :: ConduitT () Void m r -> m r
runCConduit = ConduitT () Void m r -> m r
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit
instance CRunnable CConduit where
type RunConstraints CConduit m = (MonadBaseControl IO m, MonadIO m)
runCConduit :: CConduit () Void m r -> m r
runCConduit (Single ConduitT () Void m r
c) = ConduitT () Void m r -> m r
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit ConduitT () Void m r
c
runCConduit (Multiple Natural
bufsz ConduitT () x m ()
c CConduit x Void m r
cs) = do
TBQueue (Maybe x)
chan <- IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x)))
-> IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall a b. (a -> b) -> a -> b
$ Natural -> IO (TBQueue (Maybe x))
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
bufsz
m () -> (Async (StM m ()) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (TBQueue (Maybe x) -> ConduitT () x m () -> m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender TBQueue (Maybe x)
chan ConduitT () x m ()
c) ((Async (StM m ()) -> m r) -> m r)
-> (Async (StM m ()) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m ())
c' ->
TBQueue (Maybe x) -> Async (StM m ()) -> CConduit x Void m r -> m r
forall (m :: * -> *) i x r.
(MonadBaseControl IO m, MonadIO m) =>
TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r
stage TBQueue (Maybe x)
chan Async (StM m ())
c' CConduit x Void m r
cs
instance CRunnable CFConduit where
type RunConstraints CFConduit m = (MonadBaseControl IO m, MonadIO m, MonadResource m, MonadThrow m)
runCConduit :: CFConduit () Void m r -> m r
runCConduit (FSingle ConduitT () Void m r
c) = ConduitT () Void m r -> m r
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit ConduitT () Void m r
c
runCConduit (FMultiple Natural
bufsz ConduitT () x m ()
c CFConduit x Void m r
cs) = do
TBQueue (Maybe x)
chan <- IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x)))
-> IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall a b. (a -> b) -> a -> b
$ Natural -> IO (TBQueue (Maybe x))
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
bufsz
m () -> (Async (StM m ()) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (TBQueue (Maybe x) -> ConduitT () x m () -> m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender TBQueue (Maybe x)
chan ConduitT () x m ()
c) ((Async (StM m ()) -> m r) -> m r)
-> (Async (StM m ()) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m ())
c' ->
ConduitT () x m ()
-> Async (StM m ()) -> CFConduit x Void m r -> m r
forall (m :: * -> *) i x r.
(MonadBaseControl IO m, MonadResource m, MonadThrow m) =>
ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage (TBQueue (Maybe x) -> ConduitT () x m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m ()
receiver TBQueue (Maybe x)
chan) Async (StM m ())
c' CFConduit x Void m r
cs
runCConduit (FMultipleF Natural
bufsz Maybe Int
filemax FilePath
tempDir ConduitT () x m ()
c CFConduit x Void m r
cs) = do
BufferContext m x
context <- IO (BufferContext m x) -> m (BufferContext m x)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (BufferContext m x) -> m (BufferContext m x))
-> IO (BufferContext m x) -> m (BufferContext m x)
forall a b. (a -> b) -> a -> b
$ TBQueue x
-> TQueue (ConduitT () x m ())
-> TVar (Maybe Int)
-> TVar Bool
-> FilePath
-> BufferContext m x
forall (m :: * -> *) a.
TBQueue a
-> TQueue (ConduitT () a m ())
-> TVar (Maybe Int)
-> TVar Bool
-> FilePath
-> BufferContext m a
BufferContext (TBQueue x
-> TQueue (ConduitT () x m ())
-> TVar (Maybe Int)
-> TVar Bool
-> FilePath
-> BufferContext m x)
-> IO (TBQueue x)
-> IO
(TQueue (ConduitT () x m ())
-> TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Natural -> IO (TBQueue x)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
bufsz
IO
(TQueue (ConduitT () x m ())
-> TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
-> IO (TQueue (ConduitT () x m ()))
-> IO
(TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (TQueue (ConduitT () x m ()))
forall a. IO (TQueue a)
newTQueueIO
IO (TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
-> IO (TVar (Maybe Int))
-> IO (TVar Bool -> FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe Int -> IO (TVar (Maybe Int))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Maybe Int
filemax
IO (TVar Bool -> FilePath -> BufferContext m x)
-> IO (TVar Bool) -> IO (FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Bool -> IO (TVar Bool)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Bool
False
IO (FilePath -> BufferContext m x)
-> IO FilePath -> IO (BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> FilePath -> IO FilePath
forall (f :: * -> *) a. Applicative f => a -> f a
pure FilePath
tempDir
m () -> (Async (StM m ()) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (BufferContext m x -> ConduitT () x m () -> m ()
forall (m :: * -> *) x.
(MonadIO m, MonadResource m, Serialize x, MonadThrow m) =>
BufferContext m x -> ConduitT () x m () -> m ()
fsender BufferContext m x
context ConduitT () x m ()
c) ((Async (StM m ()) -> m r) -> m r)
-> (Async (StM m ()) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m ())
c' ->
ConduitT () x m ()
-> Async (StM m ()) -> CFConduit x Void m r -> m r
forall (m :: * -> *) i x r.
(MonadBaseControl IO m, MonadResource m, MonadThrow m) =>
ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage (BufferContext m x -> ConduitT () x m ()
forall (m :: * -> *) o.
MonadIO m =>
BufferContext m o -> ConduitT () o m ()
freceiver BufferContext m x
context) Async (StM m ())
c' CFConduit x Void m r
cs
data CConduit i o m r where
Single :: ConduitT i o m r -> CConduit i o m r
Multiple :: Natural -> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
link2 :: MonadBase IO m => Async a -> Async b -> m ()
link2 :: Async a -> Async b -> m ()
link2 = (IO () -> m ()
forall (b :: * -> *) (m :: * -> *) α. MonadBase b m => b α -> m α
liftBase (IO () -> m ()) -> (Async b -> IO ()) -> Async b -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
.) ((Async b -> IO ()) -> Async b -> m ())
-> (Async a -> Async b -> IO ()) -> Async a -> Async b -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async a -> Async b -> IO ()
forall (m :: * -> *) a b.
MonadBase IO m =>
Async a -> Async b -> m ()
A.link2
sender :: (MonadIO m) => TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender :: TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender TBQueue (Maybe o)
chan ConduitT () o m ()
input = do
ConduitT () Void m () -> m ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m () -> m ()) -> ConduitT () Void m () -> m ()
forall a b. (a -> b) -> a -> b
$ ConduitT () o m ()
input ConduitT () o m () -> ConduitM o Void m () -> ConduitT () Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (o -> m ()) -> ConduitM o Void m ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
mapM_C (TBQueue (Maybe o) -> Maybe o -> m ()
forall (m :: * -> *) a. MonadIO m => TBQueue a -> a -> m ()
send TBQueue (Maybe o)
chan (Maybe o -> m ()) -> (o -> Maybe o) -> o -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. o -> Maybe o
forall a. a -> Maybe a
Just)
TBQueue (Maybe o) -> Maybe o -> m ()
forall (m :: * -> *) a. MonadIO m => TBQueue a -> a -> m ()
send TBQueue (Maybe o)
chan Maybe o
forall a. Maybe a
Nothing
stage :: (MonadBaseControl IO m, MonadIO m) => TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r
stage :: TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r
stage TBQueue (Maybe i)
chan Async x
prevAsync (Single ConduitT i Void m r
c) =
m r -> (Async (StM m r) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (ConduitT () Void m r -> m r
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m r -> m r) -> ConduitT () Void m r -> m r
forall a b. (a -> b) -> a -> b
$ TBQueue (Maybe i) -> ConduitT () i m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m ()
receiver TBQueue (Maybe i)
chan ConduitT () i m () -> ConduitT i Void m r -> ConduitT () Void m r
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT i Void m r
c) ((Async (StM m r) -> m r) -> m r)
-> (Async (StM m r) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m r)
c' -> do
Async x -> Async (StM m r) -> m ()
forall (m :: * -> *) a b.
MonadBase IO m =>
Async a -> Async b -> m ()
link2 Async x
prevAsync Async (StM m r)
c'
Async (StM m r) -> m r
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async (StM m r)
c'
stage TBQueue (Maybe i)
chan Async x
prevAsync (Multiple Natural
bufsz ConduitT i x m ()
c CConduit x Void m r
cs) = do
TBQueue (Maybe x)
chan' <- IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x)))
-> IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall a b. (a -> b) -> a -> b
$ Natural -> IO (TBQueue (Maybe x))
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
bufsz
m () -> (Async (StM m ()) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (TBQueue (Maybe x) -> ConduitT () x m () -> m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender TBQueue (Maybe x)
chan' (ConduitT () x m () -> m ()) -> ConduitT () x m () -> m ()
forall a b. (a -> b) -> a -> b
$ TBQueue (Maybe i) -> ConduitT () i m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m ()
receiver TBQueue (Maybe i)
chan ConduitT () i m () -> ConduitT i x m () -> ConduitT () x m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT i x m ()
c) ((Async (StM m ()) -> m r) -> m r)
-> (Async (StM m ()) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m ())
c' -> do
Async x -> Async (StM m ()) -> m ()
forall (m :: * -> *) a b.
MonadBase IO m =>
Async a -> Async b -> m ()
link2 Async x
prevAsync Async (StM m ())
c'
TBQueue (Maybe x) -> Async (StM m ()) -> CConduit x Void m r -> m r
forall (m :: * -> *) i x r.
(MonadBaseControl IO m, MonadIO m) =>
TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r
stage TBQueue (Maybe x)
chan' Async (StM m ())
c' CConduit x Void m r
cs
receiver :: (MonadIO m) => TBQueue (Maybe o) -> ConduitT () o m ()
receiver :: TBQueue (Maybe o) -> ConduitT () o m ()
receiver TBQueue (Maybe o)
chan = do
Maybe o
mx <- TBQueue (Maybe o) -> ConduitT () o m (Maybe o)
forall (m :: * -> *) a. MonadIO m => TBQueue a -> m a
recv TBQueue (Maybe o)
chan
case Maybe o
mx of
Maybe o
Nothing -> () -> ConduitT () o m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just o
x -> o -> ConduitT () o m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield o
x ConduitT () o m () -> ConduitT () o m () -> ConduitT () o m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> TBQueue (Maybe o) -> ConduitT () o m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m ()
receiver TBQueue (Maybe o)
chan
data CFConduit i o m r where
FSingle :: ConduitT i o m r -> CFConduit i o m r
FMultiple :: Natural -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
FMultipleF :: (Serialize x) => Natural -> Maybe Int -> FilePath -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
class CFConduitLike a where
asCFConduit :: a i o m r -> CFConduit i o m r
instance CFConduitLike ConduitT where
asCFConduit :: ConduitT i o m r -> CFConduit i o m r
asCFConduit = ConduitT i o m r -> CFConduit i o m r
forall i o (m :: * -> *) r. ConduitT i o m r -> CFConduit i o m r
FSingle
instance CFConduitLike CConduit where
asCFConduit :: CConduit i o m r -> CFConduit i o m r
asCFConduit (Single ConduitT i o m r
c) = ConduitT i o m r -> CFConduit i o m r
forall i o (m :: * -> *) r. ConduitT i o m r -> CFConduit i o m r
FSingle ConduitT i o m r
c
asCFConduit (Multiple Natural
i ConduitT i x m ()
c CConduit x o m r
cs) = Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
forall i x (m :: * -> *) o r.
Natural
-> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r
FMultiple Natural
i ConduitT i x m ()
c (CConduit x o m r -> CFConduit x o m r
forall (a :: * -> * -> (* -> *) -> * -> *) i o (m :: * -> *) r.
CFConduitLike a =>
a i o m r -> CFConduit i o m r
asCFConduit CConduit x o m r
cs)
instance CFConduitLike CFConduit where
asCFConduit :: CFConduit i o m r -> CFConduit i o m r
asCFConduit = CFConduit i o m r -> CFConduit i o m r
forall a. a -> a
id
data BufferContext m a = BufferContext { BufferContext m a -> TBQueue a
chan :: TBQueue a
, BufferContext m a -> TQueue (ConduitT () a m ())
restore :: TQueue (ConduitT () a m ())
, BufferContext m a -> TVar (Maybe Int)
slotsFree :: TVar (Maybe Int)
, BufferContext m a -> TVar Bool
done :: TVar Bool
, BufferContext m a -> FilePath
tempDir :: FilePath
}
fsender :: (MonadIO m, MonadResource m, Serialize x, MonadThrow m) => BufferContext m x -> ConduitT () x m () -> m ()
fsender :: BufferContext m x -> ConduitT () x m () -> m ()
fsender bc :: BufferContext m x
bc@BufferContext{FilePath
TVar Bool
TVar (Maybe Int)
TQueue (ConduitT () x m ())
TBQueue x
tempDir :: FilePath
done :: TVar Bool
slotsFree :: TVar (Maybe Int)
restore :: TQueue (ConduitT () x m ())
chan :: TBQueue x
$sel:tempDir:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> FilePath
$sel:done:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TVar Bool
$sel:slotsFree:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TVar (Maybe Int)
$sel:restore:BufferContext :: forall (m :: * -> *) a.
BufferContext m a -> TQueue (ConduitT () a m ())
$sel:chan:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TBQueue a
..} ConduitT () x m ()
input = do
ConduitT () Void m () -> m ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m () -> m ()) -> ConduitT () Void m () -> m ()
forall a b. (a -> b) -> a -> b
$ ConduitT () x m ()
input ConduitT () x m () -> ConduitM x Void m () -> ConduitT () Void m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| (x -> m ()) -> ConduitM x Void m ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
mapM_C x -> m ()
f
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
done Bool
True
where
f :: x -> m ()
f x
x = m (m ()) -> m ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m ()) -> m ()) -> m (m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ IO (m ()) -> m (m ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (m ()) -> m (m ())) -> IO (m ()) -> m (m ())
forall a b. (a -> b) -> a -> b
$ STM (m ()) -> IO (m ())
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (m ()) -> IO (m ())) -> STM (m ()) -> IO (m ())
forall a b. (a -> b) -> a -> b
$
(TBQueue x -> x -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue x
chan x
x STM () -> STM (m ()) -> STM (m ())
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m () -> STM (m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())) STM (m ()) -> STM (m ()) -> STM (m ())
forall a. STM a -> STM a -> STM a
`orElse` do
m ()
action <- BufferContext m x -> STM (m ())
forall (m :: * -> *) o.
(MonadIO m, MonadResource m, Serialize o, MonadThrow m) =>
BufferContext m o -> STM (m ())
persistChan BufferContext m x
bc
TBQueue x -> x -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue x
chan x
x
return m ()
action
fstage :: (MonadBaseControl IO m, MonadResource m, MonadThrow m) => ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage :: ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage ConduitT () i m ()
prevStage Async x
prevAsync (FSingle ConduitT i Void m r
c) =
m r -> (Async (StM m r) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (ConduitT () Void m r -> m r
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void m r -> m r) -> ConduitT () Void m r -> m r
forall a b. (a -> b) -> a -> b
$ ConduitT () i m ()
prevStage ConduitT () i m () -> ConduitT i Void m r -> ConduitT () Void m r
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT i Void m r
c) ((Async (StM m r) -> m r) -> m r)
-> (Async (StM m r) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m r)
c' -> do
Async x -> Async (StM m r) -> m ()
forall (m :: * -> *) a b.
MonadBase IO m =>
Async a -> Async b -> m ()
link2 Async x
prevAsync Async (StM m r)
c'
Async (StM m r) -> m r
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async (StM m r)
c'
fstage ConduitT () i m ()
prevStage Async x
prevAsync (FMultiple Natural
bufsz ConduitT i x m ()
c CFConduit x Void m r
cs) = do
TBQueue (Maybe x)
chan' <- IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x)))
-> IO (TBQueue (Maybe x)) -> m (TBQueue (Maybe x))
forall a b. (a -> b) -> a -> b
$ Natural -> IO (TBQueue (Maybe x))
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
bufsz
m () -> (Async (StM m ()) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (TBQueue (Maybe x) -> ConduitT () x m () -> m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m () -> m ()
sender TBQueue (Maybe x)
chan' (ConduitT () x m () -> m ()) -> ConduitT () x m () -> m ()
forall a b. (a -> b) -> a -> b
$ ConduitT () i m ()
prevStage ConduitT () i m () -> ConduitT i x m () -> ConduitT () x m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT i x m ()
c) ((Async (StM m ()) -> m r) -> m r)
-> (Async (StM m ()) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m ())
c' -> do
Async x -> Async (StM m ()) -> m ()
forall (m :: * -> *) a b.
MonadBase IO m =>
Async a -> Async b -> m ()
link2 Async x
prevAsync Async (StM m ())
c'
ConduitT () x m ()
-> Async (StM m ()) -> CFConduit x Void m r -> m r
forall (m :: * -> *) i x r.
(MonadBaseControl IO m, MonadResource m, MonadThrow m) =>
ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage (TBQueue (Maybe x) -> ConduitT () x m ()
forall (m :: * -> *) o.
MonadIO m =>
TBQueue (Maybe o) -> ConduitT () o m ()
receiver TBQueue (Maybe x)
chan') Async (StM m ())
c' CFConduit x Void m r
cs
fstage ConduitT () i m ()
prevStage Async x
prevAsync (FMultipleF Natural
bufsz Maybe Int
dsksz FilePath
tempDir ConduitT i x m ()
c CFConduit x Void m r
cs) = do
BufferContext m x
bc <- IO (BufferContext m x) -> m (BufferContext m x)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (BufferContext m x) -> m (BufferContext m x))
-> IO (BufferContext m x) -> m (BufferContext m x)
forall a b. (a -> b) -> a -> b
$ TBQueue x
-> TQueue (ConduitT () x m ())
-> TVar (Maybe Int)
-> TVar Bool
-> FilePath
-> BufferContext m x
forall (m :: * -> *) a.
TBQueue a
-> TQueue (ConduitT () a m ())
-> TVar (Maybe Int)
-> TVar Bool
-> FilePath
-> BufferContext m a
BufferContext (TBQueue x
-> TQueue (ConduitT () x m ())
-> TVar (Maybe Int)
-> TVar Bool
-> FilePath
-> BufferContext m x)
-> IO (TBQueue x)
-> IO
(TQueue (ConduitT () x m ())
-> TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Natural -> IO (TBQueue x)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
bufsz
IO
(TQueue (ConduitT () x m ())
-> TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
-> IO (TQueue (ConduitT () x m ()))
-> IO
(TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (TQueue (ConduitT () x m ()))
forall a. IO (TQueue a)
newTQueueIO
IO (TVar (Maybe Int) -> TVar Bool -> FilePath -> BufferContext m x)
-> IO (TVar (Maybe Int))
-> IO (TVar Bool -> FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe Int -> IO (TVar (Maybe Int))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Maybe Int
dsksz
IO (TVar Bool -> FilePath -> BufferContext m x)
-> IO (TVar Bool) -> IO (FilePath -> BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Bool -> IO (TVar Bool)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Bool
False
IO (FilePath -> BufferContext m x)
-> IO FilePath -> IO (BufferContext m x)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> FilePath -> IO FilePath
forall (f :: * -> *) a. Applicative f => a -> f a
pure FilePath
tempDir
m () -> (Async (StM m ()) -> m r) -> m r
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> (Async (StM m a) -> m b) -> m b
withAsync (BufferContext m x -> ConduitT () x m () -> m ()
forall (m :: * -> *) x.
(MonadIO m, MonadResource m, Serialize x, MonadThrow m) =>
BufferContext m x -> ConduitT () x m () -> m ()
fsender BufferContext m x
bc (ConduitT () x m () -> m ()) -> ConduitT () x m () -> m ()
forall a b. (a -> b) -> a -> b
$ ConduitT () i m ()
prevStage ConduitT () i m () -> ConduitT i x m () -> ConduitT () x m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitT i x m ()
c) ((Async (StM m ()) -> m r) -> m r)
-> (Async (StM m ()) -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \Async (StM m ())
c' -> do
Async x -> Async (StM m ()) -> m ()
forall (m :: * -> *) a b.
MonadBase IO m =>
Async a -> Async b -> m ()
link2 Async x
prevAsync Async (StM m ())
c'
ConduitT () x m ()
-> Async (StM m ()) -> CFConduit x Void m r -> m r
forall (m :: * -> *) i x r.
(MonadBaseControl IO m, MonadResource m, MonadThrow m) =>
ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
fstage (BufferContext m x -> ConduitT () x m ()
forall (m :: * -> *) o.
MonadIO m =>
BufferContext m o -> ConduitT () o m ()
freceiver BufferContext m x
bc) Async (StM m ())
c' CFConduit x Void m r
cs
freceiver :: (MonadIO m) => BufferContext m o -> ConduitT () o m ()
freceiver :: BufferContext m o -> ConduitT () o m ()
freceiver BufferContext{FilePath
TVar Bool
TVar (Maybe Int)
TQueue (ConduitT () o m ())
TBQueue o
tempDir :: FilePath
done :: TVar Bool
slotsFree :: TVar (Maybe Int)
restore :: TQueue (ConduitT () o m ())
chan :: TBQueue o
$sel:tempDir:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> FilePath
$sel:done:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TVar Bool
$sel:slotsFree:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TVar (Maybe Int)
$sel:restore:BufferContext :: forall (m :: * -> *) a.
BufferContext m a -> TQueue (ConduitT () a m ())
$sel:chan:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TBQueue a
..} = ConduitT () o m ()
loop where
loop :: ConduitT () o m ()
loop = do
(ConduitT () o m ()
src, Bool
exit) <- IO (ConduitT () o m (), Bool)
-> ConduitT () o m (ConduitT () o m (), Bool)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ConduitT () o m (), Bool)
-> ConduitT () o m (ConduitT () o m (), Bool))
-> IO (ConduitT () o m (), Bool)
-> ConduitT () o m (ConduitT () o m (), Bool)
forall a b. (a -> b) -> a -> b
$ STM (ConduitT () o m (), Bool) -> IO (ConduitT () o m (), Bool)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (ConduitT () o m (), Bool) -> IO (ConduitT () o m (), Bool))
-> STM (ConduitT () o m (), Bool) -> IO (ConduitT () o m (), Bool)
forall a b. (a -> b) -> a -> b
$ do
(TQueue (ConduitT () o m ()) -> STM (ConduitT () o m ())
forall a. TQueue a -> STM a
readTQueue TQueue (ConduitT () o m ())
restore STM (ConduitT () o m ())
-> (ConduitT () o m () -> STM (ConduitT () o m (), Bool))
-> STM (ConduitT () o m (), Bool)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (\ConduitT () o m ()
action -> (ConduitT () o m (), Bool) -> STM (ConduitT () o m (), Bool)
forall (m :: * -> *) a. Monad m => a -> m a
return (ConduitT () o m ()
action, Bool
False))) STM (ConduitT () o m (), Bool)
-> STM (ConduitT () o m (), Bool) -> STM (ConduitT () o m (), Bool)
forall a. STM a -> STM a -> STM a
`orElse` do
[o]
xs <- TBQueue o -> STM [o]
forall a. TBQueue a -> STM [a]
exhaust TBQueue o
chan
Bool
isDone <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
done
return ([o] -> ConduitT () o m ()
forall (m :: * -> *) a i. Monad m => [a] -> ConduitT i a m ()
CL.sourceList [o]
xs, Bool
isDone)
ConduitT () o m ()
src
Bool -> ConduitT () o m () -> ConduitT () o m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
exit ConduitT () o m ()
loop
persistChan :: (MonadIO m, MonadResource m, Serialize o, MonadThrow m) => BufferContext m o -> STM (m ())
persistChan :: BufferContext m o -> STM (m ())
persistChan BufferContext{FilePath
TVar Bool
TVar (Maybe Int)
TQueue (ConduitT () o m ())
TBQueue o
tempDir :: FilePath
done :: TVar Bool
slotsFree :: TVar (Maybe Int)
restore :: TQueue (ConduitT () o m ())
chan :: TBQueue o
$sel:tempDir:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> FilePath
$sel:done:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TVar Bool
$sel:slotsFree:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TVar (Maybe Int)
$sel:restore:BufferContext :: forall (m :: * -> *) a.
BufferContext m a -> TQueue (ConduitT () a m ())
$sel:chan:BufferContext :: forall (m :: * -> *) a. BufferContext m a -> TBQueue a
..} = do
[o]
xs <- TBQueue o -> STM [o]
forall a. TBQueue a -> STM [a]
exhaust TBQueue o
chan
Maybe Int
mslots <- TVar (Maybe Int) -> STM (Maybe Int)
forall a. TVar a -> STM a
readTVar TVar (Maybe Int)
slotsFree
let len :: Int
len = [o] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [o]
xs
Maybe Int -> (Int -> STM ()) -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe Int
mslots ((Int -> STM ()) -> STM ()) -> (Int -> STM ()) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Int
slots -> Bool -> STM ()
check (Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
slots)
TMVar (FilePath, ReleaseKey)
filePath <- STM (TMVar (FilePath, ReleaseKey))
forall a. STM (TMVar a)
newEmptyTMVar
TQueue (ConduitT () o m ()) -> ConduitT () o m () -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (ConduitT () o m ())
restore (ConduitT () o m () -> STM ()) -> ConduitT () o m () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
(FilePath
path, ReleaseKey
key) <- IO (FilePath, ReleaseKey) -> ConduitT () o m (FilePath, ReleaseKey)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (FilePath, ReleaseKey)
-> ConduitT () o m (FilePath, ReleaseKey))
-> IO (FilePath, ReleaseKey)
-> ConduitT () o m (FilePath, ReleaseKey)
forall a b. (a -> b) -> a -> b
$ STM (FilePath, ReleaseKey) -> IO (FilePath, ReleaseKey)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (FilePath, ReleaseKey) -> IO (FilePath, ReleaseKey))
-> STM (FilePath, ReleaseKey) -> IO (FilePath, ReleaseKey)
forall a b. (a -> b) -> a -> b
$ TMVar (FilePath, ReleaseKey) -> STM (FilePath, ReleaseKey)
forall a. TMVar a -> STM a
takeTMVar TMVar (FilePath, ReleaseKey)
filePath
FilePath -> ConduitT () ByteString m ()
forall (m :: * -> *) i.
MonadResource m =>
FilePath -> ConduitT i ByteString m ()
CB.sourceFile FilePath
path ConduitT () ByteString m ()
-> ConduitM ByteString o m () -> ConduitT () o m ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| do
Get o -> ConduitM ByteString o m ()
forall (m :: * -> *) o.
MonadThrow m =>
Get o -> ConduitT ByteString o m ()
C.conduitGet2 Get o
forall t. Serialize t => Get t
get
IO () -> ConduitM ByteString o m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ConduitM ByteString o m ())
-> IO () -> ConduitM ByteString o m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe Int) -> (Maybe Int -> Maybe Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar (Maybe Int)
slotsFree ((Int -> Int) -> Maybe Int -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
len))
ReleaseKey -> ConduitM ByteString o m ()
forall (m :: * -> *). MonadIO m => ReleaseKey -> m ()
release ReleaseKey
key
case [o]
xs of
[] -> m () -> STM (m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
[o]
_ -> do
TVar (Maybe Int) -> (Maybe Int -> Maybe Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar (Maybe Int)
slotsFree ((Int -> Int) -> Maybe Int -> Maybe Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
len))
return $ do
(ReleaseKey
key, (FilePath
path, Handle
h)) <- IO (FilePath, Handle)
-> ((FilePath, Handle) -> IO ())
-> m (ReleaseKey, (FilePath, Handle))
forall (m :: * -> *) a.
MonadResource m =>
IO a -> (a -> IO ()) -> m (ReleaseKey, a)
allocate (FilePath -> FilePath -> IO (FilePath, Handle)
openBinaryTempFile FilePath
tempDir FilePath
"conduit.bin") (\(FilePath
path, Handle
h) -> Handle -> IO ()
hClose Handle
h IO () -> IO () -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`finally` FilePath -> IO ()
removeFile FilePath
path)
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ConduitT () Void IO () -> IO ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void IO () -> IO ())
-> ConduitT () Void IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [o] -> ConduitT () o IO ()
forall (m :: * -> *) a i. Monad m => [a] -> ConduitT i a m ()
CL.sourceList [o]
xs ConduitT () o IO ()
-> ConduitM o Void IO () -> ConduitT () Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| Putter o -> ConduitT o ByteString IO ()
forall (m :: * -> *) a.
Monad m =>
Putter a -> ConduitT a ByteString m ()
C.conduitPut Putter o
forall t. Serialize t => Putter t
put ConduitT o ByteString IO ()
-> ConduitM ByteString Void IO () -> ConduitM o Void IO ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| Handle -> ConduitM ByteString Void IO ()
forall (m :: * -> *) o.
MonadIO m =>
Handle -> ConduitT ByteString o m ()
CB.sinkHandle Handle
h
Handle -> IO ()
hClose Handle
h
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar (FilePath, ReleaseKey) -> (FilePath, ReleaseKey) -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (FilePath, ReleaseKey)
filePath (FilePath
path, ReleaseKey
key)
exhaust :: TBQueue a -> STM [a]
exhaust :: TBQueue a -> STM [a]
exhaust TBQueue a
chan = STM Bool -> STM a -> STM [a]
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m [a]
whileM (Bool -> Bool
not (Bool -> Bool) -> STM Bool -> STM Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue a -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue TBQueue a
chan) (TBQueue a -> STM a
forall a. TBQueue a -> STM a
readTBQueue TBQueue a
chan)
recv :: (MonadIO m) => TBQueue a -> m a
recv :: TBQueue a -> m a
recv TBQueue a
c = IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> m a) -> (STM a -> IO a) -> STM a -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM a -> m a) -> STM a -> m a
forall a b. (a -> b) -> a -> b
$ TBQueue a -> STM a
forall a. TBQueue a -> STM a
readTBQueue TBQueue a
c
send :: (MonadIO m) => TBQueue a -> a -> m ()
send :: TBQueue a -> a -> m ()
send TBQueue a
c = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (a -> IO ()) -> a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> (a -> STM ()) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBQueue a -> a -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue a
c
ccMap :: (∀ i1 . ConduitT i1 o1 m a -> ConduitT i1 o2 m a) -> CConduit i o1 m a -> CConduit i o2 m a
ccMap :: (forall i1. ConduitT i1 o1 m a -> ConduitT i1 o2 m a)
-> CConduit i o1 m a -> CConduit i o2 m a
ccMap forall i1. ConduitT i1 o1 m a -> ConduitT i1 o2 m a
f =
CConduit i o1 m a -> CConduit i o2 m a
go
where
go :: CConduit i o1 m a -> CConduit i o2 m a
go (Single ConduitT i o1 m a
c) =
ConduitT i o2 m a -> CConduit i o2 m a
forall i o (m :: * -> *) r. ConduitT i o m r -> CConduit i o m r
Single (ConduitT i o1 m a -> ConduitT i o2 m a
forall i1. ConduitT i1 o1 m a -> ConduitT i1 o2 m a
f ConduitT i o1 m a
c)
go (Multiple Natural
bufsz ConduitT i x m ()
l CConduit x o1 m a
r) =
Natural
-> ConduitT i x m () -> CConduit x o2 m a -> CConduit i o2 m a
forall i x (m :: * -> *) o r.
Natural
-> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r
Multiple Natural
bufsz ConduitT i x m ()
l ((forall i1. ConduitT i1 o1 m a -> ConduitT i1 o2 m a)
-> CConduit x o1 m a -> CConduit x o2 m a
forall o1 (m :: * -> *) a o2 i.
(forall i1. ConduitT i1 o1 m a -> ConduitT i1 o2 m a)
-> CConduit i o1 m a -> CConduit i o2 m a
ccMap forall i1. ConduitT i1 o1 m a -> ConduitT i1 o2 m a
f CConduit x o1 m a
r)