{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.Zip
(
ZipSerialM (..)
, ZipSerial
, consMZip
, zipWithK
, zipWithMK
, ZipAsyncM(..)
, ZipAsync
, consMZipAsync
, zipAsyncWithK
, zipAsyncWithMK
, ZipStream
)
where
import Control.Applicative (liftA2)
import Control.DeepSeq (NFData(..))
#if MIN_VERSION_deepseq(1,4,3)
import Control.DeepSeq (NFData1(..))
#endif
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (IsList(..), IsString(..))
import Text.Read
( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
, readListPrecDefault)
import Streamly.Internal.BaseCompat ((#.), errorWithoutStackTrace, oneShot)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Prelude as P
(cmpBy, eqBy, foldl', foldr, fromList, toList)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)
#include "Instances.hs"
-- | Zip two streams element by element using a monadic combining function.
--
-- Both inputs are converted with 'D.fromStreamK', zipped with 'D.zipWithM',
-- and the zipped result is converted back with 'D.toStreamK'.
{-# INLINE zipWithMK #-}
zipWithMK :: Monad m =>
    (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipWithMK f m1 m2 =
    D.toStreamK $ D.zipWithM f (D.fromStreamK m1) (D.fromStreamK m2)
-- | Zip two streams element by element using a pure combining function.
--
-- Implemented with 'zipWithMK' by lifting the result of @f@ into the monad
-- with 'return'.
{-# INLINE zipWithK #-}
zipWithK :: Monad m
    => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWithK f = zipWithMK (\a b -> return (f a b))
-- | Like 'zipWithMK', but each input stream is first passed through
-- 'Par.mkParallelD' before being zipped with 'D.zipWithM'.
--
-- NOTE(review): presumably 'Par.mkParallelD' makes the two inputs evaluate
-- concurrently with the zipping consumer; confirm against its definition.
{-# INLINE zipAsyncWithMK #-}
zipAsyncWithMK :: MonadAsync m
    => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithMK f m1 m2 = D.toStreamK $
    D.zipWithM f (Par.mkParallelD $ D.fromStreamK m1)
                 (Par.mkParallelD $ D.fromStreamK m2)
-- | Like 'zipWithK', but built on 'zipAsyncWithMK'.
--
-- The pure function @f@ is lifted into the monad with 'return'.
{-# INLINE zipAsyncWithK #-}
zipAsyncWithK :: MonadAsync m
    => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithK f = zipAsyncWithMK (\a b -> return (f a b))
-- | Newtype wrapper around 'Stream' whose 'Applicative' instance (defined in
-- this module) combines streams by zipping them with 'zipWithK'.
--
-- 'Semigroup' and 'Monoid' are derived through the newtype, so they reuse
-- the instances of the wrapped 'Stream'.
newtype ZipSerialM m a = ZipSerialM {getZipSerialM :: Stream m a}
        deriving (Semigroup, Monoid)
{-# DEPRECATED ZipStream "Please use 'ZipSerialM' instead." #-}
-- | Deprecated alias for 'ZipSerialM'.
type ZipStream = ZipSerialM
-- | 'ZipSerialM' specialised to the 'IO' monad.
type ZipSerial = ZipSerialM IO
-- | Prepend a monadic action to a 'ZipSerialM' stream.
--
-- Unwraps the newtype, conses the action onto the underlying stream with
-- 'K.consM', and wraps the result again.
consMZip :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
consMZip m (ZipSerialM r) = ZipSerialM $ K.consM m r
-- Instances generated by CPP macros from the included Instances.hs file.
-- NOTE(review): the macro bodies are not visible here; presumably they
-- provide the list-like and NFData1 instances for ZipSerialM. Confirm
-- against Instances.hs.
LIST_INSTANCES(ZipSerialM)
NFDATA1_INSTANCE(ZipSerialM)
-- | Maps over the elements by delegating to the 'Functor' instance of
-- 'SerialT': the underlying stream is wrapped in 'SerialT', mapped, and
-- unwrapped again with 'getSerialT'.
instance Monad m => Functor (ZipSerialM m) where
    {-# INLINE fmap #-}
    fmap f (ZipSerialM m) = ZipSerialM $ getSerialT $ fmap f (SerialT m)
-- | Zipping 'Applicative': '<*>' pairs up a stream of functions with a
-- stream of arguments using 'zipWithK' 'id'.
--
-- 'pure' wraps the stream built by 'Serial.repeat'.
-- NOTE(review): presumably that is an endless repetition of the value, which
-- is what a zip-style 'pure' needs; confirm against 'Serial.repeat'.
instance Monad m => Applicative (ZipSerialM m) where
    pure = ZipSerialM . getSerialT . Serial.repeat

    {-# INLINE (<*>) #-}
    ZipSerialM m1 <*> ZipSerialM m2 = ZipSerialM $ zipWithK id m1 m2
-- Instances generated by CPP macros from the included Instances.hs file.
-- NOTE(review): the macro bodies are not visible here; presumably they
-- provide the Foldable and Traversable instances for ZipSerialM. Confirm
-- against Instances.hs.
FOLDABLE_INSTANCE(ZipSerialM)
TRAVERSABLE_INSTANCE(ZipSerialM)
-- | Newtype wrapper around 'Stream' whose 'Applicative' instance (defined in
-- this module) combines streams by zipping them with 'zipAsyncWithK'.
--
-- 'Semigroup' and 'Monoid' are derived through the newtype, so they reuse
-- the instances of the wrapped 'Stream'.
newtype ZipAsyncM m a = ZipAsyncM {getZipAsyncM :: Stream m a}
        deriving (Semigroup, Monoid)
-- | 'ZipAsyncM' specialised to the 'IO' monad.
type ZipAsync = ZipAsyncM IO
-- | Prepend a monadic action to a 'ZipAsyncM' stream.
--
-- Unwraps the newtype, conses the action onto the underlying stream with
-- 'K.consM', and wraps the result again.
consMZipAsync :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
consMZipAsync m (ZipAsyncM r) = ZipAsyncM $ K.consM m r
-- | Maps over the elements by delegating to the 'Functor' instance of
-- 'SerialT': the underlying stream is wrapped in 'SerialT', mapped, and
-- unwrapped again with 'getSerialT'.
instance Monad m => Functor (ZipAsyncM m) where
    {-# INLINE fmap #-}
    fmap f (ZipAsyncM m) = ZipAsyncM $ getSerialT $ fmap f (SerialT m)
-- | Zipping 'Applicative': '<*>' pairs up a stream of functions with a
-- stream of arguments using 'zipAsyncWithK' 'id'.
--
-- 'pure' wraps the stream built by 'Serial.repeat'.
-- NOTE(review): presumably that is an endless repetition of the value, which
-- is what a zip-style 'pure' needs; confirm against 'Serial.repeat'.
instance MonadAsync m => Applicative (ZipAsyncM m) where
    pure = ZipAsyncM . getSerialT . Serial.repeat

    {-# INLINE (<*>) #-}
    ZipAsyncM m1 <*> ZipAsyncM m2 = ZipAsyncM $ zipAsyncWithK id m1 m2