{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Transducer
( Transducer (..),
etc,
toCommitter,
toEmitter,
toStream,
fromStream,
foldCommitter,
sinkCommitter,
)
where
import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import qualified Control.Foldl as L
import Control.Lens hiding ((.>), (:>), (<|), (|>))
import Control.Monad.Conc.Class as C
import NumHask.Prelude hiding (STM, atomically)
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S
newtype Transducer s a b
= Transducer
{ transduce ::
forall m.
Monad m =>
Stream (Of a) (StateT s m) () ->
Stream (Of b) (StateT s m) ()
}
instance Category (Transducer s) where
(Transducer t1) . (Transducer t2) = Transducer (t1 . t2)
id = Transducer id
etc :: Monad m => s -> Transducer s a b -> Box m b a -> m s
etc st t (Box c e) =
(e & hoist lift & toStream & transduce t & fromStream) (hoist lift c) & flip execStateT st
toCommitter :: (MonadConc m) => (Stream (Of a) m () -> m r) -> Cont m (Committer m a)
toCommitter f =
Cont (\c -> queueC c (\(Emitter o) -> f . toStream . Emitter $ o))
toEmitter :: (MonadConc m) => Stream (Of b) m () -> Cont m (Emitter m b)
toEmitter s = Cont (queueE (fromStream s))
toStream :: (Monad m) => Emitter m a -> Stream (Of a) m ()
toStream e = S.untilRight getNext
where
getNext = maybe (Right ()) Left <$> emit e
fromStream :: (Monad m) => Stream (Of b) m () -> Committer m b -> m ()
fromStream s c = go s
where
go str = do
eNxt <- S.next str
forM_ eNxt $ \(a, str') -> do
continue <- commit c a
when continue (go str')
foldCommitter :: (MonadConc m) => L.FoldM m a () -> Cont m (Committer m a)
foldCommitter f = toCommitter (fmap S.snd' . L.impurely S.foldM f)
sinkCommitter :: (MonadConc m) => (a -> m ()) -> Cont m (Committer m a)
sinkCommitter sink = foldCommitter (L.FoldM step begin done)
where
step x a = do
sink a
pure x
begin = pure ()
done = pure