module Data.MediaBus.Conduit.Stream
( yieldStreamish
, yieldStreamish'
, yieldNextFrame
, yieldNextFrame'
, yieldStartFrameCtx
, yieldStartFrameCtx'
, toFramesC
, overStreamC
, mapFramesC
, mapFramesC'
, mapSeqNumC
, mapTimestampC
, mapTimestampC'
, mapFrameContentMC
, mapFrameContentMC'
, mapFrameContentC'
, foldStream
, foldStreamM
, concatStreamContents
) where
import Conduit
import Control.Lens
import Control.Monad
import Control.Monad.Writer.Strict (tell)
import Control.Parallel.Strategies (NFData, rdeepseq, withStrategy)
import Data.Maybe
import Data.MediaBus.Basics.Sequence
import Data.MediaBus.Basics.Series
import Data.MediaBus.Basics.Ticks
import Data.MediaBus.Media.Stream
-- | Wrap a 'Streamish' value with 'MkStream' and 'yield' the resulting
-- 'Stream' element downstream.
yieldStreamish
  :: Monad m
  => Streamish i s t p c -> Conduit a m (Stream i s t p c)
yieldStreamish streamish = yield (MkStream streamish)
-- | Like 'yieldStreamish', but the yielded 'Stream' element is wrapped in
-- @'withStrategy' 'rdeepseq'@, so that evaluating the element evaluates it
-- with the 'rdeepseq' strategy.
yieldStreamish'
  :: (NFData i, NFData s, NFData t, NFData c, NFData p, Monad m)
  => Streamish i s t p c -> Conduit a m (Stream i s t p c)
yieldStreamish' streamish =
  yield (withStrategy rdeepseq (MkStream streamish))
-- | Yield a 'Frame' as a 'Next' stream element, via 'yieldStreamish'.
yieldNextFrame
  :: Monad m
  => Frame s t c -> Conduit a m (Stream i s t p c)
yieldNextFrame frame = yieldStreamish (Next frame)
-- | Yield a 'Frame' as a 'Next' stream element, via the 'rdeepseq'-wrapping
-- variant 'yieldStreamish''.
yieldNextFrame'
  :: (NFData i, NFData s, NFData t, NFData c, NFData p, Monad m)
  => Frame s t c -> Conduit a m (Stream i s t p c)
yieldNextFrame' frame = yieldStreamish' (Next frame)
-- | Yield a 'FrameCtx' as a 'Start' stream element, via 'yieldStreamish'.
yieldStartFrameCtx
  :: Monad m
  => FrameCtx i s t p -> Conduit a m (Stream i s t p c)
yieldStartFrameCtx frameCtx = yieldStreamish (Start frameCtx)
-- | Yield a 'FrameCtx' as a 'Start' stream element, via the
-- 'rdeepseq'-wrapping variant 'yieldStreamish''.
--
-- NOTE(review): the @NFData (FrameCtx i s t p)@ constraint looks redundant,
-- since 'yieldStreamish'' only asks for the per-parameter 'NFData'
-- constraints - confirm before removing it.
yieldStartFrameCtx'
  :: ( NFData i
     , NFData s
     , NFData t
     , NFData c
     , NFData p
     , NFData (FrameCtx i s t p)
     , Monad m
     )
  => FrameCtx i s t p -> Conduit a m (Stream i s t p c)
