{-# OPTIONS_GHC -Wno-deprecations #-}
{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.ZipAsync
{-# DEPRECATED "Use \"Streamly.Data.Stream.MkType\" instead." #-}
( ZipAsyncM(..)
, ZipAsync
, consMZipAsync
, zipAsyncWithK
, zipAsyncWithMK
)
where
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.StreamK (Stream)
import qualified Streamly.Internal.Data.StreamK as K
(mkStream, foldStream, zipWithM, consM)
import qualified Streamly.Internal.Data.Stream as D (fromStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.Serial as Stream
(fromStreamK, toStreamK)
import Streamly.Internal.Data.SVar
import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)
#include "Instances.hs"
{-# INLINE zipAsyncWithMK #-}
zipAsyncWithMK :: MonadAsync m
=> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK a -> b -> m c
f Stream m a
m1 Stream m b
m2 = (forall r.
State StreamK m c
-> (c -> StreamK m c -> m r) -> (c -> m r) -> m r -> m r)
-> StreamK m c
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m c
-> (c -> StreamK m c -> m r) -> (c -> m r) -> m r -> m r)
-> StreamK m c)
-> (forall r.
State StreamK m c
-> (c -> StreamK m c -> m r) -> (c -> m r) -> m r -> m r)
-> StreamK m c
forall a b. (a -> b) -> a -> b
$ \State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp -> do
SVar StreamK m b
sv <- SVarStopStyle -> State StreamK m b -> m (SVar StreamK m b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (State StreamK m c -> State StreamK m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st)
State StreamK m b -> SVar StreamK m b -> Stream m b -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel (State StreamK m c -> State StreamK m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st) SVar StreamK m b
sv (Stream m b -> m ()) -> Stream m b -> m ()
forall a b. (a -> b) -> a -> b
$ Stream m b -> Stream m b
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
D.fromStreamK Stream m b
m2
State StreamK m c
-> (c -> StreamK m c -> m r)
-> (c -> m r)
-> m r
-> StreamK m c
-> m r
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp
(StreamK m c -> m r) -> StreamK m c -> m r
forall a b. (a -> b) -> a -> b
$ (a -> b -> m c) -> Stream m a -> Stream m b -> StreamK m c
forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
K.zipWithM a -> b -> m c
f Stream m a
m1 (SerialT m b -> Stream m b
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SVar StreamK m b -> SerialT m b
forall (m :: * -> *) a.
MonadAsync m =>
SVar StreamK m a -> SerialT m a
SVar.fromSVar SVar StreamK m b
sv))
{-# INLINE zipAsyncWithK #-}
zipAsyncWithK :: MonadAsync m
=> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK :: forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK a -> b -> c
f = (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK (\a
a b
b -> c -> m c
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b -> c
f a
a b
b))
newtype ZipAsyncM m a = ZipAsyncM {forall (m :: * -> *) a. ZipAsyncM m a -> Stream m a
getZipAsyncM :: Stream m a}
deriving (NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
(ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a)
-> (NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a)
-> (forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a)
-> Semigroup (ZipAsyncM m a)
forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
stimes :: forall b. Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a
$cstimes :: forall (m :: * -> *) a b.
Integral b =>
b -> ZipAsyncM m a -> ZipAsyncM m a
sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
$csconcat :: forall (m :: * -> *) a. NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a
<> :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$c<> :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
Semigroup, Semigroup (ZipAsyncM m a)
ZipAsyncM m a
Semigroup (ZipAsyncM m a)
-> ZipAsyncM m a
-> (ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a)
-> ([ZipAsyncM m a] -> ZipAsyncM m a)
-> Monoid (ZipAsyncM m a)
[ZipAsyncM m a] -> ZipAsyncM m a
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
forall a.
Semigroup a -> a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
forall (m :: * -> *) a. Semigroup (ZipAsyncM m a)
forall (m :: * -> *) a. ZipAsyncM m a
forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a
$cmconcat :: forall (m :: * -> *) a. [ZipAsyncM m a] -> ZipAsyncM m a
mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
$cmappend :: forall (m :: * -> *) a.
ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a
mempty :: ZipAsyncM m a
$cmempty :: forall (m :: * -> *) a. ZipAsyncM m a
Monoid)
type ZipAsync = ZipAsyncM IO
consMZipAsync :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync :: forall (m :: * -> *) a.
Monad m =>
m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync m a
m (ZipAsyncM Stream m a
r) = Stream m a -> ZipAsyncM m a
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m a -> ZipAsyncM m a) -> Stream m a -> ZipAsyncM m a
forall a b. (a -> b) -> a -> b
$ m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
K.consM m a
m Stream m a
r
instance Monad m => Functor (ZipAsyncM m) where
{-# INLINE fmap #-}
fmap :: forall a b. (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
fmap a -> b
f (ZipAsyncM Stream m a
m) =
Stream m b -> ZipAsyncM m b
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m b -> ZipAsyncM m b) -> Stream m b -> ZipAsyncM m b
forall a b. (a -> b) -> a -> b
$ SerialT m b -> Stream m b
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m b -> Stream m b) -> SerialT m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ (a -> b) -> SerialT m a -> SerialT m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (Stream m a -> SerialT m a
forall (m :: * -> *) a. Stream m a -> SerialT m a
Stream.fromStreamK Stream m a
m)
instance MonadAsync m => Applicative (ZipAsyncM m) where
pure :: forall a. a -> ZipAsyncM m a
pure = Stream m a -> ZipAsyncM m a
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m a -> ZipAsyncM m a)
-> (a -> Stream m a) -> a -> ZipAsyncM m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> Stream m a
forall (m :: * -> *) a. SerialT m a -> Stream m a
Stream.toStreamK (SerialT m a -> Stream m a)
-> (a -> SerialT m a) -> a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> SerialT m a
forall (m :: * -> *) a. Monad m => a -> SerialT m a
Serial.repeat
{-# INLINE (<*>) #-}
ZipAsyncM Stream m (a -> b)
m1 <*> :: forall a b. ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b
<*> ZipAsyncM Stream m a
m2 = Stream m b -> ZipAsyncM m b
forall (m :: * -> *) a. Stream m a -> ZipAsyncM m a
ZipAsyncM (Stream m b -> ZipAsyncM m b) -> Stream m b -> ZipAsyncM m b
forall a b. (a -> b) -> a -> b
$ ((a -> b) -> a -> b)
-> Stream m (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b c.
MonadAsync m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK (a -> b) -> a -> b
forall a. a -> a
id Stream m (a -> b)
m1 Stream m a
m2