{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.ZipAsync
( ZipAsyncM(..)
, ZipAsync
, consMZipAsync
, zipAsyncWithK
, zipAsyncWithMK
)
where
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import Streamly.Internal.Data.SVar
import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)
#include "Instances.hs"
-- | Zip two streams with a monadic zipping function, routing the second
-- stream through a parallel 'SVar'.
--
-- The second stream @m2@ is converted with 'D.fromStreamK' and handed to a
-- freshly created parallel 'SVar' via 'SVar.toSVarParallel'. The first
-- stream @m1@ is then zipped, using 'K.zipWithM', against the stream read
-- back from that 'SVar' with 'SVar.fromSVar'.
--
-- NOTE(review): presumably this lets @m2@ be evaluated concurrently while
-- @m1@ is consumed in the current thread; confirm against
-- 'SVar.toSVarParallel'.
{-# INLINE zipAsyncWithMK #-}
zipAsyncWithMK :: MonadAsync m
    => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK f m1 m2 = K.mkStream $ \st yld sng stp -> do
    -- The SVar carries elements of type b, so the state (typed for the
    -- output stream of c) has to be adapted before it can be reused.
    sv <- newParallelVar StopNone (adaptState st)
    SVar.toSVarParallel (adaptState st) sv $ D.fromStreamK m2
    -- Drive the zipped stream with the caller's continuations.
    K.foldStream st yld sng stp
        $ K.zipWithM f m1 (getSerialT (SVar.fromSVar sv))
-- | Zip two streams with a pure zipping function.
--
-- This is 'zipAsyncWithMK' with the pure function lifted into the monad
-- via 'return'.
{-# INLINE zipAsyncWithK #-}
zipAsyncWithK :: MonadAsync m
    => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK f = zipAsyncWithMK (\a b -> return (f a b))
-- | A newtype wrapper around 'Stream' whose 'Applicative' instance zips
-- streams using 'zipAsyncWithK'.
--
-- The 'Semigroup' and 'Monoid' instances are derived through the newtype,
-- i.e. they are those of the underlying 'Stream'.
newtype ZipAsyncM m a = ZipAsyncM {getZipAsyncM :: Stream m a}
    deriving (Semigroup, Monoid)
-- | 'ZipAsyncM' specialised to the 'IO' monad.
type ZipAsync = ZipAsyncM IO
-- | Prepend a monadic action to a 'ZipAsyncM' stream.
--
-- Unwraps the newtype, conses the action onto the underlying 'Stream' with
-- 'K.consM', and wraps the result again.
consMZipAsync :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync m (ZipAsyncM r) = ZipAsyncM $ K.consM m r
-- | Maps over the elements by delegating to the 'Functor' instance of
-- 'SerialT': the underlying stream is wrapped in 'SerialT', mapped, and
-- unwrapped again with 'getSerialT'.
instance Monad m => Functor (ZipAsyncM m) where
    {-# INLINE fmap #-}
    fmap f (ZipAsyncM m) = ZipAsyncM $ getSerialT $ fmap f (SerialT m)
-- | Zipping 'Applicative': '<*>' pairs a stream of functions with a stream
-- of arguments through 'zipAsyncWithK'.
instance MonadAsync m => Applicative (ZipAsyncM m) where
    -- Built from 'Serial.repeat'.
    -- NOTE(review): presumably an unbounded repetition of the value, so the
    -- length of a zip with 'pure' is set by the other stream; confirm.
    pure = ZipAsyncM . getSerialT . Serial.repeat

    {-# INLINE (<*>) #-}
    -- Zipping with 'id' applies each function to its paired argument.
    ZipAsyncM m1 <*> ZipAsyncM m2 = ZipAsyncM $ zipAsyncWithK id m1 m2