yieldStartFrameCtx' frameCtx = yieldStreamish' (Start frameCtx)
-- | Lift a conduit that works on bare 'Streamish' values to one that works
-- on 'Stream' values.
--
-- Incoming elements are unwrapped with '_stream', outgoing elements are
-- wrapped with 'MkStream', and the second argument of 'mapInput' re-wraps
-- values going back upstream with 'MkStream' as well.
overStreamC
  :: Monad m
  => Conduit (Streamish i s t p c) m (Streamish i' s' t' p' c')
  -> Conduit (Stream i s t p c) m (Stream i' s' t' p' c')
overStreamC inner = mapInput _stream (Just . MkStream) wrappedOutput
  where
    -- The inner conduit with every output wrapped into a 'Stream'.
    wrappedOutput = mapOutput MkStream inner
-- | Reduce a 'Stream' to its 'Frame's: every 'Next' element has its frame
-- yielded, every 'Start' element is dropped.
toFramesC
  :: Monad m
  => Conduit (Stream i s t p c) m (Frame s t c)
toFramesC = awaitForever emitFrame
  where
    emitFrame (MkStream streamish) =
      case streamish of
        -- The bang pattern evaluates the frame to WHNF when it is matched.
        Next !frame -> yield frame
        Start _ -> return ()
-- | Apply a monadic function to every 'Frame' targeted by
-- @'stream' . '_Next'@ in each element passing through the conduit.
-- Elements without such a target pass through with only their type changed.
mapFramesC
  :: Monad m
  => (Frame s t c -> m (Frame s t c'))
  -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapFramesC !f = mapMC (\strm -> forMOf (stream . _Next) strm f)
-- | Strict variant of 'mapFramesC' for pure functions: apply @f@ to every
-- 'Frame' targeted by @'stream' . '_Next'@ and wrap each resulting frame in
-- @'withStrategy' 'rdeepseq'@, so that evaluating a mapped frame evaluates
-- it with the 'rdeepseq' strategy.
--
-- The strategy is composed after @f@ (@withStrategy rdeepseq . f@), the same
-- shape 'mapFrameContentC'' uses. Applying it to @f@ itself would only
-- evaluate the function value and leave the produced frames untouched.
mapFramesC'
  :: (NFData i, NFData s, NFData t, NFData c', Monad m)
  => (Frame s t c -> Frame s t c')
  -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapFramesC' !f = mapC (over (stream . _Next) (withStrategy rdeepseq . f))
-- | Apply a pure function to every value targeted by the 'seqNum' optic in
-- each 'Stream' element passing through the conduit.
mapSeqNumC
  :: Monad m
  => (s -> s') -> Conduit (Stream i s t p c) m (Stream i s' t p c)
mapSeqNumC f = mapC (seqNum %~ f)
-- | Apply a pure function to every value targeted by the 'timestamp' optic
-- in each 'Stream' element passing through the conduit.
mapTimestampC
  :: Monad m
  => (t -> t') -> Conduit (Stream i s t p c) m (Stream i s t' p c)
mapTimestampC f = mapC (timestamp %~ f)
-- | Like 'mapTimestampC', with @'withStrategy' 'rdeepseq'@ wrapped around
-- the element mapping.
--
-- NOTE(review): the strategy is applied to the function @over timestamp f@,
-- not to the stream elements it produces, so this presumably does not force
-- the new timestamps (depends on the 'NFData' instance for functions -
-- confirm). Forcing the results would need an @NFData t'@ constraint instead
-- of @NFData t@, which is a signature change.
mapTimestampC'
  :: (NFData t, Monad m)
  => (t -> t') -> Conduit (Stream i s t p c) m (Stream i s t' p c)
mapTimestampC' f = mapC (withStrategy rdeepseq (over timestamp f))
-- | Apply a pure function to every value targeted by 'eachFrameContent' in
-- each 'Stream' element, wrapping each result in
-- @'withStrategy' 'rdeepseq'@ so that evaluating the new content evaluates
-- it with the 'rdeepseq' strategy.
mapFrameContentC'
  :: (NFData c', Monad m)
  => (c -> c') -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapFrameContentC' !f = mapC (eachFrameContent %~ forced)
  where
    -- @f@ followed by the deep-evaluation strategy on its result.
    forced content = withStrategy rdeepseq (f content)
-- | Apply a monadic function to every value targeted by 'eachFrameContent'
-- in each 'Stream' element passing through the conduit.
mapFrameContentMC
  :: Monad m
  => (c -> m c') -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapFrameContentMC f = mapMC (\strm -> mapMOf eachFrameContent f strm)
-- | Like 'mapFrameContentMC', but each resulting 'Stream' element is wrapped
-- in @'withStrategy' 'rdeepseq'@ before being passed on, so that evaluating
-- the element evaluates it with the 'rdeepseq' strategy.
mapFrameContentMC'
  :: (NFData (Stream i s t p c'), Monad m)
  => (c -> m c') -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapFrameContentMC' !f =
  mapMC $ \strm -> do
    mapped <- mapMOf eachFrameContent f strm
    return (withStrategy rdeepseq mapped)
-- | Consume the whole stream, converting every element to a 'Monoid' value
-- with the given function and combining the results.
--
-- Each converted value is recorded with 'tell' in a writer layer, which
-- 'execWriterC' runs to produce the accumulated result.
foldStream
  :: (Monoid o, Monad m)
  => (Stream i s t p c -> o) -> Sink (Stream i s t p c) m o
foldStream !f = execWriterC (awaitForever (\strm -> tell (f strm)))
-- | Monadic variant of 'foldStream': every element is converted to a
-- 'Monoid' value by an action in the base monad, and the results are
-- combined.
--
-- The action is lifted twice (through the writer layer and through the
-- conduit) and its result is recorded with 'tell'; 'execWriterC' runs the
-- writer layer to produce the accumulated result.
foldStreamM
  :: (Monoid o, Monad m)
  => (Stream i s t p c -> m o) -> Sink (Stream i s t p c) m o
foldStreamM !f =
  execWriterC $
  awaitForever $ \strm -> do
    out <- lift (lift (f strm))
    tell out
-- | Consume the whole stream and combine the frame payloads monoidally,
-- using 'foldStream'.
--
-- For each element, the first value targeted by
-- @'stream' . '_Next' . 'framePayload'@ is used; elements without such a
-- target contribute 'mempty'.
concatStreamContents
  :: (Monoid c, Monad m)
  => Sink (Stream i s t p c) m c
concatStreamContents = foldStream payloadOrEmpty
  where
    payloadOrEmpty strm =
      fromMaybe mempty (strm ^? stream . _Next . framePayload)