{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE MagicHash #-}
#if __GLASGOW_HASKELL__ >= 801
{-# LANGUAGE TypeApplications #-}
#endif
#include "inline.hs"
module Streamly.Internal.Data.Stream.StreamD
(
Step (..)
#if __GLASGOW_HASKELL__ >= 800
, Stream (Stream, UnStream)
#else
, Stream (UnStream)
, pattern Stream
#endif
, nil
, nilM
, cons
, uncons
, unfoldr
, unfoldrM
, unfold
, repeat
, repeatM
, replicate
, replicateM
, fromIndices
, fromIndicesM
, generate
, generateM
, iterate
, iterateM
, enumerateFromStepIntegral
, enumerateFromIntegral
, enumerateFromThenIntegral
, enumerateFromToIntegral
, enumerateFromThenToIntegral
, enumerateFromStepNum
, numFrom
, numFromThen
, enumerateFromToFractional
, enumerateFromThenToFractional
, currentTime
, yield
, yieldM
, fromList
, fromListM
, fromStreamK
, fromStreamD
, fromPrimVar
, fromSVar
, foldrS
, foldrT
, foldrM
, foldrMx
, foldr
, foldr1
, foldl'
, foldlM'
, foldlS
, foldlT
, reverse
, reverse'
, foldlx'
, foldlMx'
, runFold
, parselMx'
, splitParse
, tap
, tapOffsetEvery
, tapAsync
, tapRate
, pollCounts
, drain
, null
, head
, headElse
, tail
, last
, elem
, notElem
, all
, any
, maximum
, maximumBy
, minimum
, minimumBy
, findIndices
, lookup
, findM
, find
, (!!)
, toSVarParallel
, concatMapM
, concatMap
, ConcatMapUState (..)
, concatMapU
, ConcatUnfoldInterleaveState (..)
, concatUnfoldInterleave
, concatUnfoldRoundrobin
, AppendState(..)
, append
, InterleaveState(..)
, interleave
, interleaveMin
, interleaveSuffix
, interleaveInfix
, roundRobin
, gintercalateSuffix
, interposeSuffix
, gintercalate
, interpose
, groupsOf
, groupsOf2
, groupsBy
, groupsRollingBy
, splitBy
, splitSuffixBy
, wordsBy
, splitSuffixBy'
, splitOn
, splitSuffixOn
, splitInnerBy
, splitInnerBySuffix
, isPrefixOf
, isSubsequenceOf
, stripPrefix
, mapM_
, toList
, toListRev
, toStreamK
, toStreamD
, hoist
, generally
, liftInner
, runReaderT
, evalStateT
, runStateT
, transform
, scanlM'
, scanl'
, scanlM
, scanl
, scanl1M'
, scanl1'
, scanl1M
, scanl1
, prescanl'
, prescanlM'
, postscanl
, postscanlM
, postscanl'
, postscanlM'
, postscanlx'
, postscanlMx'
, scanlMx'
, scanlx'
, filter
, filterM
, uniq
, take
, takeByTime
, takeWhile
, takeWhileM
, drop
, dropByTime
, dropWhile
, dropWhileM
, map
, mapM
, sequence
, rollingMap
, rollingMapM
, intersperseM
, intersperse
, intersperseSuffix
, intersperseSuffixBySpan
, insertBy
, deleteBy
, mapMaybe
, mapMaybeM
, indexed
, indexedR
, zipWith
, zipWithM
, eqBy
, cmpBy
, mergeBy
, mergeByM
, the
, newFinalizedIORef
, runIORefFinalizer
, clearIORefFinalizer
, gbracket
, before
, after
, afterIO
, bracket
, bracketIO
, onException
, finally
, finallyIO
, handle
, mkParallel
, mkParallelD
, newCallbackStream
, lastN
)
where
import Control.Concurrent (killThread, myThreadId, takeMVar, threadDelay)
import Control.Exception
(assert, Exception, SomeException, AsyncException, fromException, mask_)
import Control.Monad (void, when, forever)
import Control.Monad.Catch (MonadCatch, MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader (ReaderT)
import Control.Monad.State.Strict (StateT)
import Control.Monad.Trans (MonadTrans(lift))
import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_)
import Data.Bits (shiftR, shiftL, (.|.), (.&.))
import Data.Functor.Identity (Identity(..))
import Data.Int (Int64)
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef, IORef)
import Data.Maybe (fromJust, isJust, isNothing)
import Data.Word (Word32)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
import GHC.Types (SPEC(..))
import System.Mem (performMajorGC)
import Prelude
hiding (map, mapM, mapM_, repeat, foldr, last, take, filter,
takeWhile, drop, dropWhile, all, any, maximum, minimum, elem,
notElem, null, head, tail, zipWith, lookup, foldr1, sequence,
(!!), scanl, scanl1, concatMap, replicate, enumFromTo, concat,
reverse, iterate, splitAt)
import qualified Control.Monad.Catch as MC
import qualified Control.Monad.Reader as Reader
import qualified Control.Monad.State.Strict as State
import qualified Prelude
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Mutable.Prim.Var
(Prim, Var, readVar, newVar, modifyVar')
import Streamly.Internal.Data.Time.Units
(TimeUnit64, toRelTime64, diffAbsTime64)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Memory.Array.Types (Array(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Parser.Types (Parser(..), ParseError(..))
import Streamly.Internal.Data.Pipe.Types (Pipe(..), PipeState(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(MicroSecond64(..), fromAbsTime, toAbsTime, AbsTime)
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Strict (Tuple3'(..))
import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.SVar (fromConsumer, pushToFold)
import qualified Streamly.Internal.Data.Pipe.Types as Pipe
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Memory.Ring as RB
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Parser.Types as PR
-- | The empty stream: its step function reports 'Stop' regardless of the
-- state it is given.
{-# INLINE_NORMAL nil #-}
nil :: Monad m => Stream m a
nil = Stream stop ()
  where
    stop _ _ = return Stop
-- | A stream that runs the supplied action for its effect, discards the
-- result, and stops without yielding any element.
{-# INLINE_NORMAL nilM #-}
nilM :: Monad m => m b -> Stream m a
nilM m = Stream runThenStop ()
  where
    runThenStop _ _ = m >> return Stop
-- | Prepend the result of a monadic action to a stream. The action is run
-- on the first step; every later step delegates to the original stream.
{-# INLINE_NORMAL consM #-}
consM :: Monad m => m a -> Stream m a -> Stream m a
consM m (Stream step state) = Stream step1 Nothing
  where
    {-# INLINE_LATE step1 #-}
    -- 'Nothing' means the prepended element has not been produced yet.
    step1 _ Nothing = do
        hd <- m
        return (Yield hd (Just state))
    step1 gst (Just cur) = do
        res <- step gst cur
        return $
            case res of
                Yield a next -> Yield a (Just next)
                Skip next -> Skip (Just next)
                Stop -> Stop
-- | Prepend a pure element to a stream. The element is yielded on the first
-- step; every later step delegates to the original stream.
{-# INLINE_NORMAL cons #-}
cons :: Monad m => a -> Stream m a -> Stream m a
cons x (Stream step state) = Stream step1 Nothing
  where
    {-# INLINE_LATE step1 #-}
    -- 'Nothing' means the prepended element has not been produced yet.
    step1 _ Nothing = return (Yield x (Just state))
    step1 gst (Just cur) = do
        res <- step gst cur
        return $
            case res of
                Yield a next -> Yield a (Just next)
                Skip next -> Skip (Just next)
                Stop -> Stop
-- | Split a stream into its first element and the remaining stream.
-- Returns 'Nothing' when the stream stops before yielding anything. 'Skip'
-- steps are stepped through with the default state.
{-# INLINE_NORMAL uncons #-}
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
uncons (UnStream step state) = search state
  where
    search cur =
        step defState cur >>= \res ->
            case res of
                Yield x next -> return (Just (x, Stream step next))
                Skip next -> search next
                Stop -> return Nothing
-- | Build a stream from a seed and a monadic generator. Each step runs the
-- generator on the current seed: @Just (x, s)@ yields @x@ and continues
-- from @s@, while 'Nothing' ends the stream.
{-# INLINE_NORMAL unfoldrM #-}
unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
unfoldrM next state = Stream step state
  where
    {-# INLINE_LATE step #-}
    step _ seed = do
        out <- next seed
        return $
            case out of
                Nothing -> Stop
                Just (x, seed') -> Yield x seed'
-- | Pure variant of 'unfoldrM': the generator's result is lifted into the
-- monad with 'return'.
{-# INLINE_LATE unfoldr #-}
unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a
unfoldr f = unfoldrM (\seed -> return (f seed))
-- | Turn an 'Unfold' and a seed into a stream. The first step runs the
-- unfold's inject action on the seed (producing a 'Skip'); later steps run
-- the unfold's step function on the injected state.
{-# INLINE_NORMAL unfold #-}
unfold :: Monad m => Unfold m a b -> a -> Stream m b
unfold (Unfold ustep inject) seed = Stream step Nothing
  where
    {-# INLINE_LATE step #-}
    -- 'Nothing' means the seed has not been injected yet.
    step _ Nothing = do
        st0 <- inject seed
        return (Skip (Just st0))
    step _ (Just cur) = do
        res <- ustep cur
        return $
            case res of
                Yield x next -> Yield x (Just next)
                Skip next -> Skip (Just next)
                Stop -> Stop
-- | An infinite stream that runs the given action on every step and yields
-- its result.
{-# INLINE_NORMAL repeatM #-}
repeatM :: Monad m => m a -> Stream m a
repeatM x = Stream step ()
  where
    step _ _ = do
        r <- x
        return (Yield r ())
-- | An infinite stream that yields the same value on every step.
{-# INLINE_NORMAL repeat #-}
repeat :: Monad m => a -> Stream m a
repeat x = Stream step ()
  where
    step _ _ = return (Yield x ())
-- | An infinite stream of repeated monadic applications. The stream state
-- is an action: each step runs it, yields the result @x@, and stores
-- @step x@ as the action for the next step.
{-# INLINE_NORMAL iterateM #-}
iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
iterateM step = Stream advance
  where
    advance _ pending = do
        x <- pending
        return (Yield x (step x))
-- | Pure variant of 'iterateM': yields the seed, then the function applied
-- to it, and so on, without end.
{-# INLINE_NORMAL iterate #-}
iterate :: Monad m => (a -> a) -> a -> Stream m a
iterate step st = iterateM (\x -> return (step x)) (return st)
-- | Run the action @n@ times, yielding each result. The stream is empty
-- when @n <= 0@.
{-# INLINE_NORMAL replicateM #-}
replicateM :: forall m a. Monad m => Int -> m a -> Stream m a
replicateM n p = Stream step n
  where
    {-# INLINE_LATE step #-}
    -- The state is the number of elements still to be produced.
    step _ (remaining :: Int) =
        if remaining <= 0
        then return Stop
        else p >>= \x -> return (Yield x (remaining - 1))
-- | A stream of @n@ copies of a value; empty when @n <= 0@ (see
-- 'replicateM').
{-# INLINE_NORMAL replicate #-}
replicate :: Monad m => Int -> a -> Stream m a
replicate n x = replicateM n action
  where
    action = return x
-- | An unbounded arithmetic progression that starts at @from@ and adds
-- @stride@ on every step. Both arguments are forced before the stream is
-- built, and the successor value is computed strictly.
{-# INLINE_NORMAL enumerateFromStepIntegral #-}
enumerateFromStepIntegral :: (Integral a, Monad m) => a -> a -> Stream m a
enumerateFromStepIntegral !from !stride = Stream step from
  where
    {-# INLINE_LATE step #-}
    step _ !cur = return (Yield cur $! cur + stride)
-- | Enumerate from @from@ up to and including @to@ in steps of 1. The
-- stream is empty when @from > to@.
--
-- The successor is only computed while the current value is strictly below
-- @to@. For bounded types this means the enumeration terminates even when
-- @to@ is 'maxBound', instead of wrapping around and running forever.
{-# INLINE enumerateFromToIntegral #-}
enumerateFromToIntegral :: (Monad m, Integral a) => a -> a -> Stream m a
enumerateFromToIntegral from to = from `seq` Stream step (Just from)
  where
    {-# INLINE_LATE step #-}
    -- 'Nothing' means the final element (@to@) has already been produced.
    step _ (Just x)
        | x < to = return $ Yield x (Just $! x + 1)
        | x == to = return $ Yield x Nothing
        | otherwise = return Stop
    step _ Nothing = return Stop
-- | Enumerate a bounded integral type from @from@ towards 'maxBound', by
-- delegating to 'enumerateFromToIntegral' with 'maxBound' as the limit.
{-# INLINE enumerateFromIntegral #-}
enumerateFromIntegral :: (Monad m, Integral a, Bounded a) => a -> Stream m a
enumerateFromIntegral from = enumerateFromToIntegral from upper
  where
    upper = maxBound
-- | State of the from-then-to integral enumerations.
--
-- * 'EnumInit': nothing has been examined yet.
-- * 'EnumYield': fields are, in order, the next value to yield, the stride,
--   and the precomputed bound @to - stride@ used to detect the last element.
-- * 'EnumStop': the last element has been yielded.
data EnumState a = EnumInit | EnumYield a a a | EnumStop
-- | Ascending enumeration @from, next, ..@ bounded above by @to@; callers
-- are expected to pass @next >= from@.
--
-- When @to < next@ the result is either empty (@to < from@) or just @from@.
-- Otherwise elements are produced with stride @next - from@, and the
-- current value is compared against @to - stride@ so that the value past
-- the last element is never computed.
{-# INLINE_NORMAL enumerateFromThenToIntegralUp #-}
enumerateFromThenToIntegralUp
    :: (Monad m, Integral a)
    => a -> a -> a -> Stream m a
enumerateFromThenToIntegralUp from next to = Stream step EnumInit
  where
    {-# INLINE_LATE step #-}
    step _ EnumInit = return start
    step _ (EnumYield cur stride lastStart) =
        return $
            if cur > lastStart
            then Yield cur EnumStop
            else Yield cur (EnumYield (cur + stride) stride lastStart)
    step _ EnumStop = return Stop

    -- Decide between the degenerate cases and the regular progression.
    start
        | to < next = if to < from then Stop else Yield from EnumStop
        | otherwise =
            let stride = next - from
             in Skip (EnumYield from stride (to - stride))
-- | Descending enumeration @from, next, ..@ bounded below by @to@; the
-- mirror image of 'enumerateFromThenToIntegralUp'.
--
-- When @to > next@ the result is either empty (@to > from@) or just @from@.
-- Otherwise elements are produced with stride @next - from@, and the
-- current value is compared against @to - stride@ so that the value past
-- the last element is never computed.
{-# INLINE_NORMAL enumerateFromThenToIntegralDn #-}
enumerateFromThenToIntegralDn
    :: (Monad m, Integral a)
    => a -> a -> a -> Stream m a
enumerateFromThenToIntegralDn from next to = Stream step EnumInit
  where
    {-# INLINE_LATE step #-}
    step _ EnumInit = return start
    step _ (EnumYield cur stride lastStart) =
        return $
            if cur < lastStart
            then Yield cur EnumStop
            else Yield cur (EnumYield (cur + stride) stride lastStart)
    step _ EnumStop = return Stop

    -- Decide between the degenerate cases and the regular progression.
    start
        | to > next = if to > from then Stop else Yield from EnumStop
        | otherwise =
            let stride = next - from
             in Skip (EnumYield from stride (to - stride))
-- | Enumerate @from, next, ..@ up or down to @to@. The direction is chosen
-- from the first two values: ascending when @next >= from@, descending
-- otherwise.
{-# INLINE_NORMAL enumerateFromThenToIntegral #-}
enumerateFromThenToIntegral
    :: (Monad m, Integral a)
    => a -> a -> a -> Stream m a
enumerateFromThenToIntegral from next to =
    if next >= from
    then enumerateFromThenToIntegralUp from next to
    else enumerateFromThenToIntegralDn from next to
-- | Enumerate @from, next, ..@ for a bounded type. The limit is 'maxBound'
-- when @next > from@ and 'minBound' otherwise.
{-# INLINE_NORMAL enumerateFromThenIntegral #-}
enumerateFromThenIntegral
    :: (Monad m, Integral a, Bounded a)
    => a -> a -> Stream m a
enumerateFromThenIntegral from next
    | next > from = enumerateFromThenToIntegralUp from next maxBound
    | otherwise = enumerateFromThenToIntegralDn from next minBound
-- | An unbounded progression for any 'Num' type. The state is a step
-- counter @i@ starting at 0, and each element is computed as
-- @from + i * stride@ rather than by repeated addition. Both the element
-- and the next counter are forced when the step result is evaluated.
{-# INLINE_NORMAL enumerateFromStepNum #-}
enumerateFromStepNum :: (Monad m, Num a) => a -> a -> Stream m a
enumerateFromStepNum from stride = Stream step 0
  where
    {-# INLINE_LATE step #-}
    step _ !count =
        let value = from + count * stride
            count' = count + 1
         in return (count' `seq` value `seq` Yield value count')
-- | Unbounded enumeration starting at @from@ with a stride of 1.
{-# INLINE_NORMAL numFrom #-}
numFrom :: (Monad m, Num a) => a -> Stream m a
numFrom from = enumerateFromStepNum from unit
  where
    unit = 1
-- | Unbounded enumeration @from, next, ..@; the stride is @next - from@.
{-# INLINE_NORMAL numFromThen #-}
numFromThen :: (Monad m, Num a) => a -> a -> Stream m a
numFromThen from next = enumerateFromStepNum from stride
  where
    stride = next - from
-- | Enumerate a fractional type from @from@ in steps of 1, keeping elements
-- while they do not exceed @to + 1/2@.
{-# INLINE_NORMAL enumerateFromToFractional #-}
enumerateFromToFractional
    :: (Monad m, Fractional a, Ord a)
    => a -> a -> Stream m a
enumerateFromToFractional from to =
    takeWhile inRange (enumerateFromStepNum from 1)
  where
    inRange x = x <= to + 1 / 2
-- | Enumerate a fractional type as @from, next, ..@, stopping at the first
-- element beyond @to@ plus half the stride. The comparison direction
-- follows the sign of the stride: @<=@ when @next >= from@, @>=@ otherwise.
{-# INLINE_NORMAL enumerateFromThenToFractional #-}
enumerateFromThenToFractional
    :: (Monad m, Fractional a, Ord a)
    => a -> a -> a -> Stream m a
enumerateFromThenToFractional from next to =
    takeWhile inRange (numFromThen from next)
  where
    halfStride = (next - from) / 2
    inRange
        | next >= from = \x -> x <= to + halfStride
        | otherwise = \x -> x >= to + halfStride
-- | An infinite stream whose element at index @i@ (counting from 0) is the
-- result of running @gen i@.
{-# INLINE_NORMAL fromIndicesM #-}
fromIndicesM :: Monad m => (Int -> m a) -> Stream m a
fromIndicesM gen = Stream step 0
  where
    {-# INLINE_LATE step #-}
    step _ ix = gen ix >>= \x -> return (Yield x (ix + 1))
-- | Pure variant of 'fromIndicesM'.
{-# INLINE fromIndices #-}
fromIndices :: Monad m => (Int -> a) -> Stream m a
fromIndices gen = fromIndicesM (\ix -> return (gen ix))
-- | A stream of @gen 0, gen 1, .., gen (n - 1)@. The length @n@ is forced
-- before the stream is built; the stream is empty when @n <= 0@.
{-# INLINE_NORMAL generateM #-}
generateM :: Monad m => Int -> (Int -> m a) -> Stream m a
generateM !n gen = Stream step 0
  where
    {-# INLINE_LATE step #-}
    step _ ix =
        if ix < n
        then do
            x <- gen ix
            return (Yield x (ix + 1))
        else return Stop
-- | Pure variant of 'generateM'.
{-# INLINE generate #-}
generate :: Monad m => Int -> (Int -> a) -> Stream m a
generate n gen = generateM n (\ix -> return (gen ix))
-- | Convert a list of actions into a stream, running each action in order
-- when its element is demanded.
{-# INLINE_LATE fromListM #-}
fromListM :: MonadAsync m => [m a] -> Stream m a
fromListM = Stream step
  where
    {-# INLINE_LATE step #-}
    -- The state is the list of actions that have not been run yet.
    step _ pending =
        case pending of
            (m : rest) -> do
                x <- m
                return (Yield x rest)
            [] -> return Stop
-- | Convert any 'K.IsStream' type to a direct-style 'Stream', going through
-- the StreamK representation.
{-# INLINE toStreamD #-}
toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
toStreamD = \xs -> fromStreamK (K.toStream xs)
-- | An infinite stream that reads the current contents of the mutable
-- variable on every step.
{-# INLINE_NORMAL fromPrimVar #-}
fromPrimVar :: (MonadIO m, Prim a) => Var IO a -> Stream m a
fromPrimVar var = Stream step ()
  where
    {-# INLINE_LATE step #-}
    step _ () = do
        x <- liftIO (readVar var)
        return (Yield x ())
-- | State machine used by 'fromSVar' and 'fromProducer' while reading
-- events out of an 'SVar'.
data FromSVarState t m a =
-- Nothing has been set up yet ('fromSVar' installs its cleanup hook here).
FromSVarInit
-- Ready to fetch the next batch of events from the SVar's output queue.
| FromSVarRead (SVar t m a)
-- Working through a batch of events that has already been fetched.
| FromSVarLoop (SVar t m a) [ChildEvent a]
-- Reading has finished; the next step reports 'Stop'.
| FromSVarDone (SVar t m a)
-- | Read the output of an 'SVar' as a stream.
--
-- The stream is a state machine over 'FromSVarState':
--
-- * On the first step a fresh 'IORef' is created and a weak-reference
--   finalizer is attached to it; the finalizer cleans up the SVar. The ref
--   is stored in the SVar copy used for the rest of the stream.
-- * Events are then fetched in batches from the output queue and processed
--   in reverse of the order returned by 'readOutputQ'.
-- * A 'ChildYield' event is yielded downstream. A 'ChildStop' event without
--   an exception may end the stream, depending on the SVar's stop style. A
--   'ChildStop' carrying 'ThreadAbort' is ignored; any other exception is
--   rethrown after cleaning up the SVar.
{-# INLINE_NORMAL fromSVar #-}
fromSVar :: (MonadAsync m) => SVar t m a -> Stream m a
fromSVar svar = Stream step FromSVarInit
  where
    {-# INLINE_LATE step #-}
    step _ FromSVarInit = do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        let sv = svar{svarRef = Just ref}
        return $ Skip (FromSVarRead sv)
      where
        -- Finalizer run when the IORef created above is garbage collected.
        {-# NOINLINE hook #-}
        hook = do
            when (svarInspectMode svar) $ do
                r <- liftIO $ readIORef (svarStopTime (svarStats svar))
                -- No stop time recorded: the stream was dropped before it
                -- reached the done state.
                when (isNothing r) $
                    printSVar svar "SVar Garbage Collected"
            cleanupSVar svar
            when (svarInspectMode svar) performMajorGC

    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        return $ Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- Current batch exhausted: decide whether to read more or finish.
    step _ (FromSVarLoop sv []) = do
        done <- postProcess sv
        return $ Skip $ if done
                        then (FromSVarDone sv)
                        else (FromSVarRead sv)

    step _ (FromSVarLoop sv (ev : es)) = do
        case ev of
            ChildYield a -> return $ Yield a (FromSVarLoop sv es)
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> do
                        stop <- shouldStop tid
                        if stop
                        then do
                            liftIO (cleanupSVar sv)
                            return $ Skip (FromSVarDone sv)
                        else return $ Skip (FromSVarLoop sv es)
                    Just ex ->
                        case fromException ex of
                            Just ThreadAbort ->
                                return $ Skip (FromSVarLoop sv es)
                            Nothing -> liftIO (cleanupSVar sv) >> throwM ex
      where
        -- Whether the termination of thread @tid@ ends the whole stream.
        shouldStop tid =
            case svarStopStyle sv of
                StopNone -> return False
                StopAny -> return True
                StopBy -> do
                    sid <- liftIO $ readIORef (svarStopBy sv)
                    return (tid == sid)

    step _ (FromSVarDone sv) = do
        -- In inspect mode, record the stop time and dump the SVar state.
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        return Stop
-- | Read the output of a producer 'SVar' as a stream.
--
-- Unlike 'fromSVar', the state machine starts directly in the read state
-- and installs no cleanup hook. Events are fetched in batches and processed
-- in reverse of the order returned by 'readOutputQ'. A 'ChildYield' is
-- yielded downstream. A 'ChildStop' without an exception sends a stop to
-- the producer and ends the stream; a 'ChildStop' carrying an exception is
-- treated as an internal bug.
{-# INLINE_NORMAL fromProducer #-}
fromProducer :: (MonadAsync m) => SVar t m a -> Stream m a
fromProducer svar = Stream step (FromSVarRead svar)
  where
    {-# INLINE_LATE step #-}
    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        return $ Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- Current batch exhausted: go back and read the next one.
    step _ (FromSVarLoop sv []) = return $ Skip $ FromSVarRead sv

    step _ (FromSVarLoop sv (ev : es)) = do
        case ev of
            ChildYield a -> return $ Yield a (FromSVarLoop sv es)
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> do
                        sendStopToProducer sv
                        return $ Skip (FromSVarDone sv)
                    Just _ -> error "Bug: fromProducer: received exception"

    step _ (FromSVarDone sv) = do
        -- In inspect mode, record the stop time and dump the SVar state.
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar sv "SVar Done"
        return Stop

    -- The stream starts in 'FromSVarRead' and no step above produces
    -- 'FromSVarInit', so reaching it indicates a programming error.
    step _ FromSVarInit =
        error "Bug: fromProducer: unexpected FromSVarInit state"
-- | Change the underlying monad of a stream using a natural transformation.
-- Each step of the source stream is run in @m@ and then carried over to @n@
-- by the supplied function.
{-# INLINE_NORMAL hoist #-}
hoist :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a
hoist f (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst cur = do
        res <- f (step (adaptState gst) cur)
        return $
            case res of
                Yield x next -> Yield x next
                Skip next -> Skip next
                Stop -> Stop
-- | Generalize a pure ('Identity') stream to any monad by unwrapping each
-- step's result and re-wrapping it with 'return'.
{-# INLINE generally #-}
generally :: Monad m => Stream Identity a -> Stream m a
generally = hoist (\pureStep -> return (runIdentity pureStep))
-- | Lift the monad of a stream through a monad transformer: every step of
-- the source stream is executed via 'lift'.
{-# INLINE_NORMAL liftInner #-}
liftInner :: (Monad m, MonadTrans t, Monad (t m))
    => Stream m a -> Stream (t m) a
liftInner (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst cur = do
        res <- lift (step (adaptState gst) cur)
        return $
            case res of
                Yield x next -> Yield x next
                Skip next -> Skip next
                Stop -> Stop
-- | Eliminate a 'ReaderT' layer from a stream by running every step with
-- the given environment value.
{-# INLINE_NORMAL runReaderT #-}
runReaderT :: Monad m => s -> Stream (ReaderT s m) a -> Stream m a
runReaderT sval (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst cur = do
        res <- Reader.runReaderT (step (adaptState gst) cur) sval
        return $
            case res of
                Yield x next -> Yield x next
                Skip next -> Skip next
                Stop -> Stop
-- | Eliminate a 'StateT' layer from a stream. The monadic state starts at
-- the given value and is threaded from step to step alongside the stream
-- state; it is not exposed in the output.
{-# INLINE_NORMAL evalStateT #-}
evalStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m a
evalStateT sval (Stream step state) = Stream step' (state, sval)
  where
    {-# INLINE_LATE step' #-}
    step' gst (cur, userSt) = do
        (res, userSt') <- State.runStateT (step (adaptState gst) cur) userSt
        return $
            case res of
                Yield x next -> Yield x (next, userSt')
                Skip next -> Skip (next, userSt')
                Stop -> Stop
-- | Eliminate a 'StateT' layer from a stream, pairing every element with
-- the monadic state as it stood right after the step that produced it.
{-# INLINE_NORMAL runStateT #-}
runStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m (s, a)
runStateT sval (Stream step state) = Stream step' (state, sval)
  where
    {-# INLINE_LATE step' #-}
    step' gst (cur, userSt) = do
        (res, userSt') <- State.runStateT (step (adaptState gst) cur) userSt
        return $
            case res of
                Yield x next -> Yield (userSt', x) (next, userSt')
                Skip next -> Skip (next, userSt')
                Stop -> Stop
-- | Right fold of a non-empty stream. The first element is removed with
-- 'uncons' and used as the starting value for a 'foldr' over the remaining
-- elements. Returns 'Nothing' for an empty stream.
{-# INLINE_NORMAL foldr1 #-}
foldr1 :: Monad m => (a -> a -> a) -> Stream m a -> m (Maybe a)
foldr1 f m =
    uncons m >>= \parts ->
        case parts of
            Just (h, t) -> fmap Just (foldr f h t)
            Nothing -> return Nothing
-- | Left fold whose accumulator is an action in a transformer over the
-- stream's monad. Stream steps are executed via 'lift'; the accumulated
-- action is returned once the stream stops.
{-# INLINE_NORMAL foldlT #-}
foldlT :: (Monad m, Monad (s m), MonadTrans s)
    => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
foldlT fstep begin (Stream step state) = loop SPEC begin state
  where
    loop !_ acc cur =
        lift (step defState cur) >>= \res ->
            case res of
                Yield x next -> loop SPEC (fstep acc x) next
                Skip next -> loop SPEC acc next
                Stop -> acc
-- | Left fold whose accumulator is itself a stream. The input is consumed
-- completely first (emitting only 'Skip's) while the accumulator stream is
-- built up; after that the accumulated stream is run and its elements are
-- yielded.
{-# INLINE_NORMAL foldlS #-}
foldlS :: Monad m
    => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
foldlS fstep begin (Stream step state) = Stream step' (Left (state, begin))
  where
    -- 'Left': still consuming the input. 'Right': draining the result.
    step' gst (Left (cur, acc)) = do
        res <- step (adaptState gst) cur
        return $
            case res of
                Yield x next -> Skip (Left (next, fstep acc x))
                Skip next -> Skip (Left (next, acc))
                Stop -> Skip (Right acc)
    step' gst (Right (Stream outStep outSt)) = do
        res <- outStep (adaptState gst) outSt
        return $
            case res of
                Yield x next -> Yield x (Right (Stream outStep next))
                Skip next -> Skip (Right (Stream outStep next))
                Stop -> Stop
-- | Split a list into its first @n@ elements and the rest. A non-positive
-- @n@ gives an empty prefix; an @n@ beyond the list length gives the whole
-- list as the prefix.
{-# INLINE splitAt #-}
splitAt :: Int -> [a] -> ([a],[a])
splitAt n ls =
    if n <= 0
    then ([], ls)
    else go n ls
  where
    go :: Int -> [a] -> ([a], [a])
    go _ [] = ([], [])
    go 1 (y : ys) = ([y], ys)
    go k (y : ys) =
        let (front, back) = go (k - 1) ys
         in (y : front, back)
-- | Run a parser, given as its step, initial and extract functions, over a
-- stream and return the parse result.
--
-- A list of recently consumed inputs is kept, most recent first, so that the
-- parser can ask for inputs to be replayed. If the stream ends before the
-- parser stops, the result is obtained with the extract function. A parser
-- error is thrown as a 'ParseError'.
{-# INLINE_NORMAL parselMx' #-}
parselMx'
:: MonadThrow m
=> (s -> a -> m (PR.Step s b))
-> m s
-> (s -> m b)
-> Stream m a
-> m b
parselMx' pstep initial extract (Stream step state) = do
initial >>= go SPEC state []
where
-- go: feed the parser with fresh elements pulled from the stream.
-- Arguments: stream state, buffer of kept inputs, parser state.
{-# INLINE go #-}
go !_ st buf !pst = do
r <- step defState st
case r of
Yield x s -> do
pRes <- pstep pst x
case pRes of
-- Keep only the n most recent inputs (including x) in the buffer.
PR.Yield n pst1 -> do
assert (n <= length (x:buf)) (return ())
go SPEC s (Prelude.take n (x:buf)) pst1
-- Nothing to replay: remember x and continue with the stream.
PR.Skip 0 pst1 -> go SPEC s (x:buf) pst1
-- Replay the n most recent inputs: take them off the buffer, restore
-- their original order, and feed them again through gobuf.
PR.Skip n pst1 -> do
assert (n <= length (x:buf)) (return ())
let (src0, buf1) = splitAt n (x:buf)
src = Prelude.reverse src0
gobuf SPEC s buf1 src pst1
PR.Stop _ b -> return b
PR.Error err -> throwM $ ParseError err
Skip s -> go SPEC s buf pst
-- End of stream: ask the parser for its result so far.
Stop -> extract pst
-- gobuf: feed the parser from the list of inputs queued for replay; once
-- that list is empty, resume reading the stream with go.
gobuf !_ s buf [] !pst = go SPEC s buf pst
gobuf !_ s buf (x:xs) !pst = do
pRes <- pstep pst x
case pRes of
PR.Yield n pst1 -> do
assert (n <= length (x:buf)) (return ())
gobuf SPEC s (Prelude.take n (x:buf)) xs pst1
PR.Skip 0 pst1 -> gobuf SPEC s (x:buf) xs pst1
-- Replayed inputs go in front of whatever is still queued.
PR.Skip n pst1 -> do
assert (n <= length (x:buf)) (return ())
let (src0, buf1) = splitAt n (x:buf)
src = Prelude.reverse src0 ++ xs
gobuf SPEC s buf1 src pst1
PR.Stop _ b -> return b
PR.Error err -> throwM $ ParseError err
-- | State machine of 'splitParse'. @inpBuf@ is the type of the input
-- buffers, @st@ the state of the source stream, @pst@ the parser state and
-- @x@ the type of the parse results.
{-# ANN type ParseChunksState Fuse #-}
data ParseChunksState x inpBuf st pst =
-- Start a new parse; the buffer holds inputs to feed before the stream.
ParseChunksInit inpBuf st
-- The source stream has ended; the buffer holds the unconsumed inputs.
| ParseChunksInitLeftOver inpBuf
-- Feed the parser from the source stream; the buffer holds kept inputs.
| ParseChunksStream st inpBuf pst
-- Feed the parser from the first buffer (inputs queued for replay); the
-- second buffer holds kept inputs.
| ParseChunksBuf inpBuf st inpBuf pst
-- Emit a parse result, then continue with the wrapped state.
| ParseChunksYield x (ParseChunksState x inpBuf st pst)
-- | Apply a parser repeatedly over a stream, producing a stream of parse
-- results.
--
-- Each time the parser stops, its result is emitted and a new parse is
-- started, first re-feeding the inputs the parser reported back on stopping.
-- When the source ends, the current parser's result is obtained with its
-- extract function and emitted, and the output stream then stops. A parser
-- error is thrown as a 'ParseError'.
{-# INLINE_NORMAL splitParse #-}
splitParse
:: MonadThrow m
=> Parser m a b
-> Stream m a
-> Stream m b
splitParse (Parser pstep initial extract) (Stream step state) =
Stream stepOuter (ParseChunksInit [] state)
where
{-# INLINE_LATE stepOuter #-}
-- Start a parse with nothing queued: read straight from the stream.
stepOuter _ (ParseChunksInit [] st) = do
initial >>= return . Skip . ParseChunksStream st []
-- Start a parse with queued inputs: feed those first.
stepOuter _ (ParseChunksInit src st) = do
initial >>= return . Skip . ParseChunksBuf src st []
-- Source exhausted: any leftover inputs are dropped and the stream ends.
stepOuter _ (ParseChunksInitLeftOver _) = return Stop
-- Feed the parser with the next element of the source stream. The buffer
-- holds kept inputs, most recent first.
stepOuter gst (ParseChunksStream st buf pst) = do
r <- step (adaptState gst) st
case r of
Yield x s -> do
pRes <- pstep pst x
case pRes of
-- Keep only the n most recent inputs (including x).
PR.Yield n pst1 -> do
assert (n <= length (x:buf)) (return ())
let buf1 = Prelude.take n (x:buf)
return $ Skip $ ParseChunksStream s buf1 pst1
-- Replay the n most recent inputs in their original order.
PR.Skip n pst1 -> do
assert (n <= length (x:buf)) (return ())
let (src0, buf1) = splitAt n (x:buf)
src = Prelude.reverse src0
return $ Skip $ ParseChunksBuf src s buf1 pst1
-- Parser finished: emit b, then start a new parse that is first fed the
-- n most recent inputs.
PR.Stop n b -> do
assert (n <= length (x:buf)) (return ())
let src = Prelude.reverse (Prelude.take n (x:buf))
return $ Skip $
ParseChunksYield b (ParseChunksInit src s)
PR.Error err -> throwM $ ParseError err
Skip s -> return $ Skip $ ParseChunksStream s buf pst
-- Source ended mid-parse: extract a final result and emit it.
Stop -> do
b <- extract pst
let src = Prelude.reverse buf
return $ Skip $
ParseChunksYield b (ParseChunksInitLeftOver src)
-- Replay queue is empty: go back to reading the source stream.
stepOuter _ (ParseChunksBuf [] s buf pst) =
return $ Skip $ ParseChunksStream s buf pst
-- Feed the parser with the next queued input.
stepOuter _ (ParseChunksBuf (x:xs) s buf pst) = do
pRes <- pstep pst x
case pRes of
PR.Yield n pst1 -> do
assert (n <= length (x:buf)) (return ())
let buf1 = Prelude.take n (x:buf)
return $ Skip $ ParseChunksBuf xs s buf1 pst1
-- Replayed inputs go in front of whatever is still queued.
PR.Skip n pst1 -> do
assert (n <= length (x:buf)) (return ())
let (src0, buf1) = splitAt n (x:buf)
src = Prelude.reverse src0 ++ xs
return $ Skip $ ParseChunksBuf src s buf1 pst1
PR.Stop n b -> do
assert (n <= length (x:buf)) (return ())
let src = Prelude.reverse (Prelude.take n (x:buf)) ++ xs
return $ Skip $ ParseChunksYield b (ParseChunksInit src s)
PR.Error err -> throwM $ ParseError err
-- Emit a pending parse result and move to the stored follow-up state.
stepOuter _ (ParseChunksYield a next) = return $ Yield a next
-- | Run a stream to completion, discarding every element.
{-# INLINE_LATE drain #-}
drain :: Monad m => Stream m a -> m ()
drain (Stream step state) = loop SPEC state
  where
    loop !_ cur =
        step defState cur >>= \res ->
            case res of
                Yield _ next -> loop SPEC next
                Skip next -> loop SPEC next
                Stop -> return ()
-- | 'True' when the stream has no elements. Implemented as a right fold
-- that answers 'False' at the first element without using the rest.
{-# INLINE_NORMAL null #-}
null :: Monad m => Stream m a -> m Bool
null m = foldrM onElement (return True) m
  where
    onElement _ _ = return False
-- | The first element of the stream, or 'Nothing' when it is empty.
-- Implemented as a right fold that ignores the rest of the stream.
{-# INLINE_NORMAL head #-}
head :: Monad m => Stream m a -> m (Maybe a)
head m = foldrM onElement (return Nothing) m
  where
    onElement x _ = return (Just x)
-- | The first element of the stream, or the given default when the stream
-- is empty.
{-# INLINE_NORMAL headElse #-}
headElse :: Monad m => a -> Stream m a -> m a
headElse a m = foldrM onElement (return a) m
  where
    onElement x _ = return x
-- | The stream without its first element, or 'Nothing' when the stream is
-- empty. The stream is stepped until the first element is found.
{-# INLINE_NORMAL tail #-}
tail :: Monad m => Stream m a -> m (Maybe (Stream m a))
tail (UnStream step state) = search state
  where
    search cur =
        step defState cur >>= \res ->
            case res of
                Yield _ next -> return (Just (Stream step next))
                Skip next -> search next
                Stop -> return Nothing
-- | The last element of the stream, or 'Nothing' when it is empty.
-- Implemented as a strict left fold that keeps only the latest element.
{-# INLINE_NORMAL last #-}
last :: Monad m => Stream m a -> m (Maybe a)
last = foldl' keepLatest Nothing
  where
    keepLatest _ y = Just y
-- | Whether the given value occurs in the stream. Stops at the first match.
{-# INLINE_NORMAL elem #-}
elem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
elem e (Stream step state) = search state
  where
    search cur = do
        res <- step defState cur
        case res of
            Yield x next ->
                if x == e
                then return True
                else search next
            Skip next -> search next
            Stop -> return False
-- | Negation of 'elem'.
{-# INLINE_NORMAL notElem #-}
notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
notElem e s = not <$> elem e s
-- | Whether every element satisfies the predicate. Stops at the first
-- element that fails; 'True' for an empty stream.
{-# INLINE_NORMAL all #-}
all :: Monad m => (a -> Bool) -> Stream m a -> m Bool
all p (Stream step state) = check state
  where
    check cur = do
        res <- step defState cur
        case res of
            Yield x next ->
                if p x
                then check next
                else return False
            Skip next -> check next
            Stop -> return True
-- | Whether some element satisfies the predicate. Stops at the first
-- element that passes; 'False' for an empty stream.
{-# INLINE_NORMAL any #-}
any :: Monad m => (a -> Bool) -> Stream m a -> m Bool
any p (Stream step state) = check state
  where
    check cur = do
        res <- step defState cur
        case res of
            Yield x next ->
                if p x
                then return True
                else check next
            Skip next -> check next
            Stop -> return False
-- | The largest element of the stream, or 'Nothing' when it is empty. When
-- several elements compare equal to the maximum, the last one is returned.
{-# INLINE_NORMAL maximum #-}
maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
maximum (Stream step state) = findFirst state
  where
    -- No element seen yet.
    findFirst cur = do
        res <- step defState cur
        case res of
            Yield x next -> track x next
            Skip next -> findFirst next
            Stop -> return Nothing

    -- @best@ is the largest element seen so far.
    track best cur = do
        res <- step defState cur
        case res of
            Yield x next
                | best <= x -> track x next
                | otherwise -> track best next
            Skip next -> track best next
            Stop -> return (Just best)
-- | The largest element according to the comparison function, or 'Nothing'
-- for an empty stream. The current candidate is kept only when it compares
-- 'GT' against a new element, so among equal maxima the last one wins.
{-# INLINE_NORMAL maximumBy #-}
maximumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
maximumBy cmp (Stream step state) = findFirst state
  where
    -- No element seen yet.
    findFirst cur = do
        res <- step defState cur
        case res of
            Yield x next -> track x next
            Skip next -> findFirst next
            Stop -> return Nothing

    -- @best@ is the current candidate.
    track best cur = do
        res <- step defState cur
        case res of
            Yield x next ->
                case cmp best x of
                    GT -> track best next
                    _ -> track x next
            Skip next -> track best next
            Stop -> return (Just best)
-- | The smallest element of the stream, or 'Nothing' when it is empty. When
-- several elements compare equal to the minimum, the first one is returned.
{-# INLINE_NORMAL minimum #-}
minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
minimum (Stream step state) = findFirst state
  where
    -- No element seen yet.
    findFirst cur = do
        res <- step defState cur
        case res of
            Yield x next -> track x next
            Skip next -> findFirst next
            Stop -> return Nothing

    -- @best@ is the smallest element seen so far.
    track best cur = do
        res <- step defState cur
        case res of
            Yield x next
                | best <= x -> track best next
                | otherwise -> track x next
            Skip next -> track best next
            Stop -> return (Just best)
-- | The smallest element according to the comparison function, or 'Nothing'
-- for an empty stream. The current candidate is replaced only when it
-- compares 'GT' against a new element, so among equal minima the first one
-- wins.
{-# INLINE_NORMAL minimumBy #-}
minimumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
minimumBy cmp (Stream step state) = findFirst state
  where
    -- No element seen yet.
    findFirst cur = do
        res <- step defState cur
        case res of
            Yield x next -> track x next
            Skip next -> findFirst next
            Stop -> return Nothing

    -- @best@ is the current candidate.
    track best cur = do
        res <- step defState cur
        case res of
            Yield x next ->
                case cmp best x of
                    GT -> track x next
                    _ -> track best next
            Skip next -> track best next
            Stop -> return (Just best)
-- | Look up the element at a zero-based index. Returns 'Nothing' when the
-- index is negative or not smaller than the length of the stream.
--
-- A negative index is rejected before the stream is touched, so no effects
-- of the stream are run in that case.
{-# INLINE_NORMAL (!!) #-}
(!!) :: (Monad m) => Stream m a -> Int -> m (Maybe a)
(Stream step state) !! i
    | i < 0 = return Nothing
    | otherwise = go i state
  where
    -- Invariant: @n >= 0@ is the number of elements still to skip.
    go n st = do
        r <- step defState st
        case r of
            Yield x s
                | n == 0 -> return $ Just x
                | otherwise -> go (n - 1) s
            Skip s -> go n s
            Stop -> return Nothing
-- | In a stream of key-value pairs, return the value paired with the first
-- key equal to the given one, or 'Nothing' when there is none.
{-# INLINE_NORMAL lookup #-}
lookup :: (Monad m, Eq a) => a -> Stream m (a, b) -> m (Maybe b)
lookup e m = foldrM check (return Nothing) m
  where
    check (key, val) rest
        | e == key = return (Just val)
        | otherwise = rest
-- | The first element for which the monadic predicate returns 'True', or
-- 'Nothing' when there is none. The predicate is not run on elements after
-- the match.
{-# INLINE_NORMAL findM #-}
findM :: Monad m => (a -> m Bool) -> Stream m a -> m (Maybe a)
findM p m = foldrM check (return Nothing) m
  where
    check x rest = do
        matched <- p x
        if matched then return (Just x) else rest
-- | Pure variant of 'findM'.
{-# INLINE find #-}
find :: Monad m => (a -> Bool) -> Stream m a -> m (Maybe a)
find p = findM (\x -> return (p x))
-- | The zero-based positions of all elements that satisfy the predicate.
-- The position counter advances on every element of the source, whether it
-- matched or not, and is kept strict.
{-# INLINE_NORMAL findIndices #-}
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
findIndices p (Stream step state) = Stream step' (state, 0)
  where
    {-# INLINE_LATE step' #-}
    step' gst (cur, !ix) = do
        res <- step (adaptState gst) cur
        return $
            case res of
                Yield x next
                    | p x -> Yield ix (next, ix + 1)
                    | otherwise -> Skip (next, ix + 1)
                Skip next -> Skip (next, ix)
                Stop -> Stop
-- | Collect the stream into a list in reverse order, using a strict left
-- fold that conses each element onto the accumulator.
{-# INLINE toListRev #-}
toListRev :: Monad m => Stream m a -> m [a]
toListRev = foldl' (\acc x -> x : acc) []
-- | Reverse a stream. On the first step the whole input is collected into a
-- reversed list with 'toListRev'; that list is then yielded element by
-- element.
{-# INLINE_NORMAL reverse #-}
reverse :: Monad m => Stream m a -> Stream m a
reverse m = Stream step Nothing
  where
    {-# INLINE_LATE step #-}
    -- 'Nothing' means the input has not been collected yet.
    step _ Nothing = do
        collected <- toListRev m
        return (Skip (Just collected))
    step _ (Just remaining) =
        case remaining of
            (x : rest) -> return (Yield x (Just rest))
            [] -> return Stop
-- | Reverse a stream of 'Storable' elements via arrays: the input is
-- grouped into arrays of 'A.defaultChunkSize', the stream of arrays is
-- reversed in its StreamK form, and the arrays are then flattened with
-- 'A.flattenArraysRev'.
{-# INLINE_NORMAL reverse' #-}
reverse' :: forall m a. (MonadIO m, Storable a) => Stream m a -> Stream m a
reverse' m = A.flattenArraysRev (fromStreamK reversedChunks)
  where
    chunks = A.fromStreamDArraysOf A.defaultChunkSize m
    reversedChunks = K.reverse (toStreamK chunks)
-- | Split a stream on elements that satisfy the predicate, treating them as
-- suffixes, and fold each segment with the given 'Fold'. The separator is
-- fed to the fold as the last element of its segment. A trailing segment
-- without a separator is emitted when the input ends.
{-# INLINE_NORMAL splitSuffixBy' #-}
splitSuffixBy' :: Monad m
    => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitSuffixBy' predicate f (Stream step state) =
    Stream (stepOuter f) (Just state)
  where
    {-# INLINE_LATE stepOuter #-}
    -- 'Nothing' as the outer state means the input is exhausted.
    stepOuter (Fold fstep initial done) gst (Just st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x next -> do
                acc0 <- initial
                acc1 <- fstep acc0 x
                if predicate x
                then emit acc1 (Just next)
                else collect SPEC next acc1
            Skip next -> return (Skip (Just next))
            Stop -> return Stop
      where
        -- Finish the fold and yield its result.
        emit acc nextState = do
            val <- done acc
            return (Yield val nextState)

        -- Keep folding until a separator or the end of input.
        collect !_ cur !acc = do
            inner <- step (adaptState gst) cur
            case inner of
                Yield x next -> do
                    acc1 <- fstep acc x
                    if predicate x
                    then emit acc1 (Just next)
                    else collect SPEC next acc1
                Skip next -> collect SPEC next acc
                Stop -> emit acc Nothing
    stepOuter _ _ Nothing = return Stop
-- | Group consecutive elements and fold each group with the given 'Fold'.
-- An element @x@ joins the current group when @cmp x first@ holds, where
-- @first@ is the first element of that group. The element that breaks a
-- group becomes the first element of the next one.
{-# INLINE_NORMAL groupsBy #-}
groupsBy :: Monad m
    => (a -> a -> Bool)
    -> Fold m a b
    -> Stream m a
    -> Stream m b
groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing)
  where
    {-# INLINE_LATE stepOuter #-}
    -- The outer state pairs the remaining input ('Nothing' once exhausted)
    -- with an element carried over from the previous group, if any.
    stepOuter (Fold fstep initial done) gst (Just st, Nothing) = do
        res <- step (adaptState gst) st
        case res of
            Yield x next -> do
                acc0 <- initial
                acc1 <- fstep acc0 x
                collect SPEC x next acc1
            Skip next -> return (Skip (Just next, Nothing))
            Stop -> return Stop
      where
        collect !_ first cur !acc = do
            inner <- step (adaptState gst) cur
            case inner of
                Yield x next ->
                    if cmp x first
                    then do
                        acc1 <- fstep acc x
                        collect SPEC first next acc1
                    else do
                        r <- done acc
                        return (Yield r (Just next, Just x))
                Skip next -> collect SPEC first next acc
                Stop -> do
                    r <- done acc
                    return (Yield r (Nothing, Nothing))
    -- Start a group with the element carried over from the previous one.
    stepOuter (Fold fstep initial done) gst (Just st, Just prev) = do
        acc0 <- initial
        acc1 <- fstep acc0 prev
        collect SPEC st acc1
      where
        collect !_ cur !acc = do
            inner <- step (adaptState gst) cur
            case inner of
                Yield x next ->
                    if cmp x prev
                    then do
                        acc1 <- fstep acc x
                        collect SPEC next acc1
                    else do
                        r <- done acc
                        return (Yield r (Just next, Just x))
                Skip next -> collect SPEC next acc
                Stop -> do
                    r <- done acc
                    return (Yield r (Nothing, Nothing))
    stepOuter _ _ (Nothing,_) = return Stop
-- | Group consecutive elements and fold each group with the given 'Fold'.
-- An element @x@ joins the current group when @cmp previous x@ holds, where
-- @previous@ is the element immediately before it. The element that breaks
-- a group becomes the first element of the next one.
{-# INLINE_NORMAL groupsRollingBy #-}
groupsRollingBy :: Monad m
    => (a -> a -> Bool)
    -> Fold m a b
    -> Stream m a
    -> Stream m b
groupsRollingBy cmp f (Stream step state) =
    Stream (stepOuter f) (Just state, Nothing)
  where
    {-# INLINE_LATE stepOuter #-}
    -- The outer state pairs the remaining input ('Nothing' once exhausted)
    -- with an element carried over from the previous group, if any.
    stepOuter (Fold fstep initial done) gst (Just st, Nothing) = do
        res <- step (adaptState gst) st
        case res of
            Yield x next -> do
                acc0 <- initial
                acc1 <- fstep acc0 x
                collect SPEC x next acc1
            Skip next -> return (Skip (Just next, Nothing))
            Stop -> return Stop
      where
        collect !_ before cur !acc = do
            inner <- step (adaptState gst) cur
            case inner of
                Yield x next ->
                    if cmp before x
                    then do
                        acc1 <- fstep acc x
                        collect SPEC x next acc1
                    else do
                        r <- done acc
                        return (Yield r (Just next, Just x))
                Skip next -> collect SPEC before next acc
                Stop -> do
                    r <- done acc
                    return (Yield r (Nothing, Nothing))
    -- Start a group with the element carried over from the previous one.
    stepOuter (Fold fstep initial done) gst (Just st, Just prev') = do
        acc0 <- initial
        acc1 <- fstep acc0 prev'
        collect SPEC prev' st acc1
      where
        collect !_ before cur !acc = do
            inner <- step (adaptState gst) cur
            case inner of
                Yield x next ->
                    if cmp before x
                    then do
                        acc1 <- fstep acc x
                        collect SPEC x next acc1
                    else do
                        r <- done acc
                        return (Yield r (Just next, Just x))
                Skip next -> collect SPEC before next acc
                Stop -> do
                    r <- done acc
                    return (Yield r (Nothing, Nothing))
    stepOuter _ _ (Nothing, _) = return Stop
-- | Split a stream on elements that satisfy the predicate, treating them as
-- infix separators, and fold each segment with the given 'Fold'. The
-- separators are dropped. A fold is started on every outer step, so an
-- empty input still produces one result, and a trailing separator is
-- followed by a final empty segment.
{-# INLINE_NORMAL splitBy #-}
splitBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitBy predicate f (Stream step state) = Stream (step' f) (Just state)
  where
    {-# INLINE_LATE step' #-}
    -- 'Nothing' as the outer state means the input is exhausted.
    step' (Fold fstep initial done) gst (Just st) = do
        acc0 <- initial
        collect SPEC st acc0
      where
        -- Finish the fold and yield its result.
        emit acc nextState = do
            r <- done acc
            return (Yield r nextState)

        collect !_ cur !acc = do
            res <- step (adaptState gst) cur
            case res of
                Yield x next
                    | predicate x -> emit acc (Just next)
                    | otherwise -> do
                        acc1 <- fstep acc x
                        collect SPEC next acc1
                Skip next -> collect SPEC next acc
                Stop -> emit acc Nothing
    step' _ _ Nothing = return Stop
-- | Split a stream on elements that satisfy the predicate, treating them as
-- suffixes, and fold each segment with the given 'Fold'. The separators are
-- dropped. A fold is only started once an element is available, so an empty
-- input produces no result and a trailing separator is not followed by an
-- extra empty segment.
{-# INLINE_NORMAL splitSuffixBy #-}
splitSuffixBy :: Monad m
    => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitSuffixBy predicate f (Stream step state) = Stream (step' f) (Just state)
  where
    {-# INLINE_LATE step' #-}
    -- 'Nothing' as the outer state means the input is exhausted.
    step' (Fold fstep initial done) gst (Just st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x next -> do
                acc0 <- initial
                if predicate x
                then emit acc0 (Just next)
                else do
                    acc1 <- fstep acc0 x
                    collect SPEC next acc1
            Skip next -> return (Skip (Just next))
            Stop -> return Stop
      where
        -- Finish the fold and yield its result.
        emit acc nextState = do
            r <- done acc
            return (Yield r nextState)

        collect !_ cur !acc = do
            inner <- step (adaptState gst) cur
            case inner of
                Yield x next
                    | predicate x -> emit acc (Just next)
                    | otherwise -> do
                        acc1 <- fstep acc x
                        collect SPEC next acc1
                Skip next -> collect SPEC next acc
                Stop -> emit acc Nothing
    step' _ _ Nothing = return Stop
-- | Split a stream into words separated by elements that satisfy the
-- predicate, folding each word with the given 'Fold'. Separators are
-- dropped, and a fold is only started at a non-separator element, so runs
-- of separators never produce empty words.
{-# INLINE_NORMAL wordsBy #-}
wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
wordsBy predicate f (Stream step state) = Stream (stepOuter f) (Just state)
  where
    {-# INLINE_LATE stepOuter #-}
    -- 'Nothing' as the outer state means the input is exhausted.
    stepOuter (Fold fstep initial done) gst (Just st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x next
                | predicate x -> return (Skip (Just next))
                | otherwise -> do
                    acc0 <- initial
                    acc1 <- fstep acc0 x
                    collect SPEC next acc1
            Skip next -> return (Skip (Just next))
            Stop -> return Stop
      where
        -- Finish the fold and yield its result.
        emit acc nextState = do
            r <- done acc
            return (Yield r nextState)

        collect !_ cur !acc = do
            inner <- step (adaptState gst) cur
            case inner of
                Yield x next
                    | predicate x -> emit acc (Just next)
                    | otherwise -> do
                        acc1 <- fstep acc x
                        collect SPEC next acc1
                Skip next -> collect SPEC next acc
                Stop -> emit acc Nothing
    stepOuter _ _ Nothing = return Stop
-- | State machine of 'splitOn'. @s@ is the state of the source stream and
-- @a@ the element type.
data SplitOnState s a =
-- Initial state: the search strategy is chosen from the pattern length.
GO_START
-- Chosen when the pattern is empty.
| GO_EMPTY_PAT s
-- Chosen for a one-element pattern; carries that element.
| GO_SINGLE_PAT s a
-- Chosen when the whole pattern fits into a single 'Word'.
| GO_SHORT_PAT s
-- Chosen for longer patterns; carries a ring buffer of pattern length and
-- a pointer into it.
| GO_KARP_RABIN s !(RB.Ring a) !(Ptr a)
-- Entered after the result for the final segment has been yielded.
-- NOTE(review): the handler for this state is outside the visible chunk;
-- presumably it stops the stream - confirm.
| GO_DONE
-- | Split the input stream on every occurrence of the pattern @patArr@ and
-- reduce each segment between two occurrences with the supplied 'Fold'. The
-- elements of the pattern itself are never passed to the fold.
--
-- Depending on the pattern length one of four strategies is used:
--
-- * empty pattern: every input element is folded on its own,
-- * single element pattern: plain element comparison,
-- * pattern that fits into a 'Word': the most recent elements are packed
--   into a 'Word' (using 'fromEnum') and compared with the packed pattern,
-- * longer patterns: a rolling hash over a ring buffer; a hash hit is
--   confirmed by comparing the ring contents with the pattern.
--
-- For a non-empty pattern the segment that is being accumulated when the
-- input ends is always emitted, even if it is empty.
{-# INLINE_NORMAL splitOn #-}
splitOn
    :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a)
    => Array a
    -> Fold m a b
    -> Stream m a
    -> Stream m b
splitOn patArr (Fold fstep initial done) (Stream step state) =
    Stream stepOuter GO_START

    where

    patLen = A.length patArr
    maxIndex = patLen - 1
    elemBits = sizeOf (undefined :: a) * 8

    {-# INLINE_LATE stepOuter #-}
    -- Choose the matching strategy once, based on the pattern length.
    stepOuter _ GO_START =
        if patLen == 0
        then return $ Skip $ GO_EMPTY_PAT state
        else if patLen == 1
            then do
                r <- liftIO $ (A.unsafeIndexIO patArr 0)
                return $ Skip $ GO_SINGLE_PAT state r
            else if sizeOf (undefined :: a) * patLen
                    <= sizeOf (undefined :: Word)
                then return $ Skip $ GO_SHORT_PAT state
                else do
                    (rb, rhead) <- liftIO $ RB.new patLen
                    return $ Skip $ GO_KARP_RABIN state rb rhead

    -- Single element pattern: fold until the separator element shows up.
    stepOuter gst (GO_SINGLE_PAT stt pat) = initial >>= go SPEC stt

        where

        go !_ st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s ->
                    if pat == x
                        then do
                            r <- done acc
                            return $ Yield r (GO_SINGLE_PAT s pat)
                        else fstep acc x >>= go SPEC s
                Skip s -> go SPEC s acc
                Stop -> done acc >>= \r -> return $ Yield r GO_DONE

    -- Short pattern: the last patLen elements are kept packed in a Word, the
    -- oldest element in the most significant position.
    stepOuter gst (GO_SHORT_PAT stt) = initial >>= go0 SPEC 0 (0 :: Word) stt

        where

        -- Selects the bits occupied by the most recent patLen elements.
        mask :: Word
        mask = (1 `shiftL` (elemBits * patLen)) - 1

        -- Selects the bits occupied by a single element.
        elemMask :: Word
        elemMask = (1 `shiftL` elemBits) - 1

        addToWord wrd a = (wrd `shiftL` elemBits) .|. fromIntegral (fromEnum a)

        patWord :: Word
        patWord = mask .&. A.foldl' addToWord 0 patArr

        -- Phase 0: fewer than patLen elements have been consumed, nothing is
        -- fed to the fold yet because the elements may belong to a match.
        go0 !_ !idx wrd st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd' = addToWord wrd x
                    if idx == maxIndex
                        then
                            if wrd' .&. mask == patWord
                                then do
                                    r <- done acc
                                    return $ Yield r (GO_SHORT_PAT s)
                                else go1 SPEC wrd' s acc
                        else go0 SPEC (idx + 1) wrd' s acc
                Skip s -> go0 SPEC idx wrd s acc
                Stop -> do
                    -- Flush the idx elements that are still held back.
                    acc' <- if idx /= 0
                            then go2 wrd idx acc
                            else return acc
                    done acc' >>= \r -> return $ Yield r GO_DONE

        -- Phase 1: the window is full; each new element pushes the oldest
        -- one out of the window and into the fold.
        {-# INLINE go1 #-}
        go1 !_ wrd st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd' = addToWord wrd x
                        old = (mask .&. wrd) `shiftR` (elemBits * (patLen - 1))
                    acc' <- fstep acc (toEnum $ fromIntegral old)
                    if wrd' .&. mask == patWord
                        then done acc' >>= \r -> return $ Yield r (GO_SHORT_PAT s)
                        else go1 SPEC wrd' s acc'
                Skip s -> go1 SPEC wrd s acc
                Stop -> do
                    acc' <- go2 wrd patLen acc
                    done acc' >>= \r -> return $ Yield r GO_DONE

        -- Feed the n most recent elements held in the word to the fold,
        -- oldest first. Each element is isolated with elemMask after the
        -- shift; without it the extracted value would still contain the
        -- bits of the elements preceding it in the window, which makes
        -- 'toEnum' fail or produce a wrong element.
        go2 !wrd !n !acc | n > 0 = do
            let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))
            fstep acc (toEnum $ fromIntegral old) >>= go2 wrd (n - 1)
        go2 _ _ acc = return acc

    -- Long pattern: rolling hash over the last patLen elements which are
    -- stored in the ring buffer.
    stepOuter gst (GO_KARP_RABIN stt rb rhead) = do
        initial >>= go0 SPEC 0 rhead stt

        where

        k = 2891336453 :: Word32
        coeff = k ^ patLen
        addCksum cksum a = cksum * k + fromIntegral (fromEnum a)
        -- Hash of the window after 'old' left it and 'new' entered it.
        deltaCksum cksum old new =
            addCksum cksum new - coeff * fromIntegral (fromEnum old)

        patHash = A.foldl' addCksum 0 patArr

        -- Phase 0: fill the ring buffer with the first patLen elements.
        go0 !_ !idx !rh st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    rh' <- liftIO $ RB.unsafeInsert rb rh x
                    if idx == maxIndex
                        then do
                            let fold = RB.unsafeFoldRing (RB.ringBound rb)
                            let !ringHash = fold addCksum 0 rb
                            if ringHash == patHash
                                then go2 SPEC ringHash rh' s acc
                                else go1 SPEC ringHash rh' s acc
                        else go0 SPEC (idx + 1) rh' s acc
                Skip s -> go0 SPEC idx rh s acc
                Stop -> do
                    -- Flush the partially filled ring.
                    !acc' <- if idx /= 0
                             then RB.unsafeFoldRingM rh fstep acc rb
                             else return acc
                    done acc' >>= \r -> return $ Yield r GO_DONE

        -- Phase 1: the ring is full; the element about to be overwritten is
        -- passed to the fold and the hash is updated incrementally.
        {-# INLINE go1 #-}
        go1 !_ !cksum !rh st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    old <- liftIO $ peek rh
                    let cksum' = deltaCksum cksum old x
                    acc' <- fstep acc old
                    if (cksum' == patHash)
                        then do
                            rh' <- liftIO (RB.unsafeInsert rb rh x)
                            go2 SPEC cksum' rh' s acc'
                        else do
                            rh' <- liftIO (RB.unsafeInsert rb rh x)
                            go1 SPEC cksum' rh' s acc'
                Skip s -> go1 SPEC cksum rh s acc
                Stop -> do
                    acc' <- RB.unsafeFoldRingFullM rh fstep acc rb
                    done acc' >>= \r -> return $ Yield r GO_DONE

        -- The hash matched: confirm by comparing the ring with the pattern.
        go2 !_ !cksum' !rh' s !acc' = do
            if RB.unsafeEqArray rb rh' patArr
                then do
                    r <- done acc'
                    return $ Yield r (GO_KARP_RABIN s rb rhead)
                else go1 SPEC cksum' rh' s acc'

    -- Empty pattern: every element forms a segment of its own.
    stepOuter gst (GO_EMPTY_PAT st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                acc <- initial
                acc' <- fstep acc x
                done acc' >>= \r -> return $ Yield r (GO_EMPTY_PAT s)
            Skip s -> return $ Skip (GO_EMPTY_PAT s)
            Stop -> return Stop

    stepOuter _ GO_DONE = return Stop
-- | Like 'splitOn' but the pattern @patArr@ is treated as a segment
-- terminator rather than a separator: a new segment (and a call to the
-- fold's @initial@) is started only when the input yields another element,
-- so no segment is produced when the input ends at the start of a segment.
--
-- When @withSep@ is 'True' every input element, including the elements of
-- the pattern, is passed to the fold as soon as it arrives. When it is
-- 'False' the elements are held back until they are known not to be part of
-- a match, and the matched pattern is dropped.
--
-- The matching strategy is selected from the pattern length in the same way
-- as in 'splitOn' (empty, single element, fits in a 'Word', rolling hash).
{-# INLINE_NORMAL splitSuffixOn #-}
splitSuffixOn
    :: forall m a b. (MonadIO m, Storable a, Enum a, Eq a)
    => Bool
    -> Array a
    -> Fold m a b
    -> Stream m a
    -> Stream m b
splitSuffixOn withSep patArr (Fold fstep initial done)
                (Stream step state) =
    Stream stepOuter GO_START

    where

    patLen = A.length patArr
    maxIndex = patLen - 1
    elemBits = sizeOf (undefined :: a) * 8

    {-# INLINE_LATE stepOuter #-}
    -- Choose the matching strategy once, based on the pattern length.
    stepOuter _ GO_START =
        if patLen == 0
        then return $ Skip $ GO_EMPTY_PAT state
        else if patLen == 1
            then do
                r <- liftIO $ (A.unsafeIndexIO patArr 0)
                return $ Skip $ GO_SINGLE_PAT state r
            else if sizeOf (undefined :: a) * patLen
                    <= sizeOf (undefined :: Word)
                then return $ Skip $ GO_SHORT_PAT state
                else do
                    (rb, rhead) <- liftIO $ RB.new patLen
                    return $ Skip $ GO_KARP_RABIN state rb rhead

    -- Single element pattern. The first element of a segment is pulled
    -- before calling "initial" so that an exhausted input does not produce
    -- a segment.
    stepOuter gst (GO_SINGLE_PAT stt pat) = do
        res <- step (adaptState gst) stt
        case res of
            Yield x s -> do
                acc <- initial
                if pat == x
                    then do
                        acc' <- if withSep then fstep acc x else return acc
                        done acc' >>= \r -> return $ Yield r (GO_SINGLE_PAT s pat)
                    else fstep acc x >>= go SPEC s
            Skip s -> return $ Skip $ (GO_SINGLE_PAT s pat)
            Stop -> return Stop

        where

        go !_ st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s ->
                    if pat == x
                        then do
                            acc' <- if withSep then fstep acc x else return acc
                            r <- done acc'
                            return $ Yield r (GO_SINGLE_PAT s pat)
                        else fstep acc x >>= go SPEC s
                Skip s -> go SPEC s acc
                Stop -> done acc >>= \r -> return $ Yield r GO_DONE

    -- Short pattern: the last patLen elements are kept packed in a Word, the
    -- oldest element in the most significant position.
    stepOuter gst (GO_SHORT_PAT stt) = do
        let idx = 0
        let wrd = 0
        res <- step (adaptState gst) stt
        case res of
            Yield x s -> do
                acc <- initial
                let wrd' = addToWord wrd x
                acc' <- if withSep then fstep acc x else return acc
                if idx == maxIndex
                    then
                        if wrd' .&. mask == patWord
                            then done acc' >>= \r -> return $ Yield r (GO_SHORT_PAT s)
                            else go0 SPEC (idx + 1) wrd' s acc'
                    else go0 SPEC (idx + 1) wrd' s acc'
            Skip s -> return $ Skip (GO_SHORT_PAT s)
            Stop -> return Stop

        where

        -- Selects the bits occupied by the most recent patLen elements.
        mask :: Word
        mask = (1 `shiftL` (elemBits * patLen)) - 1

        -- Selects the bits occupied by a single element.
        elemMask :: Word
        elemMask = (1 `shiftL` elemBits) - 1

        addToWord wrd a = (wrd `shiftL` elemBits) .|. fromIntegral (fromEnum a)

        patWord :: Word
        patWord = mask .&. A.foldl' addToWord 0 patArr

        -- Phase 0: the window is not full yet.
        go0 !_ !idx wrd st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd' = addToWord wrd x
                    acc' <- if withSep then fstep acc x else return acc
                    if idx == maxIndex
                        then
                            if wrd' .&. mask == patWord
                                then do
                                    r <- done acc'
                                    return $ Yield r (GO_SHORT_PAT s)
                                else go1 SPEC wrd' s acc'
                        else go0 SPEC (idx + 1) wrd' s acc'
                Skip s -> go0 SPEC idx wrd s acc
                Stop -> do
                    if (idx == maxIndex) && (wrd .&. mask == patWord)
                        then return Stop
                        else do
                            -- With withSep the elements are already folded.
                            acc' <- if idx /= 0 && not withSep
                                    then go2 wrd idx acc
                                    else return acc
                            done acc' >>= \r -> return $ Yield r GO_DONE

        -- Phase 1: the window is full. With withSep the incoming element is
        -- folded directly, otherwise the element leaving the window is.
        {-# INLINE go1 #-}
        go1 !_ wrd st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd' = addToWord wrd x
                        old = (mask .&. wrd) `shiftR` (elemBits * (patLen - 1))
                    acc' <- if withSep
                            then fstep acc x
                            else fstep acc (toEnum $ fromIntegral old)
                    if wrd' .&. mask == patWord
                        then done acc' >>= \r -> return $ Yield r (GO_SHORT_PAT s)
                        else go1 SPEC wrd' s acc'
                Skip s -> go1 SPEC wrd s acc
                Stop ->
                    if wrd .&. mask == patWord
                        then return Stop
                        else do
                            acc' <- if withSep
                                    then return acc
                                    else go2 wrd patLen acc
                            done acc' >>= \r -> return $ Yield r GO_DONE

        -- Feed the n most recent elements held in the word to the fold,
        -- oldest first. Each element is isolated with elemMask after the
        -- shift; without it the extracted value would still contain the
        -- bits of the elements preceding it in the window, which makes
        -- 'toEnum' fail or produce a wrong element.
        go2 !wrd !n !acc | n > 0 = do
            let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))
            fstep acc (toEnum $ fromIntegral old) >>= go2 wrd (n - 1)
        go2 _ _ acc = return acc

    -- Long pattern: rolling hash over the last patLen elements which are
    -- stored in the ring buffer.
    stepOuter gst (GO_KARP_RABIN stt rb rhead) = do
        let idx = 0
        res <- step (adaptState gst) stt
        case res of
            Yield x s -> do
                acc <- initial
                acc' <- if withSep then fstep acc x else return acc
                rh' <- liftIO (RB.unsafeInsert rb rhead x)
                if idx == maxIndex
                    then do
                        let fold = RB.unsafeFoldRing (RB.ringBound rb)
                        let !ringHash = fold addCksum 0 rb
                        if ringHash == patHash
                            then go2 SPEC ringHash rh' s acc'
                            else go0 SPEC (idx + 1) rh' s acc'
                    else go0 SPEC (idx + 1) rh' s acc'
            Skip s -> return $ Skip (GO_KARP_RABIN s rb rhead)
            Stop -> return Stop

        where

        k = 2891336453 :: Word32
        coeff = k ^ patLen
        addCksum cksum a = cksum * k + fromIntegral (fromEnum a)
        -- Hash of the window after 'old' left it and 'new' entered it.
        deltaCksum cksum old new =
            addCksum cksum new - coeff * fromIntegral (fromEnum old)

        patHash = A.foldl' addCksum 0 patArr

        -- Phase 0: fill the ring buffer.
        go0 !_ !idx !rh st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    acc' <- if withSep then fstep acc x else return acc
                    rh' <- liftIO (RB.unsafeInsert rb rh x)
                    if idx == maxIndex
                        then do
                            let fold = RB.unsafeFoldRing (RB.ringBound rb)
                            let !ringHash = fold addCksum 0 rb
                            if ringHash == patHash
                                then go2 SPEC ringHash rh' s acc'
                                else go1 SPEC ringHash rh' s acc'
                        else go0 SPEC (idx + 1) rh' s acc'
                Skip s -> go0 SPEC idx rh s acc
                Stop -> do
                    if (idx == maxIndex) && RB.unsafeEqArray rb rh patArr
                        then return Stop
                        else do
                            -- With withSep the elements are already folded.
                            !acc' <- if idx /= 0 && not withSep
                                     then RB.unsafeFoldRingM rh fstep acc rb
                                     else return acc
                            done acc' >>= \r -> return $ Yield r GO_DONE

        -- Phase 1: the ring is full; update the hash incrementally.
        {-# INLINE go1 #-}
        go1 !_ !cksum !rh st !acc = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    old <- liftIO $ peek rh
                    let cksum' = deltaCksum cksum old x
                    acc' <- if withSep
                            then fstep acc x
                            else fstep acc old
                    if (cksum' == patHash)
                        then do
                            rh' <- liftIO (RB.unsafeInsert rb rh x)
                            go2 SPEC cksum' rh' s acc'
                        else do
                            rh' <- liftIO (RB.unsafeInsert rb rh x)
                            go1 SPEC cksum' rh' s acc'
                Skip s -> go1 SPEC cksum rh s acc
                Stop -> do
                    if RB.unsafeEqArray rb rh patArr
                        then return Stop
                        else do
                            acc' <- if withSep
                                    then return acc
                                    else RB.unsafeFoldRingFullM rh fstep acc rb
                            done acc' >>= \r -> return $ Yield r GO_DONE

        -- The hash matched: confirm by comparing the ring with the pattern.
        go2 !_ !cksum' !rh' s !acc' = do
            if RB.unsafeEqArray rb rh' patArr
                then do
                    r <- done acc'
                    return $ Yield r (GO_KARP_RABIN s rb rhead)
                else go1 SPEC cksum' rh' s acc'

    -- Empty pattern: every element forms a segment of its own.
    stepOuter gst (GO_EMPTY_PAT st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                acc <- initial
                acc' <- fstep acc x
                done acc' >>= \r -> return $ Yield r (GO_EMPTY_PAT s)
            Skip s -> return $ Skip (GO_EMPTY_PAT s)
            Stop -> return Stop

    stepOuter _ GO_DONE = return Stop
-- | State of the chunk re-splitting combinators ('splitInnerBy' and
-- 'splitInnerBySuffix'). @s@ is the state of the input stream, @arr@ the
-- chunk type.
data SplitState s arr
    -- Nothing has been buffered yet.
    = SplitInitial s
    -- A partial segment is buffered, more input is needed to complete it.
    | SplitBuffering s arr
    -- A remainder is pending that has to be run through the splitter again.
    | SplitSplitting s arr
    -- Emit the chunk, then continue in the given state.
    | SplitYielding arr (SplitState s arr)
    -- The stream ends on the next step.
    | SplitFinishing
-- | Re-chunk a stream of containers. @splitter@ breaks a chunk into a first
-- part and, if a split point was found, the remainder after it; @joiner@
-- appends a newly arrived part to what has been buffered so far. A chunk is
-- emitted whenever the splitter reports a remainder; whatever is buffered
-- when the input ends is emitted as the last chunk.
{-# INLINE_NORMAL splitInnerBy #-}
splitInnerBy
    :: Monad m
    => (f a -> m (f a, Maybe (f a)))
    -> (f a -> f a -> m (f a))
    -> Stream m (f a)
    -> Stream m (f a)
splitInnerBy splitter joiner (Stream step1 state1) =
    Stream step (SplitInitial state1)

    where

    -- Transition taken once a chunk went through the splitter: keep
    -- buffering if no split point was found, otherwise emit the completed
    -- part and split the remainder next.
    afterSplit st part Nothing = Skip (SplitBuffering st part)
    afterSplit st part (Just rest) =
        Skip (SplitYielding part (SplitSplitting st rest))

    {-# INLINE_LATE step #-}
    step gst (SplitInitial st) =
        step1 gst st >>= \res -> case res of
            Yield chunk st' -> do
                (part, rest) <- splitter chunk
                return (afterSplit st' part rest)
            Skip st' -> return (Skip (SplitInitial st'))
            Stop -> return Stop

    step gst (SplitBuffering st buf) =
        step1 gst st >>= \res -> case res of
            Yield chunk st' -> do
                (part, rest) <- splitter chunk
                joined <- joiner buf part
                return (afterSplit st' joined rest)
            Skip st' -> return (Skip (SplitBuffering st' buf))
            -- Flush the buffered part before finishing.
            Stop -> return (Skip (SplitYielding buf SplitFinishing))

    step _ (SplitSplitting st buf) = do
        (part, rest) <- splitter buf
        return (afterSplit st part rest)

    step _ (SplitYielding chunk next) = return (Yield chunk next)
    step _ SplitFinishing = return Stop
-- | Same as 'splitInnerBy' except for the end of the input: the buffered
-- part is emitted only if it is not equal to 'mempty'.
{-# INLINE_NORMAL splitInnerBySuffix #-}
splitInnerBySuffix
    :: (Monad m, Eq (f a), Monoid (f a))
    => (f a -> m (f a, Maybe (f a)))
    -> (f a -> f a -> m (f a))
    -> Stream m (f a)
    -> Stream m (f a)
splitInnerBySuffix splitter joiner (Stream step1 state1) =
    Stream step (SplitInitial state1)

    where

    -- Transition taken once a chunk went through the splitter: keep
    -- buffering if no split point was found, otherwise emit the completed
    -- part and split the remainder next.
    afterSplit st part Nothing = Skip (SplitBuffering st part)
    afterSplit st part (Just rest) =
        Skip (SplitYielding part (SplitSplitting st rest))

    {-# INLINE_LATE step #-}
    step gst (SplitInitial st) =
        step1 gst st >>= \res -> case res of
            Yield chunk st' -> do
                (part, rest) <- splitter chunk
                return (afterSplit st' part rest)
            Skip st' -> return (Skip (SplitInitial st'))
            Stop -> return Stop

    step gst (SplitBuffering st buf) =
        step1 gst st >>= \res -> case res of
            Yield chunk st' -> do
                (part, rest) <- splitter chunk
                joined <- joiner buf part
                return (afterSplit st' joined rest)
            Skip st' -> return (Skip (SplitBuffering st' buf))
            -- An empty buffer at the end does not produce a chunk.
            Stop
                | buf == mempty -> return Stop
                | otherwise -> return (Skip (SplitYielding buf SplitFinishing))

    step _ (SplitSplitting st buf) = do
        (part, rest) <- splitter buf
        return (afterSplit st part rest)

    step _ (SplitYielding chunk next) = return (Yield chunk next)
    step _ SplitFinishing = return Stop
-- | Returns 'True' if the first stream is a prefix of the second, i.e. if
-- every element of the first stream equals the element at the same position
-- in the second. The second stream is consumed only as far as needed.
{-# INLINE_NORMAL isPrefixOf #-}
isPrefixOf :: (Eq a, Monad m) => Stream m a -> Stream m a -> m Bool
isPrefixOf (Stream stepa ta) (Stream stepb tb) = nextA ta tb

    where

    -- Fetch the next element of the candidate prefix.
    nextA sa sb = do
        r <- stepa defState sa
        case r of
            Yield x sa' -> matchB x sa' sb
            Skip sa' -> nextA sa' sb
            Stop -> return True

    -- Compare the pending prefix element with the next element of the
    -- second stream.
    matchB x sa sb = do
        r <- stepb defState sb
        case r of
            Yield y sb'
                | x == y -> nextA sa sb'
                | otherwise -> return False
            Skip sb' -> matchB x sa sb'
            Stop -> return False
-- | Returns 'True' if all the elements of the first stream occur, in the
-- same order, in the second stream. Elements of the second stream that do
-- not match the element currently searched for are skipped.
{-# INLINE_NORMAL isSubsequenceOf #-}
isSubsequenceOf :: (Eq a, Monad m) => Stream m a -> Stream m a -> m Bool
isSubsequenceOf (Stream stepa ta) (Stream stepb tb) = nextA ta tb

    where

    -- Fetch the next element to look for.
    nextA sa sb = do
        r <- stepa defState sa
        case r of
            Yield x sa' -> searchB x sa' sb
            Skip sa' -> nextA sa' sb
            Stop -> return True

    -- Scan the second stream until the wanted element is found.
    searchB x sa sb = do
        r <- stepb defState sb
        case r of
            Yield y sb'
                | x == y -> nextA sa sb'
                | otherwise -> searchB x sa sb'
            Skip sb' -> searchB x sa sb'
            Stop -> return False
-- | Drop the first stream from the front of the second one. Returns
-- 'Nothing' if the first stream is not a prefix of the second, otherwise
-- 'Just' the part of the second stream that follows the prefix.
{-# INLINE_NORMAL stripPrefix #-}
stripPrefix
    :: (Eq a, Monad m)
    => Stream m a -> Stream m a -> m (Maybe (Stream m a))
stripPrefix (Stream stepa ta) (Stream stepb tb) = nextA ta tb

    where

    -- Fetch the next element of the prefix; when the prefix is exhausted
    -- the rest of the second stream is the result.
    nextA sa sb = do
        r <- stepa defState sa
        case r of
            Yield x sa' -> matchB x sa' sb
            Skip sa' -> nextA sa' sb
            Stop -> return $ Just (Stream stepb sb)

    -- Compare the pending prefix element with the next element of the
    -- second stream.
    matchB x sa sb = do
        r <- stepb defState sb
        case r of
            Yield y sb'
                | x == y -> nextA sa sb'
                | otherwise -> return Nothing
            Skip sb' -> matchB x sa sb'
            Stop -> return Nothing
-- | Apply a monadic action to every element of the stream and discard the
-- results, running the stream only for its effects.
{-# INLINE_NORMAL mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> Stream m a -> m ()
mapM_ action = \strm -> drain (mapM action strm)
-- | State of 'concatMapU': @o@ is the state of the outer stream, @i@ the
-- state of the unfold currently being run.
data ConcatMapUState o i
    -- Pull the next seed from the outer stream.
    = ConcatMapUOuter o
    -- Run the unfold that was started from the last seed.
    | ConcatMapUInner o i
-- | Expand every element of the input stream with the given 'Unfold' and
-- emit the generated elements one unfold after the other.
{-# INLINE_NORMAL concatMapU #-}
concatMapU :: Monad m => Unfold m a b -> Stream m a -> Stream m b
concatMapU (Unfold istep inject) (Stream ostep ost) =
    Stream step (ConcatMapUOuter ost)

    where

    {-# INLINE_LATE step #-}
    step gst (ConcatMapUOuter outer) = do
        res <- ostep (adaptState gst) outer
        case res of
            Yield seed outer' -> do
                -- The bang forces the injected state before it is stored.
                !inner <- inject seed
                return (Skip (ConcatMapUInner outer' inner))
            Skip outer' -> return (Skip (ConcatMapUOuter outer'))
            Stop -> return Stop

    step _ (ConcatMapUInner outer inner) =
        istep inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (ConcatMapUInner outer inner')
            Skip inner' -> Skip (ConcatMapUInner outer inner')
            -- This unfold is exhausted, go back to the outer stream.
            Stop -> Skip (ConcatMapUOuter outer)
-- | State shared by 'concatUnfoldInterleave' and 'concatUnfoldRoundrobin'.
-- @o@ is the state of the outer stream, @i@ the state of one running unfold.
data ConcatUnfoldInterleaveState o i
    -- Pull from the outer stream; the list holds the running unfolds.
    = ConcatUnfoldInterleaveOuter o [i]
    -- Step the unfold at the head of the list.
    | ConcatUnfoldInterleaveInner o [i]
    -- Outer stream finished: consuming the first list, moving unfolds that
    -- yielded to the second list.
    | ConcatUnfoldInterleaveInnerL [i] [i]
    -- Outer stream finished: consuming the second list, moving unfolds that
    -- yielded to the first list.
    | ConcatUnfoldInterleaveInnerR [i] [i]
-- | Unfold every element of the input stream and interleave the generated
-- streams. While the outer stream is alive, each newly started unfold is
-- stepped until it yields one element (or stops) before the outer stream
-- is pulled again. Once the outer stream has stopped, the remaining
-- unfolds are visited in turn, one element each, until all have stopped.
{-# INLINE_NORMAL concatUnfoldInterleave #-}
concatUnfoldInterleave :: Monad m => Unfold m a b -> Stream m a -> Stream m b
concatUnfoldInterleave (Unfold istep inject) (Stream ostep ost) =
    Stream step (ConcatUnfoldInterleaveOuter ost [])

    where

    {-# INLINE_LATE step #-}
    step gst (ConcatUnfoldInterleaveOuter outer running) = do
        res <- ostep (adaptState gst) outer
        case res of
            Yield seed outer' -> do
                -- The bang forces the injected state before it is stored.
                !inner <- inject seed
                return
                    (Skip (ConcatUnfoldInterleaveInner outer' (inner : running)))
            Skip outer' ->
                return (Skip (ConcatUnfoldInterleaveOuter outer' running))
            Stop -> return (Skip (ConcatUnfoldInterleaveInnerL running []))

    -- Not reachable: this state is only ever built with a non-empty list.
    step _ (ConcatUnfoldInterleaveInner _ []) = undefined
    step _ (ConcatUnfoldInterleaveInner outer (cur : rest)) =
        istep cur >>= \res -> return $ case res of
            Yield x cur' ->
                Yield x (ConcatUnfoldInterleaveOuter outer (cur' : rest))
            Skip cur' -> Skip (ConcatUnfoldInterleaveInner outer (cur' : rest))
            Stop -> Skip (ConcatUnfoldInterleaveOuter outer rest)

    step _ (ConcatUnfoldInterleaveInnerL [] []) = return Stop
    step _ (ConcatUnfoldInterleaveInnerL [] done) =
        return (Skip (ConcatUnfoldInterleaveInnerR [] done))
    step _ (ConcatUnfoldInterleaveInnerL (cur : todo) done) =
        istep cur >>= \res -> return $ case res of
            Yield x cur' -> Yield x (ConcatUnfoldInterleaveInnerL todo (cur' : done))
            Skip cur' -> Skip (ConcatUnfoldInterleaveInnerL (cur' : todo) done)
            Stop -> Skip (ConcatUnfoldInterleaveInnerL todo done)

    step _ (ConcatUnfoldInterleaveInnerR [] []) = return Stop
    step _ (ConcatUnfoldInterleaveInnerR done []) =
        return (Skip (ConcatUnfoldInterleaveInnerL done []))
    step _ (ConcatUnfoldInterleaveInnerR done (cur : todo)) =
        istep cur >>= \res -> return $ case res of
            Yield x cur' -> Yield x (ConcatUnfoldInterleaveInnerR (cur' : done) todo)
            Skip cur' -> Skip (ConcatUnfoldInterleaveInnerR done (cur' : todo))
            Stop -> Skip (ConcatUnfoldInterleaveInnerR done todo)
-- | Variant of 'concatUnfoldInterleave' that never stays on one source: a
-- @Skip@ from the outer stream or from an unfold hands control over to the
-- next source in the same way a @Yield@ does.
{-# INLINE_NORMAL concatUnfoldRoundrobin #-}
concatUnfoldRoundrobin :: Monad m => Unfold m a b -> Stream m a -> Stream m b
concatUnfoldRoundrobin (Unfold istep inject) (Stream ostep ost) =
    Stream step (ConcatUnfoldInterleaveOuter ost [])

    where

    {-# INLINE_LATE step #-}
    step gst (ConcatUnfoldInterleaveOuter outer running) = do
        res <- ostep (adaptState gst) outer
        case res of
            Yield seed outer' -> do
                -- The bang forces the injected state before it is stored.
                !inner <- inject seed
                return
                    (Skip (ConcatUnfoldInterleaveInner outer' (inner : running)))
            Skip outer' ->
                return (Skip (ConcatUnfoldInterleaveInner outer' running))
            Stop -> return (Skip (ConcatUnfoldInterleaveInnerL running []))

    -- No unfold is running, so go back to the outer stream.
    step _ (ConcatUnfoldInterleaveInner outer []) =
        return (Skip (ConcatUnfoldInterleaveOuter outer []))
    step _ (ConcatUnfoldInterleaveInner outer (cur : rest)) =
        istep cur >>= \res -> return $ case res of
            Yield x cur' ->
                Yield x (ConcatUnfoldInterleaveOuter outer (cur' : rest))
            Skip cur' -> Skip (ConcatUnfoldInterleaveOuter outer (cur' : rest))
            Stop -> Skip (ConcatUnfoldInterleaveOuter outer rest)

    step _ (ConcatUnfoldInterleaveInnerL [] []) = return Stop
    step _ (ConcatUnfoldInterleaveInnerL [] done) =
        return (Skip (ConcatUnfoldInterleaveInnerR [] done))
    step _ (ConcatUnfoldInterleaveInnerL (cur : todo) done) =
        istep cur >>= \res -> return $ case res of
            Yield x cur' -> Yield x (ConcatUnfoldInterleaveInnerL todo (cur' : done))
            Skip cur' -> Skip (ConcatUnfoldInterleaveInnerL todo (cur' : done))
            Stop -> Skip (ConcatUnfoldInterleaveInnerL todo done)

    step _ (ConcatUnfoldInterleaveInnerR [] []) = return Stop
    step _ (ConcatUnfoldInterleaveInnerR done []) =
        return (Skip (ConcatUnfoldInterleaveInnerL done []))
    step _ (ConcatUnfoldInterleaveInnerR done (cur : todo)) =
        istep cur >>= \res -> return $ case res of
            Yield x cur' -> Yield x (ConcatUnfoldInterleaveInnerR (cur' : done) todo)
            Skip cur' -> Skip (ConcatUnfoldInterleaveInnerR (cur' : done) todo)
            Stop -> Skip (ConcatUnfoldInterleaveInnerR done todo)
-- | State of 'append': which of the two streams is currently being run.
data AppendState s1 s2
    = AppendFirst s1
    | AppendSecond s2
-- | Emit all the elements of the first stream followed by all the elements
-- of the second stream.
{-# INLINE_NORMAL append #-}
append :: Monad m => Stream m a -> Stream m a -> Stream m a
append (Stream step1 state1) (Stream step2 state2) =
    Stream step (AppendFirst state1)

    where

    {-# INLINE_LATE step #-}
    step gst (AppendFirst st) =
        step1 gst st >>= \res -> return $ case res of
            Yield x st' -> Yield x (AppendFirst st')
            Skip st' -> Skip (AppendFirst st')
            -- First stream exhausted, switch over to the second one.
            Stop -> Skip (AppendSecond state2)

    step gst (AppendSecond st) =
        step2 gst st >>= \res -> return $ case res of
            Yield x st' -> Yield x (AppendSecond st')
            Skip st' -> Skip (AppendSecond st')
            Stop -> Stop
-- | State shared by the two-stream interleaving combinators.
data InterleaveState s1 s2
    -- Both streams alive, the first one is next.
    = InterleaveFirst s1 s2
    -- Both streams alive, the second one is next.
    | InterleaveSecond s1 s2
    -- Only the second stream is left.
    | InterleaveSecondOnly s2
    -- Only the first stream is left.
    | InterleaveFirstOnly s1
-- | Alternate between the elements of two streams, starting with the first.
-- When one stream ends, the rest of the other stream is emitted.
{-# INLINE_NORMAL interleave #-}
interleave :: Monad m => Stream m a -> Stream m a -> Stream m a
interleave (Stream step1 state1) (Stream step2 state2) =
    Stream step (InterleaveFirst state1 state2)

    where

    {-# INLINE_LATE step #-}
    step gst (InterleaveFirst sa sb) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveSecond sa' sb)
            Skip sa' -> Skip (InterleaveFirst sa' sb)
            Stop -> Skip (InterleaveSecondOnly sb)

    step gst (InterleaveSecond sa sb) =
        step2 gst sb >>= \res -> return $ case res of
            Yield x sb' -> Yield x (InterleaveFirst sa sb')
            Skip sb' -> Skip (InterleaveSecond sa sb')
            Stop -> Skip (InterleaveFirstOnly sa)

    step gst (InterleaveFirstOnly sa) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveFirstOnly sa')
            Skip sa' -> Skip (InterleaveFirstOnly sa')
            Stop -> Stop

    step gst (InterleaveSecondOnly sb) =
        step2 gst sb >>= \res -> return $ case res of
            Yield x sb' -> Yield x (InterleaveSecondOnly sb')
            Skip sb' -> Skip (InterleaveSecondOnly sb')
            Stop -> Stop
-- | Alternate between the elements of two streams, starting with the first,
-- and end the result as soon as the stream whose turn it is has ended.
{-# INLINE_NORMAL interleaveMin #-}
interleaveMin :: Monad m => Stream m a -> Stream m a -> Stream m a
interleaveMin (Stream step1 state1) (Stream step2 state2) =
    Stream step (InterleaveFirst state1 state2)

    where

    {-# INLINE_LATE step #-}
    step gst (InterleaveFirst sa sb) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveSecond sa' sb)
            Skip sa' -> Skip (InterleaveFirst sa' sb)
            Stop -> Stop

    step gst (InterleaveSecond sa sb) =
        step2 gst sb >>= \res -> return $ case res of
            Yield x sb' -> Yield x (InterleaveFirst sa sb')
            Skip sb' -> Skip (InterleaveSecond sa sb')
            Stop -> Stop

    -- The single-stream states are never constructed by this function.
    step _ (InterleaveFirstOnly _) = undefined
    step _ (InterleaveSecondOnly _) = undefined
-- | Follow each element of the first stream by one element of the second
-- stream. The result ends when the first stream ends; if the second stream
-- runs out earlier, the rest of the first stream is emitted on its own.
{-# INLINE_NORMAL interleaveSuffix #-}
interleaveSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a
interleaveSuffix (Stream step1 state1) (Stream step2 state2) =
    Stream step (InterleaveFirst state1 state2)

    where

    {-# INLINE_LATE step #-}
    step gst (InterleaveFirst sa sb) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveSecond sa' sb)
            Skip sa' -> Skip (InterleaveFirst sa' sb)
            Stop -> Stop

    step gst (InterleaveSecond sa sb) =
        step2 gst sb >>= \res -> return $ case res of
            Yield x sb' -> Yield x (InterleaveFirst sa sb')
            Skip sb' -> Skip (InterleaveSecond sa sb')
            Stop -> Skip (InterleaveFirstOnly sa)

    step gst (InterleaveFirstOnly sa) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveFirstOnly sa')
            Skip sa' -> Skip (InterleaveFirstOnly sa')
            Stop -> Stop

    -- This state is never constructed by this function.
    step _ (InterleaveSecondOnly _) = undefined
-- | State of 'interleaveInfix'. @a@ is the element type; two of the states
-- carry an element that has been fetched but not emitted yet.
data InterleaveInfixState s1 s2 a
    -- Pull from the first stream.
    = InterleaveInfixFirst s1 s2
    -- Pull an element from the second stream and hold it back.
    | InterleaveInfixSecondBuf s1 s2
    -- Holding an element of the second stream until the first stream is
    -- known to have another element.
    | InterleaveInfixSecondYield s1 s2 a
    -- Holding an element of the first stream that is emitted next.
    | InterleaveInfixFirstYield s1 s2 a
    -- The second stream has ended, only the first one is left.
    | InterleaveInfixFirstOnly s1
-- | Insert the elements of the second stream between the elements of the
-- first stream. An element of the second stream is emitted only once the
-- first stream has produced the element that follows it; if the first
-- stream ends instead, the held back element is dropped. If the second
-- stream ends first, the rest of the first stream is emitted on its own.
{-# INLINE_NORMAL interleaveInfix #-}
interleaveInfix :: Monad m => Stream m a -> Stream m a -> Stream m a
interleaveInfix (Stream step1 state1) (Stream step2 state2) =
    Stream step (InterleaveInfixFirst state1 state2)

    where

    {-# INLINE_LATE step #-}
    step gst (InterleaveInfixFirst sa sb) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveInfixSecondBuf sa' sb)
            Skip sa' -> Skip (InterleaveInfixFirst sa' sb)
            Stop -> Stop

    step gst (InterleaveInfixSecondBuf sa sb) =
        step2 gst sb >>= \res -> return $ case res of
            Yield sep sb' -> Skip (InterleaveInfixSecondYield sa sb' sep)
            Skip sb' -> Skip (InterleaveInfixSecondBuf sa sb')
            Stop -> Skip (InterleaveInfixFirstOnly sa)

    step gst (InterleaveInfixSecondYield sa sb sep) =
        step1 gst sa >>= \res -> return $ case res of
            -- The first stream goes on: emit the held back element now and
            -- the freshly fetched one right after it.
            Yield x sa' -> Yield sep (InterleaveInfixFirstYield sa' sb x)
            Skip sa' -> Skip (InterleaveInfixSecondYield sa' sb sep)
            Stop -> Stop

    step _ (InterleaveInfixFirstYield sa sb x) =
        return (Yield x (InterleaveInfixSecondBuf sa sb))

    step gst (InterleaveInfixFirstOnly sa) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveInfixFirstOnly sa')
            Skip sa' -> Skip (InterleaveInfixFirstOnly sa')
            Stop -> Stop
-- | Schedule two streams in turns: every step, whether it yields an element
-- or skips, passes control to the other stream. When one stream ends, the
-- rest of the other stream is emitted.
{-# INLINE_NORMAL roundRobin #-}
roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a
roundRobin (Stream step1 state1) (Stream step2 state2) =
    Stream step (InterleaveFirst state1 state2)

    where

    {-# INLINE_LATE step #-}
    step gst (InterleaveFirst sa sb) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveSecond sa' sb)
            Skip sa' -> Skip (InterleaveSecond sa' sb)
            Stop -> Skip (InterleaveSecondOnly sb)

    step gst (InterleaveSecond sa sb) =
        step2 gst sb >>= \res -> return $ case res of
            Yield x sb' -> Yield x (InterleaveFirst sa sb')
            Skip sb' -> Skip (InterleaveFirst sa sb')
            Stop -> Skip (InterleaveFirstOnly sa)

    step gst (InterleaveSecondOnly sb) =
        step2 gst sb >>= \res -> return $ case res of
            Yield x sb' -> Yield x (InterleaveSecondOnly sb')
            Skip sb' -> Skip (InterleaveSecondOnly sb')
            Stop -> Stop

    step gst (InterleaveFirstOnly sa) =
        step1 gst sa >>= \res -> return $ case res of
            Yield x sa' -> Yield x (InterleaveFirstOnly sa')
            Skip sa' -> Skip (InterleaveFirstOnly sa')
            Stop -> Stop
-- | State of 'gintercalateSuffix'. @s1@ and @s2@ are the states of the two
-- input streams, @i1@ and @i2@ the states of the unfolds run on their
-- elements.
data ICUState s1 s2 i1 i2
    -- Pull the next element of the first stream.
    = ICUFirst s1 s2
    -- Pull the next element of the second stream.
    | ICUSecond s1 s2
    | ICUSecondOnly s2
    -- The second stream has ended; pull from the first stream only.
    | ICUFirstOnly s1
    -- Run the unfold of an element of the first stream.
    | ICUFirstInner s1 s2 i1
    -- Run the unfold of an element of the second stream.
    | ICUSecondInner s1 s2 i2
    -- As ICUFirstInner, after the second stream has ended.
    | ICUFirstOnlyInner s1 i1
    | ICUSecondOnlyInner s2 i2
-- | Unfold the elements of two streams alternately: the expansion of an
-- element of the first stream is followed by the expansion of an element of
-- the second stream. The result ends when the first stream ends; if the
-- second stream runs out earlier, the remaining elements of the first
-- stream are expanded on their own.
{-# INLINE_NORMAL gintercalateSuffix #-}
gintercalateSuffix
    :: Monad m
    => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
gintercalateSuffix
    (Unfold istep1 inject1) (Stream step1 state1)
    (Unfold istep2 inject2) (Stream step2 state2) =
    Stream step (ICUFirst state1 state2)

    where

    {-# INLINE_LATE step #-}
    step gst (ICUFirst sa sb) = do
        res <- step1 (adaptState gst) sa
        case res of
            Yield seed sa' -> do
                -- The bang forces the injected state before it is stored.
                !inner <- inject1 seed
                return (Skip (ICUFirstInner sa' sb inner))
            Skip sa' -> return (Skip (ICUFirst sa' sb))
            Stop -> return Stop

    step gst (ICUFirstOnly sa) = do
        res <- step1 (adaptState gst) sa
        case res of
            Yield seed sa' -> do
                !inner <- inject1 seed
                return (Skip (ICUFirstOnlyInner sa' inner))
            Skip sa' -> return (Skip (ICUFirstOnly sa'))
            Stop -> return Stop

    step _ (ICUFirstInner sa sb inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (ICUFirstInner sa sb inner')
            Skip inner' -> Skip (ICUFirstInner sa sb inner')
            Stop -> Skip (ICUSecond sa sb)

    step _ (ICUFirstOnlyInner sa inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (ICUFirstOnlyInner sa inner')
            Skip inner' -> Skip (ICUFirstOnlyInner sa inner')
            Stop -> Skip (ICUFirstOnly sa)

    step gst (ICUSecond sa sb) = do
        res <- step2 (adaptState gst) sb
        case res of
            Yield seed sb' -> do
                !inner <- inject2 seed
                return (Skip (ICUSecondInner sa sb' inner))
            Skip sb' -> return (Skip (ICUSecond sa sb'))
            Stop -> return (Skip (ICUFirstOnly sa))

    step _ (ICUSecondInner sa sb inner) =
        istep2 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (ICUSecondInner sa sb inner')
            Skip inner' -> Skip (ICUSecondInner sa sb inner')
            Stop -> Skip (ICUFirst sa sb)

    -- These states are never constructed by this function.
    step _ (ICUSecondOnly _s2) = undefined
    step _ (ICUSecondOnlyInner _s2 _i2) = undefined
-- | State of 'interposeSuffix'. @s1@ is the state of the input stream, @i1@
-- the state of the unfold run on its elements.
data InterposeSuffixState s1 i1
    -- Pull the next element of the input stream.
    = InterposeSuffixFirst s1
    -- Run the unfold of the current element.
    | InterposeSuffixFirstInner s1 i1
    -- Run the action and emit its result.
    | InterposeSuffixSecond s1
-- | Unfold every element of the input stream and, each time an unfold has
-- finished, run the given action and emit its result.
{-# INLINE_NORMAL interposeSuffix #-}
interposeSuffix
    :: Monad m
    => m c -> Unfold m b c -> Stream m b -> Stream m c
interposeSuffix
    action
    (Unfold istep1 inject1) (Stream step1 state1) =
    Stream step (InterposeSuffixFirst state1)

    where

    {-# INLINE_LATE step #-}
    step gst (InterposeSuffixFirst st) = do
        res <- step1 (adaptState gst) st
        case res of
            Yield seed st' -> do
                -- The bang forces the injected state before it is stored.
                !inner <- inject1 seed
                return (Skip (InterposeSuffixFirstInner st' inner))
            Skip st' -> return (Skip (InterposeSuffixFirst st'))
            Stop -> return Stop

    step _ (InterposeSuffixFirstInner st inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (InterposeSuffixFirstInner st inner')
            Skip inner' -> Skip (InterposeSuffixFirstInner st inner')
            Stop -> Skip (InterposeSuffixSecond st)

    step _ (InterposeSuffixSecond st) =
        action >>= \sep -> return (Yield sep (InterposeSuffixFirst st))
-- | State of 'gintercalate'. @s1@ and @s2@ are the states of the two input
-- streams, @i1@ and @i2@ the states of the unfolds run on their elements.
-- The type parameter @a@ is not used by any constructor.
data ICALState s1 s2 i1 i2 a
    -- Pull the next element of the first stream.
    = ICALFirst s1 s2
    -- Run the unfold of an element of the first stream.
    | ICALFirstInner s1 s2 i1
    -- The second stream has ended; pull from the first stream only.
    | ICALFirstOnly s1
    -- As ICALFirstInner, after the second stream has ended.
    | ICALFirstOnlyInner s1 i1
    -- Pull an element of the second stream and start its unfold.
    | ICALSecondInject s1 s2
    -- An unfold of the second stream is ready; check that the first stream
    -- has another element before running it.
    | ICALFirstInject s1 s2 i2
    -- Run the pending unfold of the second stream; the already started
    -- unfold of the first stream is run after it.
    | ICALSecondInner s1 s2 i1 i2
-- | Unfold the elements of the first stream and place the expansion of one
-- element of the second stream between every two consecutive expansions.
-- An element of the second stream is expanded only after the first stream
-- has produced a following element; if the first stream ends instead, the
-- result ends. If the second stream runs out, the remaining elements of the
-- first stream are expanded on their own.
{-# INLINE_NORMAL gintercalate #-}
gintercalate
    :: Monad m
    => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
gintercalate
    (Unfold istep1 inject1) (Stream step1 state1)
    (Unfold istep2 inject2) (Stream step2 state2) =
    Stream step (ICALFirst state1 state2)

    where

    {-# INLINE_LATE step #-}
    step gst (ICALFirst sa sb) = do
        res <- step1 (adaptState gst) sa
        case res of
            Yield seed sa' -> do
                -- The bang forces the injected state before it is stored.
                !inner <- inject1 seed
                return (Skip (ICALFirstInner sa' sb inner))
            Skip sa' -> return (Skip (ICALFirst sa' sb))
            Stop -> return Stop

    step _ (ICALFirstInner sa sb inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (ICALFirstInner sa sb inner')
            Skip inner' -> Skip (ICALFirstInner sa sb inner')
            Stop -> Skip (ICALSecondInject sa sb)

    step gst (ICALFirstOnly sa) = do
        res <- step1 (adaptState gst) sa
        case res of
            Yield seed sa' -> do
                !inner <- inject1 seed
                return (Skip (ICALFirstOnlyInner sa' inner))
            Skip sa' -> return (Skip (ICALFirstOnly sa'))
            Stop -> return Stop

    step _ (ICALFirstOnlyInner sa inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (ICALFirstOnlyInner sa inner')
            Skip inner' -> Skip (ICALFirstOnlyInner sa inner')
            Stop -> Skip (ICALFirstOnly sa)

    step gst (ICALSecondInject sa sb) = do
        res <- step2 (adaptState gst) sb
        case res of
            Yield seed sb' -> do
                !sepInner <- inject2 seed
                return (Skip (ICALFirstInject sa sb' sepInner))
            Skip sb' -> return (Skip (ICALSecondInject sa sb'))
            Stop -> return (Skip (ICALFirstOnly sa))

    step gst (ICALFirstInject sa sb sepInner) = do
        res <- step1 (adaptState gst) sa
        case res of
            Yield seed sa' -> do
                !inner <- inject1 seed
                return (Skip (ICALSecondInner sa' sb inner sepInner))
            Skip sa' -> return (Skip (ICALFirstInject sa' sb sepInner))
            Stop -> return Stop

    step _ (ICALSecondInner sa sb inner sepInner) =
        istep2 sepInner >>= \res -> return $ case res of
            Yield x sepInner' -> Yield x (ICALSecondInner sa sb inner sepInner')
            Skip sepInner' -> Skip (ICALSecondInner sa sb inner sepInner')
            Stop -> Skip (ICALFirstInner sa sb inner)
-- | State of 'interpose'. @s1@ is the state of the input stream, @i1@ the
-- state of the unfold run on its elements. The type parameter @a@ is not
-- used by any constructor.
data InterposeState s1 i1 a
    -- Pull the first element of the input stream.
    = InterposeFirst s1
    -- Run the unfold of the current element.
    | InterposeFirstInner s1 i1
    -- Pull the next element and start its unfold.
    | InterposeFirstInject s1
    -- Run the action and emit its result before running the unfold that
    -- has just been started.
    | InterposeSecondYield s1 i1
-- | Unfold every element of the input stream and emit the result of the
-- given action between two consecutive expansions. The action is run only
-- after the next input element has been obtained, so nothing is emitted
-- after the last expansion.
{-# INLINE_NORMAL interpose #-}
interpose :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c
interpose
    action
    (Unfold istep1 inject1) (Stream step1 state1) =
    Stream step (InterposeFirst state1)

    where

    {-# INLINE_LATE step #-}
    step gst (InterposeFirst st) = do
        res <- step1 (adaptState gst) st
        case res of
            Yield seed st' -> do
                -- The bang forces the injected state before it is stored.
                !inner <- inject1 seed
                return (Skip (InterposeFirstInner st' inner))
            Skip st' -> return (Skip (InterposeFirst st'))
            Stop -> return Stop

    step _ (InterposeFirstInner st inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (InterposeFirstInner st inner')
            Skip inner' -> Skip (InterposeFirstInner st inner')
            Stop -> Skip (InterposeFirstInject st)

    step gst (InterposeFirstInject st) = do
        res <- step1 (adaptState gst) st
        case res of
            Yield seed st' -> do
                !inner <- inject1 seed
                return (Skip (InterposeSecondYield st' inner))
            Skip st' -> return (Skip (InterposeFirstInject st'))
            Stop -> return Stop

    step _ (InterposeSecondYield st inner) =
        action >>= \sep -> return (Yield sep (InterposeFirstInner st inner))
-- | State of 'gbracket'.
data GbracketState s1 s2 v
    -- The "before" action has not been run yet.
    = GBracketInit
    -- Running the regular stream; @v@ is the result of the "before" action.
    | GBracketNormal s1 v
    -- Running the stream produced by the exception handler.
    | GBracketException s2
-- | Generic bracketing of a stream.
--
-- @gbracket bef exc aft fexc fnormal@ runs @bef@ on the first step and
-- builds the stream to run by applying @fnormal@ to its result. Each step of
-- that stream is executed through @exc@. When the stream stops normally,
-- @aft@ is run on the result of @bef@. When @exc@ reports a failure @e@, the
-- stream @fexc v e@ is run instead (its steps are not passed through @exc@
-- and @aft@ is not run by this function in that case).
{-# INLINE_NORMAL gbracket #-}
gbracket
    :: Monad m
    => m c
    -> (forall s. m s -> m (Either e s))
    -> (c -> m d)
    -> (c -> e -> Stream m b)
    -> (c -> Stream m b)
    -> Stream m b
gbracket bef exc aft fexc fnormal =
    Stream step GBracketInit

    where

    {-# INLINE_LATE step #-}
    step _ GBracketInit =
        bef >>= \v -> return (Skip (GBracketNormal (fnormal v) v))

    step gst (GBracketNormal (UnStream step1 st) v) = do
        outcome <- exc (step1 gst st)
        case outcome of
            Left e -> return (Skip (GBracketException (fexc v e)))
            Right (Yield x st') ->
                return (Yield x (GBracketNormal (Stream step1 st') v))
            Right (Skip st') ->
                return (Skip (GBracketNormal (Stream step1 st') v))
            Right Stop -> aft v >> return Stop

    step gst (GBracketException (UnStream step1 st)) =
        step1 gst st >>= \res -> case res of
            Yield x st' -> return (Yield x (GBracketException (Stream step1 st')))
            Skip st' -> return (Skip (GBracketException (Stream step1 st')))
            Stop -> return Stop
-- | Create an 'IORef' holding the given finalizer, converted to an 'IO'
-- action using the monad state captured at the time of the call. A weak
-- reference finalizer is attached to the 'IORef' (via 'mkWeakIORef') which
-- runs whatever action is still stored in it.
newFinalizedIORef :: (MonadIO m, MonadBaseControl IO m)
    => m a -> m (IORef (Maybe (IO ())))
newFinalizedIORef finalizer = do
    mrun <- captureMonadState
    let cleanup = liftIO $ void (runInIO mrun finalizer >> return ())
    ref <- liftIO (newIORef (Just cleanup))
    -- Invoked through the weak reference: run the stored action, if any.
    let onCollect = readIORef ref >>= \pending ->
            case pending of
                Nothing -> return ()
                Just act -> act
    _ <- liftIO (mkWeakIORef ref onCollect)
    return ref
-- | Run the action stored in the ref, if any. The ref is reset to 'Nothing'
-- before the action is executed, so a later call finds nothing to run.
runIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
runIORefFinalizer ref = liftIO (readIORef ref >>= runStored)
  where
    runStored (Just act) = writeIORef ref Nothing >> act
    runStored Nothing = return ()
-- | Discard the action stored in the ref (without running it) by overwriting
-- the ref with 'Nothing'.
clearIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
clearIORefFinalizer ref = liftIO (writeIORef ref Nothing)
-- | Internal state of the stream built by 'gbracketIO'.
data GbracketIOState s1 s2 v wref
-- Nothing has run yet; the "before" action is still pending.
= GBracketIOInit
-- Stepping the regular stream @s1@; @v@ is the result of the "before" action
-- and @wref@ is the finalizer reference created alongside it.
| GBracketIONormal s1 v wref
-- Stepping the stream @s2@ produced by the exception handler.
| GBracketIOException s2
-- | Like 'gbracket', but the "after" action is additionally registered through
-- 'newFinalizedIORef'. The "before" action and the registration are executed
-- together under 'mask_'. On a normal stop the registered action is run via
-- 'runIORefFinalizer'; when @exc@ reports @Left e@ the registered action is
-- cleared and the output continues with the stream @fexc r e@.
{-# INLINE_NORMAL gbracketIO #-}
gbracketIO
    :: (MonadIO m, MonadBaseControl IO m)
    => m c                                  -- ^ before
    -> (forall s. m s -> m (Either e s))    -- ^ try (wraps every step)
    -> (c -> m d)                           -- ^ after, on normal stop
    -> (c -> e -> Stream m b)               -- ^ on exception
    -> (c -> Stream m b)                    -- ^ stream to run
    -> Stream m b
gbracketIO bef exc aft fexc fnormal = Stream step GBracketIOInit
  where
    {-# INLINE_LATE step #-}
    step _ GBracketIOInit = do
        (res, ref) <- liftBaseOp_ mask_ $
            bef >>= \r ->
                newFinalizedIORef (aft r) >>= \wref -> return (r, wref)
        return (Skip (GBracketIONormal (fnormal res) res ref))
    step gst (GBracketIONormal (UnStream inner ist) res ref) = do
        outcome <- exc (inner gst ist)
        case outcome of
            Left err -> do
                clearIORefFinalizer ref
                return (Skip (GBracketIOException (fexc res err)))
            Right (Yield x ist') ->
                return (Yield x (GBracketIONormal (Stream inner ist') res ref))
            Right (Skip ist') ->
                return (Skip (GBracketIONormal (Stream inner ist') res ref))
            Right Stop -> runIORefFinalizer ref >> return Stop
    step gst (GBracketIOException (UnStream inner ist)) = do
        next <- inner gst ist
        case next of
            Yield x ist' ->
                return (Yield x (GBracketIOException (Stream inner ist')))
            Skip ist' ->
                return (Skip (GBracketIOException (Stream inner ist')))
            Stop -> return Stop
-- | Run @action@ once, on the first step, before stepping the given stream.
-- The result of the action is discarded.
{-# INLINE_NORMAL before #-}
before :: Monad m => m b -> Stream m a -> Stream m a
before action (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        _ <- action
        return (Skip (Just state))
    step' gst (Just st) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (Just s))
            Yield x s -> return (Yield x (Just s))
-- | Run @action@ when the underlying stream reports 'Stop'. The result of the
-- action is discarded.
{-# INLINE_NORMAL after #-}
after :: Monad m => m b -> Stream m a -> Stream m a
after action (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        next <- step gst st
        case next of
            Stop -> action >> return Stop
            Skip s -> return (Skip s)
            Yield x s -> return (Yield x s)
-- | Like 'after', but the action is registered with 'newFinalizedIORef' on the
-- first step and run through 'runIORefFinalizer' when the stream stops.
{-# INLINE_NORMAL afterIO #-}
afterIO :: (MonadIO m, MonadBaseControl IO m)
    => m b -> Stream m a -> Stream m a
afterIO action (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    step' _ Nothing =
        newFinalizedIORef action >>= \ref -> return (Skip (Just (state, ref)))
    step' gst (Just (st, ref)) = do
        next <- step gst st
        case next of
            Stop -> runIORefFinalizer ref >> return Stop
            Skip s -> return (Skip (Just (s, ref)))
            Yield x s -> return (Yield x (Just (s, ref)))
-- | Run @action@ when stepping the stream throws an exception, then rethrow
-- that exception. Built on 'gbracket' with 'MC.try' as the catcher.
{-# INLINE_NORMAL onException #-}
onException :: MonadCatch m => m b -> Stream m a -> Stream m a
onException action str =
    gbracket (return ()) MC.try return runThenRethrow (const str)
  where
    runThenRethrow _ (e :: MC.SomeException) = nilM (action >> MC.throwM e)
-- | Alternative implementation of 'onException' that wraps every step of the
-- stream in 'MC.onException' directly.
{-# INLINE_NORMAL _onException #-}
_onException :: MonadCatch m => m b -> Stream m a -> Stream m a
_onException action (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        next <- MC.onException (step gst st) action
        case next of
            Stop -> return Stop
            Skip s -> return (Skip s)
            Yield x s -> return (Yield x s)
-- | Run @bef@, use its result to build the stream with @bet@, and run @aft@
-- on that result when the stream stops normally. If a step throws, @aft@ is
-- run and the exception is rethrown. Built on 'gbracket'.
{-# INLINE_NORMAL bracket #-}
bracket :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket bef aft bet = gbracket bef MC.try aft cleanupAndRethrow bet
  where
    cleanupAndRethrow a (e :: SomeException) = nilM (aft a >> MC.throwM e)
-- | Same interface as 'bracket', but built on 'gbracketIO', so the "after"
-- action is also registered as a finalizer by that function.
{-# INLINE_NORMAL bracketIO #-}
bracketIO :: (MonadAsync m, MonadCatch m)
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracketIO bef aft bet = gbracketIO bef MC.try aft cleanupAndRethrow bet
  where
    cleanupAndRethrow a (e :: SomeException) = nilM (aft a >> MC.throwM e)
-- | Internal state of '_bracket': 'BracketInit' before the "before" action has
-- run, 'BracketRun' while stepping the stream @s@ together with the value @v@
-- returned by the "before" action.
data BracketState s v = BracketInit | BracketRun s v
-- | Alternative, direct implementation of 'bracket': every step is wrapped in
-- 'MC.try'. On an exception @aft@ is run and the exception rethrown; on a
-- normal stop @aft@ is run before reporting 'Stop'.
{-# INLINE_NORMAL _bracket #-}
_bracket :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
_bracket bef aft bet = Stream step' BracketInit
  where
    {-# INLINE_LATE step' #-}
    step' _ BracketInit = do
        v <- bef
        return (Skip (BracketRun (bet v) v))
    step' gst (BracketRun (UnStream step state) v) = do
        outcome <- MC.try (step gst state)
        case outcome of
            Right (Yield x s) -> return (Yield x (BracketRun (Stream step s) v))
            Right (Skip s) -> return (Skip (BracketRun (Stream step s) v))
            Right Stop -> aft v >> return Stop
            Left (e :: SomeException) -> aft v >> MC.throwM e >> return Stop
-- | Run @action@ whenever 'bracket' would run its "after" action for the
-- stream @xs@; there is no resource to acquire, so "before" is a no-op.
{-# INLINE finally #-}
finally :: MonadCatch m => m b -> Stream m a -> Stream m a
finally action xs = bracket (return ()) (const action) (\_ -> xs)
-- | Same as 'finally' but built on 'bracketIO'.
{-# INLINE finallyIO #-}
finallyIO :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a
finallyIO action xs = bracketIO (return ()) (const action) (\_ -> xs)
-- | When stepping the stream throws an exception of type @e@, continue the
-- output with the stream @f e@. Built on 'gbracket' with 'MC.try'.
{-# INLINE_NORMAL handle #-}
handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
handle f str = gbracket (return ()) MC.try return (const f) (const str)
-- | Alternative, direct implementation of 'handle'. The state is @Left@ while
-- stepping the original stream (each step wrapped in 'MC.try') and @Right@
-- once an exception has switched the output to the handler's stream.
{-# INLINE_NORMAL _handle #-}
_handle :: (MonadCatch m, Exception e)
    => (e -> Stream m a) -> Stream m a -> Stream m a
_handle f (Stream step state) = Stream step' (Left state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (Left st) = do
        outcome <- MC.try (step gst st)
        case outcome of
            Left e -> return (Skip (Right (f e)))
            Right (Yield x s) -> return (Yield x (Left s))
            Right (Skip s) -> return (Skip (Left s))
            Right Stop -> return Stop
    step' gst (Right (UnStream hstep hst)) = do
        next <- hstep gst hst
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (Right (Stream hstep s)))
            Yield x s -> return (Yield x (Right (Stream hstep s)))
-- | Run a 'Pipe' over a stream. In the @Consume@ state the source stream is
-- stepped and each element is fed to the pipe's first step function; in the
-- @Produce@ state the pipe's second step function is run without consuming
-- input. The pipe state is forced before each step.
{-# INLINE_NORMAL transform #-}
transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b
transform (Pipe pstep1 pstep2 pstate) (Stream step state) =
    Stream step' (Consume pstate, state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (Consume !pst, st) = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (Consume pst, s))
            Yield x s -> pstep1 pst x >>= emit s
    step' _ (Produce !pst, st) = pstep2 pst >>= emit st

    -- Translate a pipe step result into a stream step, pairing the new pipe
    -- state with the given source-stream state.
    emit s res =
        case res of
            Pipe.Yield b pst' -> return (Yield b (pst', s))
            Pipe.Continue pst' -> return (Skip (pst', s))
-- | Monadic pre-scan: for each input element, emit the accumulator as it was
-- /before/ that element was folded in. The state carries the monadic action
-- that produces the current accumulator.
{-# INLINE_NORMAL prescanlM' #-}
prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
prescanlM' f mz (Stream step state) = Stream step' (state, mz)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, macc) = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, macc))
            Yield x s -> macc >>= \acc -> return (Yield acc (s, f acc x))
-- | Pure version of 'prescanlM''.
{-# INLINE prescanl' #-}
prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
prescanl' f z = prescanlM' (\acc x -> return (f acc x)) (return z)
-- | Post-scan with a separate extraction function: for each input element the
-- accumulator is advanced with @fstep@ and @done@ applied to the new
-- accumulator is emitted. The initial accumulator itself is not emitted.
{-# INLINE_NORMAL postscanlMx' #-}
postscanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
postscanlMx' fstep begin done (Stream step state) =
    Stream step' (state, begin)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, macc) = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, macc))
            Yield x s -> do
                old <- macc
                y <- fstep old x
                v <- done y
                v `seq` y `seq` return (Yield v (s, return y))
-- | Pure version of 'postscanlMx''.
{-# INLINE_NORMAL postscanlx' #-}
postscanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
postscanlx' fstep begin done s =
    postscanlMx'
        (\acc x -> return (fstep acc x))
        (return begin)
        (\acc -> return (done acc))
        s
-- | Scan with extraction: like 'postscanlMx'' but the extracted initial
-- accumulator is emitted first (the initial accumulator is forced before
-- extraction).
{-# INLINE scanlMx' #-}
scanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
scanlMx' fstep begin done s =
    consM firstElem (postscanlMx' fstep begin done s)
  where
    firstElem = do
        !x <- begin
        done x
-- | Pure version of 'scanlMx''.
{-# INLINE scanlx' #-}
scanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
scanlx' fstep begin done s =
    scanlMx'
        (\acc x -> return (fstep acc x))
        (return begin)
        (\acc -> return (done acc))
        s
-- | Strict monadic post-scan: emits the accumulator after each input element
-- is folded in; the initial value is not emitted. The accumulator is forced
-- at every step.
{-# INLINE_NORMAL postscanlM' #-}
postscanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
postscanlM' fstep begin (Stream step state) =
    begin `seq` Stream step' (state, begin)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, !acc) = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, acc))
            Yield x s -> do
                !y <- fstep acc x
                return (Yield y (s, y))
-- | Pure version of 'postscanlM''.
{-# INLINE_NORMAL postscanl' #-}
postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl' f = postscanlM' (\acc x -> return (f acc x))
-- | Lazy monadic post-scan: like 'postscanlM'' but the accumulator is not
-- forced.
{-# INLINE_NORMAL postscanlM #-}
postscanlM :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
postscanlM fstep begin (Stream step state) = Stream step' (state, begin)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, acc) = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, acc))
            Yield x s -> fstep acc x >>= \y -> return (Yield y (s, y))
-- | Pure version of 'postscanlM'.
{-# INLINE_NORMAL postscanl #-}
postscanl :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl f = postscanlM (\acc x -> return (f acc x))
-- | Strict monadic scan: the (forced) initial value followed by the output of
-- 'postscanlM''.
{-# INLINE_NORMAL scanlM' #-}
scanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
scanlM' fstep begin s = seq begin (cons begin (postscanlM' fstep begin s))
-- | Pure version of 'scanlM''.
{-# INLINE scanl' #-}
scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' f = scanlM' (\acc x -> return (f acc x))
-- | Lazy monadic scan: the initial value followed by the output of
-- 'postscanlM'.
{-# INLINE_NORMAL scanlM #-}
scanlM :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
scanlM fstep begin s = cons begin (postscanlM fstep begin s)
-- | Pure version of 'scanlM'.
{-# INLINE scanl #-}
scanl :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl f = scanlM (\acc x -> return (f acc x))
-- | Lazy monadic scan that uses the first element of the stream as the
-- initial accumulator. The first element is emitted unchanged.
{-# INLINE_NORMAL scanl1M #-}
scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M fstep (Stream step state) = Stream step' (state, Nothing)
  where
    {-# INLINE_LATE step' #-}
    -- No accumulator yet: the first element becomes the accumulator.
    step' gst (st, Nothing) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, Nothing))
            Yield x s -> return (Yield x (s, Just x))
    step' gst (st, Just acc) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, Just acc))
            Yield y s -> fstep acc y >>= \z -> return (Yield z (s, Just z))
-- | Pure version of 'scanl1M'.
{-# INLINE scanl1 #-}
scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1 f = scanl1M (\acc x -> return (f acc x))
-- | Strict variant of 'scanl1M': the first element and every subsequent
-- accumulator are forced before being emitted.
{-# INLINE_NORMAL scanl1M' #-}
scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M' fstep (Stream step state) = Stream step' (state, Nothing)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, Nothing) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, Nothing))
            Yield !x s -> return (Yield x (s, Just x))
    step' gst (st, Just !acc) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, Just acc))
            Yield y s -> do
                !z <- fstep acc y
                return (Yield z (s, Just z))
-- | Pure version of 'scanl1M''.
{-# INLINE scanl1' #-}
scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1' f = scanl1M' (\acc x -> return (f acc x))
-- | Internal state of 'rollingMapM': 'RollingMapInit' until the first element
-- has been seen, then 'RollingMapGo' carrying the most recently seen element.
data RollingMapState s a = RollingMapInit s | RollingMapGo s a
-- | Apply a monadic function to each pair of adjacent elements. The function
-- is called as @f current previous@ and its result is forced before being
-- emitted. Nothing is emitted for the first element.
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m => (a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM f (Stream step1 state1) = Stream step (RollingMapInit state1)
  where
    step gst (RollingMapInit st) = do
        next <- step1 (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (RollingMapInit s))
            Yield x s -> return (Skip (RollingMapGo s x))
    step gst (RollingMapGo st prev) = do
        next <- step1 (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (RollingMapGo s prev))
            Yield cur s -> do
                res <- f cur prev
                res `seq` return (Yield res (RollingMapGo s cur))
-- | Pure version of 'rollingMapM'.
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
rollingMap f = rollingMapM (\cur prev -> return (f cur prev))
-- | Pass every element through unchanged while also feeding it to the given
-- 'Fold'. The fold's extract function is run when the stream stops and its
-- result is discarded.
{-# INLINE tap #-}
tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
tap (Fold fstep initial extract) (Stream step state) = Stream step' Nothing
  where
    step' _ Nothing = initial >>= \acc0 -> return (Skip (Just (acc0, state)))
    step' gst (Just (!acc, st)) = do
        next <- step gst st
        case next of
            Stop -> do
                void (extract acc)
                return Stop
            Skip s -> return (Skip (Just (acc, s)))
            Yield x s -> do
                acc' <- fstep acc x
                return (Yield x (Just (acc', s)))
-- | Pass every element through unchanged while feeding a subset of them to
-- the given 'Fold'. A countdown starts at @offset `mod` n@; an element is fed
-- to the fold when the countdown is @<= 0@, after which the countdown is
-- reset to @n - 1@; otherwise the countdown is decremented. The fold's
-- extract function is run when the stream stops and its result is discarded.
--
-- For @n <= 0@ every element is fed to the fold. (Previously @n == 0@ raised
-- a divide-by-zero error from @mod@; negative @n@ already behaved this way.)
{-# INLINE_NORMAL tapOffsetEvery #-}
tapOffsetEvery :: Monad m
    => Int -> Int -> Fold m a b -> Stream m a -> Stream m a
tapOffsetEvery offset n (Fold fstep initial extract) (Stream step state) =
    Stream step' Nothing
  where
    -- Initial countdown. Guarding @n <= 0@ avoids @mod@ by zero; for negative
    -- @n@ the result of @mod@ is already non-positive, so 0 is equivalent.
    initCount
        | n <= 0 = 0
        | otherwise = offset `mod` n

    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        r <- initial
        return $ Skip (Just (r, state, initCount))
    -- Countdown expired: this element is tapped.
    step' gst (Just (acc, st, count)) | count <= 0 = do
        r <- step gst st
        case r of
            Yield x s -> do
                !acc' <- fstep acc x
                return $ Yield x (Just (acc', s, n - 1))
            Skip s -> return $ Skip (Just (acc, s, count))
            Stop -> do
                void $ extract acc
                return Stop
    -- Countdown still running: pass the element through untapped.
    step' gst (Just (acc, st, count)) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Yield x (Just (acc, s, count - 1))
            Skip s -> return $ Skip (Just (acc, s, count))
            Stop -> do
                void $ extract acc
                return Stop
-- | Pass every element through unchanged while counting the elements that
-- satisfy @predicate@ in a mutable variable. On the first step a managed
-- thread is forked that folds (with @fld@) the stream obtained by applying
-- @transf@ to a stream reading that variable. The thread is killed when the
-- source stream stops.
{-# INLINE_NORMAL pollCounts #-}
pollCounts
    :: MonadAsync m
    => (a -> Bool)
    -> (Stream m Int -> Stream m Int)
    -> Fold m Int b
    -> Stream m a
    -> Stream m a
pollCounts predicate transf fld (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        countVar <- liftIO (newVar (0 :: Int))
        tid <- forkManaged (void (runFold fld (transf (fromPrimVar countVar))))
        return (Skip (Just (countVar, tid, state)))
    step' gst (Just (countVar, tid, st)) = do
        next <- step gst st
        case next of
            Stop -> liftIO (killThread tid) >> return Stop
            Skip s -> return (Skip (Just (countVar, tid, s)))
            Yield x s -> do
                when (predicate x) $ liftIO $ modifyVar' countVar (+ 1)
                return (Yield x (Just (countVar, tid, s)))
-- | Pass every element through unchanged while counting them. A forked thread
-- wakes up every @samplingRate@ seconds and calls @action@ with the number of
-- elements seen since its previous call. When the thread receives an
-- 'AsyncException' it reports the final count once more and rethrows. The
-- thread is killed when the stream stops, and also by a weak-reference
-- finalizer attached to a ref carried in the stream state.
--
-- The delay in microseconds is clamped to @maxBound :: Int@ so that a very
-- large @samplingRate@ cannot overflow when rounded.
{-# INLINE_NORMAL tapRate #-}
tapRate ::
       (MonadAsync m, MonadCatch m)
    => Double
    -> (Int -> m b)
    -> Stream m a
    -> Stream m a
tapRate samplingRate action (Stream step state) = Stream step' Nothing
  where
    micros = samplingRate * 1000000

    -- Sampling interval in microseconds, computed once and clamped to the
    -- range of 'Int'.
    delayTime :: Int
    delayTime
        | micros >= fromIntegral (maxBound :: Int) = maxBound
        | otherwise = round micros

    {-# NOINLINE loop #-}
    loop countVar prev = do
        i <-
            MC.catch
                (do
                    liftIO $ threadDelay delayTime
                    i <- liftIO $ readVar countVar
                    let !diff = i - prev
                    void $ action diff
                    return i)
                (\(e :: AsyncException) -> do
                    -- Report what was counted since the last sample before
                    -- propagating the asynchronous exception.
                    i <- liftIO $ readVar countVar
                    let !diff = i - prev
                    void $ action diff
                    throwM (MC.toException e))
        loop countVar i

    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        countVar <- liftIO $ newVar 0
        tid <- fork $ loop countVar 0
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref (killThread tid)
        return $ Skip (Just (countVar, tid, state, ref))
    step' gst (Just (countVar, tid, st, ref)) = do
        r <- step gst st
        case r of
            Yield x s -> do
                liftIO $ modifyVar' countVar (+ 1)
                return $ Yield x (Just (countVar, tid, s, ref))
            Skip s -> return $ Skip (Just (countVar, tid, s, ref))
            Stop -> do
                liftIO $ killThread tid
                return Stop
-- | Emit elements for as long as the monadic predicate holds; stop at the
-- first element for which it returns 'False'.
{-# INLINE_NORMAL takeWhileM #-}
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
takeWhileM f (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip s)
            Yield x s -> do
                keep <- f x
                return (if keep then Yield x s else Stop)
-- | Pure version of 'takeWhileM'.
{-# INLINE takeWhile #-}
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
takeWhile f = takeWhileM (\x -> return (f x))
-- | Discard the first @n@ elements of the stream and emit the rest. The
-- state is @Just i@ while @i@ elements remain to be dropped and 'Nothing'
-- afterwards.
{-# INLINE_NORMAL drop #-}
drop :: Monad m => Int -> Stream m a -> Stream m a
drop n (Stream step state) = Stream step' (state, Just n)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, Just i)
        | i <= 0 = return (Skip (st, Nothing))
        | otherwise = do
            next <- step gst st
            return $
                case next of
                    Stop -> Stop
                    Skip s -> Skip (s, Just i)
                    Yield _ s -> Skip (s, Just (i - 1))
    step' gst (st, Nothing) = do
        next <- step gst st
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (s, Nothing)
                Yield x s -> Yield x (s, Nothing)
-- | Internal state of 'dropWhileM'.
data DropWhileState s a
-- Still testing elements against the predicate and dropping them.
= DropWhileDrop s
-- An element is ready to be emitted on the next step.
| DropWhileYield a s
-- Dropping is over; elements are passed through without testing.
| DropWhileNext s
-- | Discard elements for as long as the monadic predicate holds, then emit
-- the first failing element and everything after it without further testing.
{-# INLINE_NORMAL dropWhileM #-}
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
dropWhileM f (Stream step state) = Stream step' (DropWhileDrop state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (DropWhileDrop st) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (DropWhileDrop s))
            Yield x s -> do
                dropIt <- f x
                return $
                    if dropIt
                        then Skip (DropWhileDrop s)
                        else Skip (DropWhileYield x s)
    step' _ (DropWhileYield x st) = return (Yield x (DropWhileNext st))
    step' gst (DropWhileNext st) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (DropWhileNext s))
            Yield x s -> return (Skip (DropWhileYield x s))
-- | Pure version of 'dropWhileM'.
{-# INLINE dropWhile #-}
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
dropWhile f = dropWhileM (\x -> return (f x))
-- | Keep only the elements for which the monadic predicate returns 'True'.
{-# INLINE_NORMAL filterM #-}
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM f (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip s)
            Yield x s -> do
                keep <- f x
                return (if keep then Yield x s else Skip s)
-- | Pure version of 'filterM'.
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
filter f = filterM (\x -> return (f x))
-- | Collapse runs of consecutive equal elements into a single element. The
-- state remembers the last element emitted.
{-# INLINE_NORMAL uniq #-}
uniq :: (Eq a, Monad m) => Stream m a -> Stream m a
uniq (Stream step state) = Stream step' (Nothing, state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (Nothing, st) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (Nothing, s))
            Yield x s -> return (Yield x (Just x, s))
    step' gst (Just prev, st) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (Just prev, s))
            Yield y s ->
                if prev == y
                    then return (Skip (Just prev, s))
                    else return (Yield y (Just y, s))
-- | Turn a stream of monadic actions into a stream of their results by
-- running each action as it is pulled.
{-# INLINE_NORMAL sequence #-}
sequence :: Monad m => Stream m (m a) -> Stream m a
sequence (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip s)
            Yield act s -> do
                a <- act
                return (Yield a s)
-- | Internal state of 'intersperseM'.
-- 'FirstYield': looking for the first element of the source.
-- 'InterspersingYield': looking for the next element; a separator is emitted
-- once one is found.
-- 'YieldAndCarry': an element taken from the source is waiting to be emitted.
data LoopState x s = FirstYield s
| InterspersingYield s
| YieldAndCarry x s
-- | Insert the result of a monadic action between consecutive elements of the
-- stream. The action is only run once the next source element is available,
-- so nothing is emitted after the last element.
{-# INLINE_NORMAL intersperseM #-}
intersperseM :: Monad m => m a -> Stream m a -> Stream m a
intersperseM m (Stream step state) = Stream step' (FirstYield state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (FirstYield st) = do
        next <- step gst st
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (FirstYield s)
                Yield x s -> Skip (YieldAndCarry x s)
    step' gst (InterspersingYield st) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (InterspersingYield s))
            Yield x s -> m >>= \sep -> return (Yield sep (YieldAndCarry x s))
    step' _ (YieldAndCarry x st) = return (Yield x (InterspersingYield st))
-- | Internal state of 'intersperseSuffix'.
data SuffixState s a
-- Pull the next element from the source.
= SuffixElem s
-- Run the suffix action next.
| SuffixSuffix s
-- Emit the carried value, then continue with the given state.
| SuffixYield a (SuffixState s a)
-- | Emit the result of a monadic action after every element of the stream,
-- including the last one.
{-# INLINE_NORMAL intersperseSuffix #-}
intersperseSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a
intersperseSuffix action (Stream step state) = Stream step' (SuffixElem state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (SuffixElem st) = do
        next <- step gst st
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (SuffixElem s)
                Yield x s -> Skip (SuffixYield x (SuffixSuffix s))
    step' _ (SuffixSuffix st) = do
        sfx <- action
        return (Skip (SuffixYield sfx (SuffixElem st)))
    step' _ (SuffixYield x following) = return (Yield x following)
-- | Internal state of 'intersperseSuffixBySpan'.
data SuffixSpanState s a
-- Pull the next element; the 'Int' is how many more elements to pass before
-- a suffix is due.
= SuffixSpanElem s Int
-- Run the suffix action, then start a new span.
| SuffixSpanSuffix s
-- Emit the carried value, then continue with the given state.
| SuffixSpanYield a (SuffixSpanState s a)
-- The source ended inside a span: run the suffix action one last time.
| SuffixSpanLast
-- Nothing left to do.
| SuffixSpanStop
-- | Emit the result of a monadic action after every @n@ elements of the
-- stream. If the source ends part-way through a span, the action is emitted
-- once more after the partial span; if it ends exactly on a span boundary no
-- extra suffix is emitted.
--
-- Note: for @n <= 0@ the source is never stepped and the action is emitted
-- repeatedly without end.
{-# INLINE_NORMAL intersperseSuffixBySpan #-}
intersperseSuffixBySpan :: forall m a. Monad m
    => Int -> m a -> Stream m a -> Stream m a
intersperseSuffixBySpan n action (Stream step state) =
    Stream step' (SuffixSpanElem state n)
  where
    {-# INLINE_LATE step' #-}
    step' gst (SuffixSpanElem st i) | i > 0 = do
        next <- step gst st
        return $
            case next of
                Yield x s -> Skip (SuffixSpanYield x (SuffixSpanElem s (i - 1)))
                Skip s -> Skip (SuffixSpanElem s i)
                Stop
                    | i == n -> Stop
                    | otherwise -> Skip SuffixSpanLast
    step' _ (SuffixSpanElem st _) = return (Skip (SuffixSpanSuffix st))
    step' _ (SuffixSpanSuffix st) = do
        sfx <- action
        return (Skip (SuffixSpanYield sfx (SuffixSpanElem st n)))
    step' _ SuffixSpanLast = do
        sfx <- action
        return (Skip (SuffixSpanYield sfx SuffixSpanStop))
    step' _ (SuffixSpanYield x following) = return (Yield x following)
    step' _ SuffixSpanStop = return Stop
-- | Insert a pure value between consecutive elements of the stream.
{-# INLINE intersperse #-}
intersperse :: Monad m => a -> Stream m a -> Stream m a
intersperse sep = intersperseM (return sep)
-- | Insert @a@ before the first stream element @x@ for which @cmp a x@ is not
-- 'GT', or at the end if there is no such element. The state is
-- @(source state, inserted?, element held back)@: once @a@ has been emitted,
-- each source element is held back for one step.
{-# INLINE_NORMAL insertBy #-}
insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
insertBy cmp a (Stream step state) = Stream step' (state, False, Nothing)
  where
    {-# INLINE_LATE step' #-}
    -- Not inserted yet.
    step' gst (st, False, _) = do
        next <- step gst st
        case next of
            Stop -> return (Yield a (st, True, Nothing))
            Skip s -> return (Skip (s, False, Nothing))
            Yield x s ->
                case cmp a x of
                    GT -> return (Yield x (s, False, Nothing))
                    _ -> return (Yield a (s, True, Just x))
    -- Inserted and nothing held back: done.
    step' _ (_, True, Nothing) = return Stop
    -- Inserted, with one element held back.
    step' gst (st, True, Just held) = do
        next <- step gst st
        case next of
            Stop -> return (Yield held (st, True, Nothing))
            Skip s -> return (Skip (s, True, Just held))
            Yield x s -> return (Yield held (s, True, Just x))
-- | Remove the first element @y@ of the stream for which @eq x y@ holds. The
-- boolean in the state records whether the deletion has already happened.
{-# INLINE_NORMAL deleteBy #-}
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
deleteBy eq x (Stream step state) = Stream step' (state, False)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, False) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, False))
            Yield y s
                | eq x y -> return (Skip (s, True))
                | otherwise -> return (Yield y (s, False))
    step' gst (st, True) = do
        next <- step gst st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, True))
            Yield y s -> return (Yield y (s, True))
-- | Map a function returning 'Maybe' over the stream, keeping only the
-- @Just@ results. 'fromJust' is applied only after 'isJust' has filtered out
-- every 'Nothing'.
{-# INLINE_NORMAL mapMaybe #-}
mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
mapMaybe f = \xs -> fmap fromJust (filter isJust (map f xs))
-- | Monadic version of 'mapMaybe': run the action on each element and keep
-- only the @Just@ results.
{-# INLINE_NORMAL mapMaybeM #-}
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM f = \xs -> fmap fromJust (filter isJust (mapM f xs))
-- | Pair each element with its position in the stream, counting up from 0.
-- The counter is forced at every step.
{-# INLINE_NORMAL indexed #-}
indexed :: Monad m => Stream m a -> Stream m (Int, a)
indexed (Stream step state) = Stream step' (state, 0)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, !i) = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, i))
            Yield x s -> return (Yield (i, x) (s, i + 1))
-- | Pair each element with a counter that starts at @m@ and decreases by one
-- per element. The counter is forced at every step.
{-# INLINE_NORMAL indexedR #-}
indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
indexedR m (Stream step state) = Stream step' (state, m)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, !i) = do
        next <- step (adaptState gst) st
        case next of
            Stop -> return Stop
            Skip s -> return (Skip (s, i))
            Yield x s -> return (Yield (i, x) (s, i - 1))
-- | Zip two streams with a monadic function. An element is pulled from the
-- first stream and held; then an element is pulled from the second stream and
-- the two are combined. The result stops as soon as either stream stops.
{-# INLINE_NORMAL zipWithM #-}
zipWithM :: Monad m
    => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipWithM f (Stream stepa ta) (Stream stepb tb) = Stream step (ta, tb, Nothing)
  where
    {-# INLINE_LATE step #-}
    -- Nothing held: advance the first stream.
    step gst (sa, sb, Nothing) = do
        next <- stepa (adaptState gst) sa
        return $
            case next of
                Stop -> Stop
                Skip sa' -> Skip (sa', sb, Nothing)
                Yield x sa' -> Skip (sa', sb, Just x)
    -- Holding an element of the first stream: advance the second.
    step gst (sa, sb, Just x) = do
        next <- stepb (adaptState gst) sb
        case next of
            Stop -> return Stop
            Skip sb' -> return (Skip (sa, sb', Just x))
            Yield y sb' -> f x y >>= \z -> return (Yield z (sa, sb', Nothing))
#if __GLASGOW_HASKELL__ >= 801
{-# RULES "zipWithM xs xs"
    forall f xs. zipWithM @Identity f xs xs = mapM (\x -> f x x) xs #-}
#endif
-- | Pure version of 'zipWithM'.
{-# INLINE zipWith #-}
zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith f = zipWithM (\x y -> return (f x y))
-- | Merge two streams using a monadic comparison. The state is
-- @(state of a, state of b, held element of a, held element of b)@, where a
-- stream state of 'Nothing' means that stream has ended. When both sides hold
-- an element, the element of the second stream is emitted only if the
-- comparison returns 'GT'; otherwise the element of the first stream goes
-- first.
--
-- The equations overlap, so their order is significant.
{-# INLINE_NORMAL mergeByM #-}
mergeByM
    :: (Monad m)
    => (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeByM cmp (Stream stepa ta) (Stream stepb tb) =
    Stream step (Just ta, Just tb, Nothing, Nothing)
  where
    {-# INLINE_LATE step #-}
    -- First stream is live and has no held element: pull from it.
    step gst (Just sa, sb, Nothing, heldB) = do
        next <- stepa gst sa
        return $
            case next of
                Yield a sa' -> Skip (Just sa', sb, Just a, heldB)
                Skip sa' -> Skip (Just sa', sb, Nothing, heldB)
                Stop -> Skip (Nothing, sb, Nothing, heldB)
    -- Second stream is live and has no held element: pull from it.
    step gst (sa, Just sb, heldA, Nothing) = do
        next <- stepb gst sb
        return $
            case next of
                Yield b sb' -> Skip (sa, Just sb', heldA, Just b)
                Skip sb' -> Skip (sa, Just sb', heldA, Nothing)
                Stop -> Skip (sa, Nothing, heldA, Nothing)
    -- Both sides hold an element: compare and emit one of them.
    step _ (sa, sb, Just a, Just b) = do
        ordering <- cmp a b
        return $
            case ordering of
                GT -> Yield b (sa, sb, Just a, Nothing)
                _ -> Yield a (sa, sb, Nothing, Just b)
    -- First stream ended: flush the held element of the second.
    step _ (Nothing, sb, Nothing, Just b) =
        return (Yield b (Nothing, sb, Nothing, Nothing))
    -- Second stream ended: flush the held element of the first.
    step _ (sa, Nothing, Just a, Nothing) =
        return (Yield a (sa, Nothing, Nothing, Nothing))
    step _ (Nothing, Nothing, Nothing, Nothing) = return Stop
-- | Pure version of 'mergeByM'.
{-# INLINE mergeBy #-}
mergeBy
    :: (Monad m)
    => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeBy cmp = mergeByM (\x y -> return (cmp x y))
-- | Return @Just x@ if the stream is non-empty and every element equals its
-- first element @x@; return 'Nothing' for an empty stream or as soon as a
-- differing element is found.
{-# INLINE_NORMAL the #-}
the :: (Eq a, Monad m) => Stream m a -> m (Maybe a)
the (Stream step state) = findFirst state
  where
    -- Look for the first element.
    findFirst st = do
        next <- step defState st
        case next of
            Stop -> return Nothing
            Skip s -> findFirst s
            Yield x s -> allEqualTo x s

    -- Check that every remaining element equals the reference element.
    allEqualTo ref st = do
        next <- step defState st
        case next of
            Stop -> return (Just ref)
            Skip s -> allEqualTo ref s
            Yield x s ->
                if x == ref
                    then allEqualTo ref s
                    else return Nothing
-- | Run a 'Fold' over a stream by handing its step, initial and extract
-- functions to 'foldlMx''.
{-# INLINE runFold #-}
runFold :: (Monad m) => Fold m a b -> Stream m a -> m b
runFold (Fold fstep initial extract) = foldlMx' fstep initial extract
-- | Fork a worker that folds the stream @xs@ into the given 'SVar' and
-- register the worker's thread with the SVar via 'modifyThread'. The limited
-- fold is used when the state carries a yield limit. When the SVar is in
-- inspect mode, the worker count is bumped and recorded first, and a
-- 'WorkerInfo' is passed to the fold if the SVar has yield-rate info.
{-# INLINE toSVarParallel #-}
toSVarParallel :: MonadAsync m
    => State t m a -> SVar t m a -> Stream m a -> m ()
toSVarParallel st sv xs
    | svarInspectMode sv = forkWithDiag
    | otherwise = spawn Nothing
  where
    {-# NOINLINE work #-}
    work info = runFold (FL.toParallelSVar sv info) xs

    {-# NOINLINE workLim #-}
    workLim info = runFold (FL.toParallelSVarLimited sv info) xs

    -- Fork the given job with the SVar's runner and exception handler.
    launch job = doFork job (svarMrun sv) (handleChildException sv)

    -- Fork the appropriate worker and register its thread id.
    spawn info = do
        tid <-
            case getYieldLimit st of
                Nothing -> launch (work info)
                Just _ -> launch (workLim info)
        modifyThread sv tid

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
        recordMaxWorkers sv
        winfo <-
            case yieldRateInfo sv of
                Nothing -> return Nothing
                Just _ -> liftIO $ do
                    cntRef <- newIORef 0
                    t <- getTime Monotonic
                    lat <- newIORef (0, t)
                    return $ Just WorkerInfo
                        { workerYieldMax = 0
                        , workerYieldCount = cntRef
                        , workerLatencyStart = lat
                        }
        spawn winfo
-- | Evaluate the stream in a forked worker: on the first step a parallel
-- 'SVar' is created and the stream is sent to it with 'toSVarParallel';
-- afterwards the output is read back from the SVar.
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => Stream m a -> Stream m a
mkParallelD m = Stream step Nothing
  where
    step gst Nothing = do
        sv <- newParallelVar StopNone gst
        toSVarParallel gst sv m
        return (Skip (Just (fromSVar sv)))
    step gst (Just (UnStream svStep svSt)) = do
        next <- svStep gst svSt
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (Just (Stream svStep s))
                Yield a s -> Yield a (Just (Stream svStep s))
-- | 'mkParallelD' lifted to any 'K.IsStream' type by converting to and from
-- the direct-style stream.
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (K.IsStream t, MonadAsync m) => t m a -> t m a
mkParallel = \xs -> fromStreamD (mkParallelD (toStreamD xs))
-- | Create a callback together with a stream. Each call of the callback sends
-- its argument to a parallel 'SVar' as a 'ChildYield' event; the returned
-- stream reads from that SVar. The calling thread is registered with the SVar.
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: (K.IsStream t, MonadAsync m) => m ((a -> m ()), t m a)
newCallbackStream = do
    sv <- newParallelVar StopNone defState
    tid <- liftIO myThreadId
    modifyThread sv tid
    let callback a = liftIO (void (send sv (ChildYield a)))
    return (callback, fromStreamD (fromSVar sv))
-- | Create a parallel 'SVar' (with 'StopAny') and fork a worker that runs the
-- given fold on the stream produced from that SVar. The calling thread is
-- registered with the SVar before the worker is forked.
{-# INLINE newFoldSVar #-}
newFoldSVar :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVar stt f = do
    sv <- newParallelVar StopAny (adaptState stt)
    tid <- liftIO myThreadId
    modifyThread sv tid
    void (doFork (work sv) (svarMrun sv) (handleFoldException sv))
    return sv
  where
    {-# NOINLINE work #-}
    work sv = void (runFold f (fromProducer sv))
-- | Internal state of 'tapAsync': 'TapInit' before the fold's SVar exists,
-- 'Tapping' while elements are also pushed to the fold, and 'TapDone' once
-- the fold has finished and elements are only passed through.
data TapState sv st = TapInit | Tapping sv st | TapDone st
-- | Pass every element through unchanged while also pushing it to a fold that
-- runs behind an 'SVar' created by 'newFoldSVar'. When the fold reports it is
-- done, or when the source stops, a stop is sent to the fold and its side is
-- drained; after the fold is done the remaining elements are only passed
-- through.
{-# INLINE_NORMAL tapAsync #-}
tapAsync :: MonadAsync m => Fold m a b -> Stream m a -> Stream m a
tapAsync f (Stream step1 state1) = Stream step TapInit
  where
    -- Keep consuming events from the fold side until it reports completion,
    -- blocking on the doorbell between attempts.
    drainFold svr = do
        finished <- fromConsumer svr
        when (not finished) $ do
            liftIO $
                withDiagMVar svr "teeToSVar: waiting to drain" $
                    takeMVar (outputDoorBellFromConsumer svr)
            drainFold svr

    stopFold svr = liftIO (sendStop svr Nothing) >> drainFold svr

    {-# INLINE_LATE step #-}
    step gst TapInit = do
        sv <- newFoldSVar gst f
        return (Skip (Tapping sv state1))
    step gst (Tapping sv st) = do
        next <- step1 gst st
        case next of
            Stop -> stopFold sv >> return Stop
            Skip s -> return (Skip (Tapping sv s))
            Yield a s -> do
                foldDone <- pushToFold sv a
                if foldDone
                    then stopFold sv >> return (Yield a (TapDone s))
                    else return (Yield a (Tapping sv s))
    step gst (TapDone st) = do
        next <- step1 gst st
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (TapDone s)
                Yield a s -> Yield a (TapDone s)
-- | A 'Fold' that collects elements into a ring buffer of size @n@ and, when
-- extracted, copies the buffer into an array of capacity @n@. For @n <= 0@
-- the input is drained and an empty array is returned. The fold counts the
-- inserted elements and uses that count to choose between the partial and
-- the full ring traversal.
{-# INLINE lastN #-}
lastN :: (Storable a, MonadIO m) => Int -> Fold m a (Array a)
lastN n
    | n <= 0 = fmap (const mempty) FL.drain
    | otherwise = Fold step initial done
  where
    initial = do
        (rb, rh) <- liftIO (RB.new n)
        return (Tuple3' rb rh (0 :: Int))

    step (Tuple3' rb rh i) a = do
        rh' <- liftIO (RB.unsafeInsert rb rh a)
        return (Tuple3' rb rh' (i + 1))

    done (Tuple3' rb rh i) = do
        arr <- liftIO (A.newArray n)
        if i < n
            then RB.unsafeFoldRingM rh snoc' arr rb
            else RB.unsafeFoldRingFullM rh snoc' arr rb

    snoc' b a = liftIO (A.unsafeSnoc b a)
-- | Internal state of 'takeByTime'. The first field is the source stream
-- state; the second field of the last two constructors is the start time.
data TakeByTime st s
-- The start time has not been recorded yet.
= TakeByTimeInit st
-- Compare the elapsed time with the limit before pulling again.
| TakeByTimeCheck st s
-- Pull the next element from the source.
| TakeByTimeYield st s
-- | Emit elements of the stream until more than @duration@ has elapsed on the
-- monotonic clock since the first step. The clock is checked after every
-- source step, before the next one. A zero duration yields an empty stream
-- without stepping the source.
{-# INLINE_NORMAL takeByTime #-}
takeByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
takeByTime duration (Stream step1 state1) = Stream step (TakeByTimeInit state1)
  where
    lim = toRelTime64 duration

    {-# INLINE_LATE step #-}
    step _ (TakeByTimeInit st)
        | lim == 0 = return Stop
        | otherwise = do
            t0 <- liftIO (getTime Monotonic)
            return (Skip (TakeByTimeYield st t0))
    step _ (TakeByTimeCheck st t0) = do
        now <- liftIO (getTime Monotonic)
        return $
            if diffAbsTime64 now t0 > lim
                then Stop
                else Skip (TakeByTimeYield st t0)
    step gst (TakeByTimeYield st t0) = do
        next <- step1 gst st
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (TakeByTimeCheck s t0)
                Yield x s -> Yield x (TakeByTimeCheck s t0)
-- | Internal state of 'dropByTime'. @st@ is the source stream state, @s@ the
-- start time and @x@ an element pulled from the source.
data DropByTime st s x
-- The start time has not been recorded yet.
= DropByTimeInit st
-- Pull the next candidate element while still in the dropping phase.
| DropByTimeGen st s
-- Decide, based on the elapsed time, whether to drop or emit the element.
| DropByTimeCheck st s x
-- Dropping is over; elements are passed through.
| DropByTimeYield st
-- | Discard elements of the stream until more than @duration@ has elapsed on
-- the monotonic clock since the first step, then emit the element that was
-- being checked and everything after it. The clock is read after each element
-- is pulled during the dropping phase.
{-# INLINE_NORMAL dropByTime #-}
dropByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1)
  where
    lim = toRelTime64 duration

    {-# INLINE_LATE step #-}
    step _ (DropByTimeInit st) = do
        t0 <- liftIO (getTime Monotonic)
        return (Skip (DropByTimeGen st t0))
    step gst (DropByTimeGen st t0) = do
        next <- step1 gst st
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (DropByTimeGen s t0)
                Yield x s -> Skip (DropByTimeCheck s t0 x)
    step _ (DropByTimeCheck st t0 x) = do
        now <- liftIO (getTime Monotonic)
        if diffAbsTime64 now t0 <= lim
            then return (Skip (DropByTimeGen st t0))
            else return (Yield x (DropByTimeYield st))
    step gst (DropByTimeYield st) = do
        next <- step1 gst st
        return $
            case next of
                Stop -> Stop
                Skip s -> Skip (DropByTimeYield s)
                Yield x s -> Yield x (DropByTimeYield s)
-- | An infinite stream of timestamps read from a shared variable. On the
-- first step the variable is created and a managed thread is forked which,
-- in a loop, sleeps for @g@ seconds (clamped to @maxBound :: Int@
-- microseconds) and then stores the current monotonic time in the variable.
-- Every subsequent step yields the value currently in the variable.
--
-- The variable is seeded with the current monotonic time, so elements pulled
-- before the updater thread's first wake-up carry a real timestamp.
-- (Previously it was seeded with 0, and those elements were zero timestamps.)
{-# INLINE_NORMAL currentTime #-}
currentTime :: MonadAsync m => Double -> Stream m AbsTime
currentTime g = Stream step Nothing
  where
    g' = g * 10 ^ (6 :: Int)

    -- Update interval in microseconds, clamped to the range of 'Int'.
    {-# INLINE delayTime #-}
    delayTime =
        if g' >= fromIntegral (maxBound :: Int)
        then maxBound
        else round g'

    -- Read the monotonic clock as a count of microseconds.
    readClock = do
        MicroSecond64 t <- fromAbsTime <$> getTime Monotonic
        return t

    updateTimeVar timeVar = do
        threadDelay delayTime
        t <- readClock
        modifyVar' timeVar (const t)

    {-# INLINE_LATE step #-}
    step _ Nothing = do
        -- Seed with the current time rather than 0 so that early reads are
        -- meaningful.
        timeVar <- liftIO $ readClock >>= newVar
        tid <- forkManaged $ liftIO $ forever (updateTimeVar timeVar)
        return $ Skip $ Just (timeVar, tid)
    step _ s@(Just (timeVar, _)) = do
        a <- liftIO $ readVar timeVar
        return $ Yield (toAbsTime (MicroSecond64 a)) s