module Data.MediaBus.Stream
( HasPayload(..)
, FrameCtx(..)
, frameCtxSourceId
, frameCtxSeqNumRef
, frameCtxTimestampRef
, frameCtxInit
, Frame(..)
, frameSeqNum
, frameTimestamp
, framePayload
, Stream(..)
, stream
, yieldStreamish
, yieldStreamish'
, yieldNextFrame
, yieldNextFrame'
, yieldStartFrameCtx
, yieldStartFrameCtx'
, toFramesC
, overStreamC
, mapFramesC
, mapFramesC'
, mapSeqNumC
, mapTicksC
, mapTicksC'
, mapPayloadMC
, mapPayloadMC'
, mapPayloadC'
, convertTicksC'
, foldStream
, foldStreamM
, concatStreamContents
) where
import Conduit
import Control.Monad
import Control.Lens
import Data.MediaBus.Sequence
import Data.MediaBus.Payload
import Data.MediaBus.Ticks
import Data.MediaBus.Series
import Control.Monad.Writer.Strict ( tell )
import Data.Maybe
import Test.QuickCheck
import Data.Default
import Text.Printf
import GHC.TypeLits
import GHC.Generics ( Generic )
import Control.Parallel.Strategies ( NFData, rdeepseq, withStrategy )
-- | Context record of a stream. Values of this type are the 'Start'
-- elements of a 'Streamish'. Every field is strict.
data FrameCtx i s t p =
    MkFrameCtx
        { _frameCtxSourceId     :: !i
          -- ^ source id, of type @i@
        , _frameCtxTimestampRef :: !t
          -- ^ timestamp reference; this is what 'timestamp' focuses on
        , _frameCtxSeqNumRef    :: !s
          -- ^ sequence number reference; this is what 'seqNum' focuses on
        , _frameCtxInit         :: !p
          -- ^ init value of type @p@
          -- (NOTE(review): its exact role is not visible in this module)
        }
    deriving (Eq, Ord, Generic)

-- | Uses the 'Generic'-based default implementation of the class.
instance (NFData i, NFData s, NFData t, NFData p) =>
         NFData (FrameCtx i s t p)

-- Generates the lenses 'frameCtxSourceId', 'frameCtxTimestampRef',
-- 'frameCtxSeqNumRef' and 'frameCtxInit'.
makeLenses ''FrameCtx
-- | The timestamp type of a 'FrameCtx' is its @t@ parameter.
instance HasTimestampT (FrameCtx i s t p) where
    type GetTimestamp (FrameCtx i s t p) = t
    type SetTimestamp (FrameCtx i s t p) t' = FrameCtx i s t' p

-- | 'timestamp' focuses on the timestamp reference field.
instance HasTimestamp (FrameCtx i s t p) where
    timestamp = frameCtxTimestampRef

-- | The sequence number type of a 'FrameCtx' is its @s@ parameter.
instance HasSeqNumT (FrameCtx i s t p) where
    type GetSeqNum (FrameCtx i s t p) = s
    type SetSeqNum (FrameCtx i s t p) x = FrameCtx i x t p

-- | The duration of a context is always reported as zero.
instance HasDuration (FrameCtx i s t p) where
    getDuration = const 0

-- | 'seqNum' focuses on the sequence number reference field.
instance HasSeqNum (FrameCtx i s t p) where
    seqNum = frameCtxSeqNumRef
-- | Generates a context by drawing each of the four fields independently,
-- in constructor order.
instance (Arbitrary i, Arbitrary s, Arbitrary t, Arbitrary p) =>
         Arbitrary (FrameCtx i s t p) where
    arbitrary = MkFrameCtx <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary

-- | The default context has every field set to its own 'def'.
instance (Default i, Default s, Default t, Default p) =>
         Default (FrameCtx i s t p) where
    def = MkFrameCtx def def def def
-- | Renders a context on one line tagged @FRAME-CTX:@. The source id, the
-- sequence number reference and the timestamp reference (in that order) are
-- printed with a minimum field width of 15, followed by the init value.
instance (Show i, Show s, Show t, Show p) =>
         Show (FrameCtx i s t p) where
    show (MkFrameCtx sourceId tsRef snRef initVal) =
        printf "FRAME-CTX: %15s | %15s | %15s | %s"
               shownSource
               shownSeqNum
               shownTimestamp
               shownInit
      where
        shownSource    = show sourceId
        shownSeqNum    = show snRef
        shownTimestamp = show tsRef
        shownInit      = show initVal
