{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Transducer
( Transducer (..),
etc,
etcM,
asPipe,
)
where
import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Stream
import Control.Category (Category (..))
import Control.Lens hiding ((.>), (:>), (<|), (|>))
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Conc.Class as C
import Control.Monad.Trans.State.Lazy
import qualified Pipes
import qualified Pipes.Prelude as Pipes
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S
import Prelude hiding ((.), id)
newtype Transducer s a b
= Transducer
{ transduce ::
forall m.
Monad m =>
Stream (Of a) (StateT s m) () ->
Stream (Of b) (StateT s m) ()
}
instance Category (Transducer s) where
(Transducer t1) . (Transducer t2) = Transducer (t1 . t2)
id = Transducer id
asPipe ::
(Monad m) =>
Pipes.Pipe a b (StateT s m) () ->
(Stream (Of a) (StateT s m) () -> Stream (Of b) (StateT s m) ())
asPipe p s = ((s & Pipes.unfoldr S.next) Pipes.>-> p) & S.unfoldr Pipes.next
etc :: (MonadConc m) => s -> Transducer s a b -> Cont m (Box (C.STM m) b a) -> m s
etc st t box =
with box $ \(Box c e) ->
(e & toStream & transduce t & fromStream) c & flip execStateT st
etcM :: (MonadConc m, MonadBase m m) => s -> Transducer s a b -> Cont m (Box m b a) -> m s
etcM st t box =
with box $ \(Box c e) ->
(liftE' e & toStreamM & transduce t & fromStreamM) (liftC' c) & flip execStateT st
where
liftC' c = Committer $ liftBase . commit c
liftE' = Emitter . liftBase . emit