{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Stream
( toStream,
fromStream,
toCommit,
toCommitFold,
toCommitSink,
toEmit,
queueStream,
toStreamM,
fromStreamM,
)
where
import Prelude
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import qualified Control.Foldl as L
import Control.Monad
import Control.Monad.Conc.Class as C
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S
toCommit :: (MonadConc m) => (Stream (Of a) m () -> m r) -> Cont m (Committer (STM m) a)
toCommit f =
Cont (\c -> queueC' c (\(Emitter o) -> f . toStream . Emitter $ o))
toCommitFold :: (MonadConc m) => L.FoldM m a () -> Cont m (Committer (STM m) a)
toCommitFold f = toCommit (fmap S.snd' . L.impurely S.foldM f)
toCommitSink :: (MonadConc m) => (a -> m ()) -> Cont m (Committer (STM m) a)
toCommitSink sink = toCommitFold (L.FoldM step begin done)
where
step x a = do
sink a
pure x
begin = pure ()
done = pure
toEmit :: (MonadConc m) => Stream (Of a) m () -> Cont m (Emitter (STM m) a)
toEmit s = Cont (queueE' (fromStream s))
queueStream ::
(MonadConc m) => Stream (Of a) m () -> Cont m (Stream (Of a) m ())
queueStream i = Cont $ \o -> queueE' (fromStream i) (o . toStream)
toStream :: (MonadConc m) => Emitter (STM m) a -> Stream (Of a) m ()
toStream e = toStreamM (liftE e)
toStreamM :: (MonadConc m) => Emitter m a -> Stream (Of a) m ()
toStreamM e = S.untilRight getNext
where
getNext = maybe (Right ()) Left <$> emit e
fromStream :: (MonadConc m) => Stream (Of b) m () -> Committer (STM m) b -> m ()
fromStream s c = fromStreamM s (liftC c)
fromStreamM :: (MonadConc m) => Stream (Of b) m () -> Committer m b -> m ()
fromStreamM s c = go s
where
go str = do
eNxt <- S.next str
forM_ eNxt $ \(a, str') -> do
continue <- commit c a
when continue (go str')