module SDR.PipeUtils (
fork,
combine,
printStream,
devnull,
rate,
pMapAccum
) where
import Data.Time.Clock
import Pipes
import Control.Monad
-- | Duplicate a stream into two nested 'Producer' layers.
--
-- Every element produced by @prod@ is yielded first on the outer
-- producer layer and then on the inner one. The result of @prod@ is
-- returned unchanged once it terminates.
fork :: Monad m => Producer a m r -> Producer a (Producer a m) r
fork prod = runEffect (source >-> duplicate)
  where
    -- Push the original producer two transformer layers down so that the
    -- two producer layers above it are available to yield into.
    source = hoist (lift . lift) prod

    -- Re-emit every incoming element once per producer layer.
    duplicate = for cat $ \item -> do
        lift (yield item)
        lift (lift (yield item))
-- | Merge two consumers into one that feeds every awaited value to both.
--
-- Incoming values are duplicated with 'fork'; the copy on the outer
-- layer is consumed by @x@ and the copy on the inner layer by @y@.
-- The combined consumer returns as soon as either of them returns.
combine :: Monad m => Consumer a m r -> Consumer a m r -> Consumer a m r
combine x y = runEffect (firstAttached >-> hoist lift y)
  where
    -- Re-publish everything awaited from the real upstream as a producer,
    -- so that it can be handed to 'fork'.
    upstream :: Monad m => Producer a (Consumer a m) r
    upstream = forever (lift await >>= yield)

    -- Connect @x@ to the outer layer of the forked stream; what remains is
    -- a producer (the inner layer) that is later connected to @y@.
    firstAttached = runEffect (fork upstream >-> hoist (lift . lift) x)
-- | A consumer that prints every value it receives to stdout using 'print'.
--
-- The 'Int' argument is accepted but currently ignored.
printStream :: (Show a) => Int -> Consumer a IO ()
printStream _samples = forever $ do
    item <- await
    lift (print item)
-- | A consumer that awaits values forever and discards every one of them.
devnull :: Monad m => Consumer a m ()
devnull = drain
  where
    -- Await a value, drop it, and repeat indefinitely.
    drain = await >> drain
-- | Pass values through unchanged while printing a running throughput
-- figure to stdout.
--
-- For each value received, prints
-- @buffersSoFar * samples / secondsSinceStart@, where the clock starts
-- when the pipe first runs and the value just received is included in
-- the count. The value is forwarded downstream after the figure is
-- printed.
rate :: Int -> Pipe a a IO b
rate samples = lift getCurrentTime >>= loop 1
  where
    -- The multiplier applied to the buffer count, as a 'Double'.
    perBuffer :: Double
    perBuffer = fromIntegral samples

    -- @count@ is the number of values received including the one about
    -- to be awaited; @start@ is the time the pipe began running.
    loop :: Double -> UTCTime -> Pipe a a IO b
    loop count start = do
        item <- await
        now <- lift getCurrentTime
        let elapsed :: Double
            elapsed = fromRational (toRational (diffUTCTime now start))
        lift (print (count * perBuffer / elapsed))
        yield item
        loop (count + 1) start
-- | Map over a stream while threading an accumulator through it.
--
-- For every value awaited, @func@ is applied to the current accumulator
-- and that value; the second component of the result is yielded
-- downstream and the first component becomes the accumulator for the
-- next value. The pipe loops until the surrounding pipeline terminates.
--
-- The accumulator is forced to weak head normal form after each step so
-- that a long-running stream does not build up a chain of unevaluated
-- thunks when @func@ does not itself inspect the accumulator.
pMapAccum :: (Monad m)
          => (acc -> x -> (acc, y))  -- ^ step function
          -> acc                     -- ^ initial accumulator
          -> Pipe x y m ()
pMapAccum func acc = go acc
  where
    go state = do
        dat <- await
        let (state', res) = func state dat
        yield res
        -- Evaluate the new accumulator before recursing to avoid a space
        -- leak; the yielded result itself is left lazy.
        go $! state'