-- | A single frame: a payload of type @c@ tagged with a timestamp of type
-- @t@ and a sequence number of type @s@. Every field is strict.
data Frame s t c =
    MkFrame
        { _frameTimestamp :: !t
          -- ^ the timestamp; this is what 'timestamp' focuses on
        , _frameSeqNum    :: !s
          -- ^ the sequence number; this is what 'seqNum' focuses on
        , _framePayload   :: !c
          -- ^ the payload; this is what 'payload' focuses on
        }
    deriving (Eq, Ord, Generic)

-- | Uses the 'Generic'-based default implementation of the class.
instance (NFData c, NFData s, NFData t) =>
         NFData (Frame s t c)

-- | 'fmap' maps over the payload, the last type parameter.
deriving instance Functor (Frame s t)

-- Generates the lenses 'frameTimestamp', 'frameSeqNum' and 'framePayload'.
makeLenses ''Frame
-- | The payload type of a 'Frame' is its @c@ parameter; 'payload' is the
-- payload field lens.
instance HasPayload (Frame s t c) where
    type GetPayload (Frame s t c) = c
    type SetPayload (Frame s t c) d = Frame s t d
    payload = framePayload

-- | The timestamp type of a 'Frame' is its @t@ parameter.
instance HasTimestampT (Frame s t c) where
    type GetTimestamp (Frame s t c) = t
    type SetTimestamp (Frame s t c) t' = Frame s t' c

-- | 'timestamp' is the timestamp field lens.
instance HasTimestamp (Frame s t c) where
    timestamp = frameTimestamp

-- | The sequence number type of a 'Frame' is its @s@ parameter.
instance HasSeqNumT (Frame s t c) where
    type GetSeqNum (Frame s t c) = s
    type SetSeqNum (Frame s t c) x = Frame x t c

-- | 'seqNum' is the sequence number field lens.
instance HasSeqNum (Frame s t c) where
    seqNum = frameSeqNum

-- | The duration of a frame is the duration of its payload.
instance HasDuration c =>
         HasDuration (Frame s t c) where
    getDuration frm = getDuration (_framePayload frm)
-- | Generates a frame by drawing the timestamp, the sequence number and the
-- payload independently, in constructor order.
instance (Arbitrary c, Arbitrary s, Arbitrary t) =>
         Arbitrary (Frame s t c) where
    arbitrary =
        MkFrame
            <$> arbitrary
            <*> arbitrary
            <*> arbitrary

-- | The default frame has every field set to its own 'def'.
instance (Default s, Default t, Default c) =>
         Default (Frame s t c) where
    def = MkFrame def def def
-- | Renders a frame on one line tagged @FRAME:@. The sequence number and
-- the timestamp (in that order) are printed with a minimum field width of
-- 15, followed by the payload.
instance (Show s, Show t, Show v) =>
         Show (Frame s t v) where
    show (MkFrame stamp sqn content) =
        printf "FRAME: %15s | %15s | %s"
               (show sqn)
               (show stamp)
               (show content)
-- | Newtype wrapper around a 'Streamish' value.
newtype Stream i s t p c =
    MkStream
        { _stream :: Streamish i s t p c -- ^ the wrapped series element
        }
    deriving (Ord, Eq, Arbitrary, Generic)

-- | Uses the 'Generic'-based default implementation of the class.
instance (NFData i, NFData s, NFData t, NFData c, NFData p) =>
         NFData (Stream i s t p c)

-- | A 'Series' whose first type argument is a 'FrameCtx' and whose second
-- type argument is a 'Frame'.
type Streamish i s t p c =
    Series (FrameCtx i s t p) (Frame s t c)

-- Generates 'stream', the optic between 'Stream' and 'Streamish'.
makeLenses ''Stream
-- | The payload type of a 'Stream' is its @c@ parameter; 'payload' reaches
-- through '_Next' into the payload of the frame.
instance HasPayload (Stream i s t p c) where
    type GetPayload (Stream i s t p c) = c
    type SetPayload (Stream i s t p c) d = Stream i s t p d
    payload = stream . _Next . payload

-- | The duration of a stream element is the duration of the frame matched
-- by '_Next', and zero when '_Next' does not match.
instance HasDuration c =>
         HasDuration (Stream i s t p c) where
    getDuration strm =
        case preview (stream . _Next) strm of
            Just frm -> getDuration frm
            Nothing  -> 0

-- | The sequence number type of a 'Stream' is its @s@ parameter.
instance HasSeqNumT (Stream i s t p c) where
    type GetSeqNum (Stream i s t p c) = s
    type SetSeqNum (Stream i s t p c) x = Stream i x t p c

-- | 'seqNum' delegates to the 'seqNum' of the wrapped series.
instance HasSeqNum (Stream i s t p c) where
    seqNum = stream . seqNum

-- | The timestamp type of a 'Stream' is its @t@ parameter.
instance HasTimestampT (Stream i s t p c) where
    type GetTimestamp (Stream i s t p c) = t
    type SetTimestamp (Stream i s t p c) t' = Stream i s t' p c

