{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-deprecations #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.Zip
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream as D
-- >>> import qualified Streamly.Data.Fold as Fold
--
module Streamly.Internal.Data.Stream.Zip
    {-# DEPRECATED "Use \"Streamly.Data.Stream.MkType\" instead." #-}
    (
      ZipSerialM (..)
    , ZipSerial
    , consMZip
    , zipWithK
    , zipWithMK

    , ZipConcurrent (..)

    -- * Deprecated
    , ZipStream
    )
where

#if !MIN_VERSION_base(4,18,0)
import Control.Applicative (liftA2)
#endif
import Control.DeepSeq (NFData(..))
#if MIN_VERSION_deepseq(1,4,3)
import Control.DeepSeq (NFData1(..))
#endif
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (IsList(..), IsString(..), oneShot)
import Text.Read
       ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
       , readListPrecDefault)
import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.StreamK (Stream)
import Streamly.Internal.Data.Stream.Concurrent (MonadAsync, parZipWith)

import qualified Streamly.Internal.Data.Stream.Common as P
import qualified Streamly.Internal.Data.StreamK as K
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.Stream.Serial as Serial

import Prelude hiding (map, repeat, zipWith)

#include "Instances.hs"

-- $setup
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream as D
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
--  delay n = do
--      threadDelay (n * 1000000)   -- sleep for n seconds
--      putStrLn (show n ++ " sec") -- print "n sec"
--      return n                    -- IO Int
-- :}

-- | Zip two streams element by element using a monadic combining function.
--
-- Both inputs are converted with 'D.fromStreamK', zipped with 'D.zipWithM',
-- and the zipped result is converted back with 'D.toStreamK'.
{-# INLINE zipWithMK #-}
zipWithMK :: Monad m =>
    (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipWithMK f m1 m2 =
    D.toStreamK $ D.zipWithM f (D.fromStreamK m1) (D.fromStreamK m2)

-- | Zip two streams element by element using a pure combining function.
--
-- Implemented in terms of 'zipWithMK' by lifting the result of the combining
-- function into the monad with 'return'.
{-# INLINE zipWithK #-}
zipWithK :: Monad m
    => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWithK f = zipWithMK (\a b -> return (f a b))

------------------------------------------------------------------------------
-- Serially Zipping Streams
------------------------------------------------------------------------------

-- | For 'ZipSerialM' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.serial'
-- (<*>) = 'Streamly.Prelude.zipWith' id
-- @
--
-- Applicative evaluates the streams being zipped serially:
--
-- >>> s1 = Stream.fromFoldable [1, 2]
-- >>> s2 = Stream.fromFoldable [3, 4]
-- >>> s3 = Stream.fromFoldable [5, 6]
-- >>> Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
-- [(1,3,5),(2,4,6)]
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype ZipSerialM m a = ZipSerialM {getZipSerialM :: Stream m a}
        deriving (Semigroup, Monoid)

-- | Deprecated alias for 'ZipSerialM'.
--
-- @since 0.1.0
{-# DEPRECATED ZipStream "Please use 'ZipSerialM' instead." #-}
type ZipStream = ZipSerialM

-- | An IO stream whose applicative instance zips streams serially. This is
-- 'ZipSerialM' specialized to the 'IO' monad.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type ZipSerial = ZipSerialM IO

-- | Prepend a monadic action to a 'ZipSerialM' stream.
--
-- The wrapped stream is unwrapped, the action is consed onto it with
-- 'K.consM', and the result is wrapped again in 'ZipSerialM'.
consMZip :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
consMZip m (ZipSerialM r) = ZipSerialM $ K.consM m r

-- CPP macro invocations that generate instances for ZipSerialM. The macros
-- are presumably defined in the included "Instances.hs" - confirm there for
-- the exact set of instances produced.
LIST_INSTANCES(ZipSerialM)
NFDATA1_INSTANCE(ZipSerialM)

-- | Mapping is delegated to the 'Functor' instance of 'SerialT': the
-- underlying stream is wrapped in 'SerialT', mapped, and unwrapped again.
instance Monad m => Functor (ZipSerialM m) where
    {-# INLINE fmap #-}
    fmap f (ZipSerialM m) = ZipSerialM $ getSerialT $ fmap f (SerialT m)

-- | Zipping applicative: '<*>' pairs up the elements of the two streams
-- using 'zipWithK', and 'pure' is built from 'Serial.repeat'.
instance Monad m => Applicative (ZipSerialM m) where
    pure = ZipSerialM . getSerialT . Serial.repeat

    {-# INLINE (<*>) #-}
    -- Apply each function from the first stream to the corresponding element
    -- of the second stream.
    ZipSerialM m1 <*> ZipSerialM m2 = ZipSerialM $ zipWithK id m1 m2

-- CPP macro invocations that generate further instances for ZipSerialM. The
-- macros are presumably defined in the included "Instances.hs" - confirm
-- there for the exact instance definitions.
FOLDABLE_INSTANCE(ZipSerialM)
TRAVERSABLE_INSTANCE(ZipSerialM)

-------------------------------------------------------------------------------
-- ZipConcurrent
-------------------------------------------------------------------------------

-- | Wrapper around 'D.Stream' whose 'Applicative' instance zips streams
-- concurrently using 'parZipWith'. Use 'getZipConcurrent' to unwrap.
newtype ZipConcurrent m a = ZipConcurrent {getZipConcurrent :: D.Stream m a}
      deriving (Functor)

-- | A stream whose applicative instance zips streams concurrently. Note
-- that it uses the default concurrency options.
--
-- >>> s = ZipConcurrent $ D.fromList [1, 2, 3]
-- >>> x = (,,) <$> s <*> s <*> s
-- >>> D.fold Fold.toList (getZipConcurrent x)
-- [(1,1,1),(2,2,2),(3,3,3)]
--
-- @since 0.9.0

instance MonadAsync m => Applicative (ZipConcurrent m) where
    pure = ZipConcurrent . D.repeat

    {-# INLINE (<*>) #-}
    -- The first 'id' is the concurrency config modifier (configuration left
    -- unchanged); the second 'id' is the zipping function, i.e. function
    -- application.
    ZipConcurrent m1 <*> ZipConcurrent m2 =
        ZipConcurrent $ parZipWith id id m1 m2