module Data.MediaBus.Conduit.Async
( withAsyncPolledSource
, FrameContentQ()
, mkFrameContentQ
, frameContentQSink
, frameContentQSource
) where
import Conduit
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted
import Control.Concurrent.STM
import Control.Exception (evaluate)
import Control.Lens
import Control.Monad.Logger
import Control.Monad.State
import Control.Parallel.Strategies (NFData, rdeepseq, withStrategy)
import Data.Default
import Data.MediaBus.Basics.Clock
import Data.MediaBus.Basics.Ticks
import Data.MediaBus.Conduit.Stream
import Data.MediaBus.Media.Discontinous
import Data.MediaBus.Media.Stream
import Data.Proxy
import Data.Time.Clock
import System.Random
import Text.Printf
import Data.String
data PollFrameContentSourceSt s t = MkPollFrameContentSourceSt
{ _ppSeqNum :: !s
, _ppTicks :: !t
}
makeLenses ''PollFrameContentSourceSt
withAsyncPolledSource
:: ( MonadResource m
, MonadLogger m
, MonadBaseControl IO m
, KnownRate r
, Integral t
, Integral s
, Default p
, HasStaticDuration c
, HasDuration c
, NFData c
, NFData p
, NFData s
, NFData t
, Random i
, Random t
, Random s
, Show c
)
=> Int
-> Source m (Stream i s (Ticks r t) p c)
-> ((Async (), Source m (Stream i s (Ticks r t) p (Discontinous c))) -> m o)
-> m o
withAsyncPolledSource !frameQueueLen !src !f = do
!pq <- mkFrameContentQ frameQueueLen
withAsync
(runConduit (src .| frameContentQSink pq))
(\a -> f (void a, frameContentQSource pq))
data FrameContentQ a = MkFrameContentQ
{ _frameContentQSegmentDuration :: !NominalDiffTime
, _frameContentQPollInterval :: !NominalDiffTime
, _frameContentQRing :: !(TBQueue a)
}
mkFrameContentQ
:: forall m a.
(HasStaticDuration a, MonadBaseControl IO m)
=> Int -> m (FrameContentQ a)
mkFrameContentQ qlen =
MkFrameContentQ segmentDuration (fromIntegral qlen * 0.5 * segmentDuration) <$>
liftBase (newTBQueueIO qlen)
where
segmentDuration = getStaticDuration (Proxy :: Proxy a)
frameContentQSink
:: (NFData a, MonadBaseControl IO m, Show a, MonadLogger m)
=> FrameContentQ a -> Sink (Stream i s t p a) m ()
frameContentQSink (MkFrameContentQ _ _ !ringRef) = awaitForever go
where
go !x = do
maybe (return ()) pushInRing (x ^? eachFrameContent)
return ()
where
pushInRing !buf' = do
isFull <-
liftBase $ do
!buf <- evaluate $ withStrategy rdeepseq buf'
atomically $ do
isFull <- isFullTBQueue ringRef
when isFull (void $ readTBQueue ringRef)
writeTBQueue ringRef buf
return isFull
when isFull $ $logInfo "queue full"
frameContentQSource
:: ( Random i
, NFData c
, NFData p
, Default p
, HasStaticDuration c
, HasDuration c
, MonadBaseControl IO m
, MonadLogger m
, KnownRate r
, Integral t
, Integral s
, NFData t
, NFData s
)
=> FrameContentQ c -> Source m (Stream i s (Ticks r t) p (Discontinous c))
frameContentQSource (MkFrameContentQ pTime pollIntervall ringRef) =
evalStateC (MkPollFrameContentSourceSt 0 0) $ do
yieldStart
go False
where
go wasMissing = do
res <- liftBase $ race (atomically $ readTBQueue ringRef) sleep
case res of
Left buf -> yieldNextBuffer (Got buf) >> go False
Right dt -> yieldMissing dt wasMissing >> go True
sleep =
liftBase
(do !(t0 :: ClockTime UtcClock) <- now
threadDelay (_ticks pollIntervallMicros)
!t1 <- now
return (diffTime t1 t0 ^. utcClockTimeDiff))
yieldMissing !dt !wasMissing = do
unless
wasMissing
($logDebug (fromString (printf "underflow: %s" (show dt))))
replicateM_ (floor (dt / pTime)) (yieldNextBuffer Missing)
yieldStart =
(MkFrameCtx <$> liftBase randomIO <*> use ppTicks <*> use ppSeqNum <*>
pure def) >>=
yieldStartFrameCtx
pollIntervallMicros :: Ticks (Hz 1000000) Int
pollIntervallMicros = nominalDiffTime # pollIntervall
yieldNextBuffer !buf = do
let !bufferDuration = nominalDiffTime # getDuration buf
!ts <- ppTicks <<+= bufferDuration
!sn <- ppSeqNum <<+= 1
frm <- liftBase (evaluate (withStrategy rdeepseq $ MkFrame ts sn buf))
yieldNextFrame frm