-- | 'timestamp' delegates to the 'timestamp' of the wrapped series.
instance HasTimestamp (Stream i s t p c) where
    timestamp = stream . timestamp
-- | The default stream element is a 'Next' holding the default 'Frame',
-- i.e. a frame whose timestamp, sequence number and payload are all 'def'.
instance (Default c, Default s, Default t) =>
         Default (Stream i s t p c) where
    def = MkStream (Next def)
-- | A stream element is shown exactly like the series element it wraps.
instance (Show i, Show s, Show t, Show c, Show p) =>
         Show (Stream i s t p c) where
    show = show . _stream
-- | Wrap a 'Streamish' into a 'Stream' and yield it downstream.
yieldStreamish :: Monad m
               => Streamish i s t p c
               -> Conduit a m (Stream i s t p c)
yieldStreamish element = yield (MkStream element)
-- | Like 'yieldStreamish', but the yielded 'Stream' value is wrapped with
-- the 'rdeepseq' strategy, so it is evaluated completely once it is forced.
yieldStreamish' :: (NFData i, NFData s, NFData t, NFData c, NFData p, Monad m)
                => Streamish i s t p c
                -> Conduit a m (Stream i s t p c)
yieldStreamish' element = yield forced
  where
    forced = withStrategy rdeepseq (MkStream element)
-- | Yield a 'Frame' downstream as a 'Next' stream element.
yieldNextFrame :: Monad m => Frame s t c -> Conduit a m (Stream i s t p c)
yieldNextFrame frm = yieldStreamish (Next frm)
-- | Like 'yieldNextFrame', but goes through 'yieldStreamish'' so that the
-- yielded element is wrapped with the 'rdeepseq' strategy.
yieldNextFrame' :: (NFData i, NFData s, NFData t, NFData c, NFData p, Monad m)
                => Frame s t c
                -> Conduit a m (Stream i s t p c)
yieldNextFrame' frm = yieldStreamish' (Next frm)
-- | Yield a 'FrameCtx' downstream as a 'Start' stream element.
yieldStartFrameCtx :: Monad m
                   => FrameCtx i s t p
                   -> Conduit a m (Stream i s t p c)
yieldStartFrameCtx ctx = yieldStreamish (Start ctx)
-- | Like 'yieldStartFrameCtx', but goes through 'yieldStreamish'' so that
-- the yielded element is wrapped with the 'rdeepseq' strategy.
--
-- The context lists the same constraints as 'yieldNextFrame''. A separate
-- @NFData (FrameCtx i s t p)@ constraint is not needed because it already
-- follows from the component constraints via the 'FrameCtx' instance.
yieldStartFrameCtx' :: (NFData i, NFData s, NFData t, NFData c, NFData p, Monad m)
                    => FrameCtx i s t p
                    -> Conduit a m (Stream i s t p c)
yieldStartFrameCtx' = yieldStreamish' . Start
-- | Lift a conduit working on the bare series elements to one working on
-- 'Stream' values: inputs are unwrapped with '_stream', outputs and
-- leftovers are re-wrapped with 'MkStream'.
overStreamC :: Monad m
            => Conduit (Series (FrameCtx i s t p) (Frame s t c)) m (Series (FrameCtx i' s' t' p') (Frame s' t' c'))
            -> Conduit (Stream i s t p c) m (Stream i' s' t' p' c')
overStreamC inner = mapInput _stream rewrapLeftover (mapOutput MkStream inner)
  where
    -- Every leftover of the inner conduit is handed back upstream, wrapped.
    rewrapLeftover = Just . MkStream
-- | Strip the stream structure: 'Start' elements are dropped, and the frame
-- of every 'Next' element is forced to weak head normal form and yielded.
toFramesC :: Monad m => Conduit (Stream i s t p c) m (Frame s t c)
toFramesC =
    awaitForever $ \(MkStream element) ->
        case element of
            Start _   -> return ()
            Next !frm -> yield frm
-- | Strict frame mapping: apply the pure function @f@ to every frame that
-- '_Next' matches and wrap each resulting frame with the 'rdeepseq'
-- strategy, so that it is evaluated completely once it is forced. Elements
-- not matched by '_Next' are passed on unchanged.
--
-- The strategy is composed /after/ @f@ so that it applies to the frame that
-- @f@ returns. Applying it to @f@ itself would only evaluate the function
-- value and leave the produced frames unevaluated.
mapFramesC' :: (NFData i, NFData s, NFData t, NFData c', Monad m)
            => (Frame s t c -> Frame s t c')
            -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapFramesC' !f = mapC (over (stream . _Next) (withStrategy rdeepseq . f))
-- | Apply the monadic action @f@ to every frame that '_Next' matches;
-- elements not matched by '_Next' are passed on unchanged.
mapFramesC :: Monad m
           => (Frame s t c -> m (Frame s t c'))
           -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapFramesC !f = mapMC onElement
  where
    onElement strm = mapMOf (stream . _Next) f strm
-- | Apply @f@ to whatever 'seqNum' focuses on in each stream element.
mapSeqNumC :: Monad m
           => (s -> s')
           -> Conduit (Stream i s t p c) m (Stream i s' t p c)
mapSeqNumC f = mapC (over seqNum f)
-- | Apply @f@ to whatever 'timestamp' focuses on in each stream element.
mapTicksC :: Monad m
          => (t -> t')
          -> Conduit (Stream i s t p c) m (Stream i s t' p c)
mapTicksC f = mapC (over timestamp f)
-- | Variant of 'mapTicksC' that involves the 'rdeepseq' strategy.
--
-- NOTE(review): the strategy is applied to the stream-to-stream function
-- @over timestamp f@, not to the timestamps that function produces, so this
-- relies on an 'NFData' instance for functions. If the intent is to force
-- the mapped timestamps, the strategy would have to be composed after @f@,
-- which presumably needs an @NFData t'@ constraint instead of @NFData t@.
-- That is a signature change and must be checked against all callers.
mapTicksC' :: (NFData t, Monad m)
           => (t -> t')
           -> Conduit (Stream i s t p c) m (Stream i s t' p c)
mapTicksC' f = mapC (withStrategy rdeepseq (over timestamp f))
-- | Apply the monadic action @f@ to whatever 'payload' focuses on in each
-- stream element.
mapPayloadMC :: Monad m
             => (c -> m c')
             -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapPayloadMC f = mapMC (mapMOf payload f)
-- | Like 'mapPayloadMC', but each resulting 'Stream' element is wrapped
-- with the 'rdeepseq' strategy before it is returned, so that it is
-- evaluated completely once it is forced.
mapPayloadMC' :: (NFData (Stream i s t p c'), Monad m)
              => (c -> m c')
              -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapPayloadMC' !f =
    mapMC $ \strm -> do
        mapped <- mapMOf payload f strm
        return (withStrategy rdeepseq mapped)
-- | Apply the pure function @f@ to whatever 'payload' focuses on in each
-- stream element; every new payload is wrapped with the 'rdeepseq'
-- strategy, so that it is evaluated completely once it is forced.
mapPayloadC' :: (NFData c', Monad m)
             => (c -> c')
             -> Conduit (Stream i s t p c) m (Stream i s t p c')
mapPayloadC' !f = mapC (over payload forcedResult)
  where
    forcedResult content = withStrategy rdeepseq (f content)
-- | Convert the timestamps of a stream from @Ticks r t@ to @Ticks r' t'@ by
-- running 'convertTicks' through 'mapTicksC''.
--
-- Both proxy arguments are ignored at runtime; they only serve to fix the
-- @(r, t)@ and @(r', t')@ type parameters at the call site.
convertTicksC' :: forall proxy0 proxy1 m r t r' t' i s c p.
               (NFData t, NFData t', KnownNat r, KnownNat r', Integral t, Integral t', Monad m)
               => proxy0 '(r, t)
               -> proxy1 '(r', t')
               -> Conduit (Stream i s (Ticks r t) p c) m (Stream i s (Ticks r' t') p c)
convertTicksC' _ _ = mapTicksC' convertTicks
-- | Consume the whole stream, mapping every element to a monoid value with
-- @f@ and returning the combined result (accumulated in a writer).
foldStream :: (Monoid o, Monad m)
           => (Stream i s t p c -> o)
           -> Sink (Stream i s t p c) m o
foldStream !f = execWriterC (awaitForever record)
  where
    record strm = tell (f strm)
-- | Monadic variant of 'foldStream': @f@ runs in the base monad for every
-- element, and its results are combined monoidally (accumulated in a
-- writer).
foldStreamM :: (Monoid o, Monad m)
            => (Stream i s t p c -> m o)
            -> Sink (Stream i s t p c) m o
foldStreamM !f =
    execWriterC . awaitForever $ \strm -> do
        -- Two lifts: one through the conduit layer, one through the writer.
        out <- lift (lift (f strm))
        tell out
-- | Consume the whole stream and 'mappend' together the payloads of all
-- frames matched by '_Next'; every other element contributes 'mempty'.
concatStreamContents :: (Monoid c, Monad m) => Sink (Stream i s t p c) m c
concatStreamContents = foldStream payloadOrEmpty
  where
    payloadOrEmpty strm =
        fromMaybe mempty (strm ^? stream . _Next . framePayload)