{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Top
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Top level module that can depend on all other lower level Stream modules.

module Streamly.Internal.Data.Stream.StreamD.Top
    (
    -- * Transformation
    -- ** Sampling
    -- | Value agnostic filtering.
      strideFromThen

    -- * Nesting
    -- ** Set like operations
    -- | These are not exactly set operations because streams are not
    -- necessarily sets, they may have duplicated elements. These operations
    -- are generic i.e. they work on streams of unconstrained types, therefore,
    -- they have quadratic performance characteristics. For better performance
    -- using Set structures see the Streamly.Internal.Data.Stream.Container
    -- module.
    , filterInStreamGenericBy
    , deleteInStreamGenericBy
    , unionWithStreamGenericBy

    -- ** Set like operations on sorted streams
    , filterInStreamAscBy
    , deleteInStreamAscBy
    , unionWithStreamAscBy

    -- ** Join operations
    , joinInnerGeneric

    -- * Joins on sorted stream
    , joinInnerAscBy
    , joinLeftAscBy
    , joinOuterAscBy
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.Common ()
import Streamly.Internal.Data.Stream.StreamD.Type (Stream, cross)

import qualified Data.List as List
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transform as Stream

import Prelude hiding (filter, zipWith, concatMap, concat)

#include "DocTestDataStream.hs"

------------------------------------------------------------------------------
-- Sampling
------------------------------------------------------------------------------

-- XXX We can implement this using addition instead of "mod" to make it more
-- efficient.

-- | @strideFromThen offset stride@ takes the element at @offset@ index and
-- then every element at strides of @stride@.
--
-- >>> Stream.fold Fold.toList $ Stream.strideFromThen 2 3 $ Stream.enumerateFromTo 0 10
-- [2,5,8]
--
-- Elements at indices below @offset@ are dropped. @stride@ must be non-zero:
-- the selection is done with 'mod', which raises a divide-by-zero exception
-- for a zero divisor.
--
{-# INLINE strideFromThen #-}
strideFromThen :: Monad m => Int -> Int -> Stream m a -> Stream m a
strideFromThen offset stride =
    -- Pair every element with its index, filter on the index alone, and let
    -- 'Stream.with' strip the index again.
    Stream.with Stream.indexed Stream.filter
        (\(i, _) -> i >= offset && (i - offset) `mod` stride == 0)

------------------------------------------------------------------------------
-- SQL Joins
------------------------------------------------------------------------------
--
-- Some references:
-- * https://en.wikipedia.org/wiki/Relational_algebra
-- * https://en.wikipedia.org/wiki/Join_(SQL)

-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only
-- constraint, the best would be to use an Array with linear search. If the
-- second stream is sorted we can also use a binary search, using Ord
-- constraint or an ordering function.
--
-- For Storables we can cache the second stream into an unboxed array for
-- possibly faster access/compact representation?
--
-- If we do not want to keep the stream in memory but always read it from the
-- source (disk/network) every time we iterate through it then we can do that
-- too by reading the stream every time, the stream must have immutable state
-- in that case and the user is responsible for the behavior if the stream
-- source changes during iterations. We can also use an Unfold instead of
-- stream. We probably need a way to distinguish streams that can be read
-- multiple times without any interference (e.g. unfolding a stream using an
-- immutable handle would work i.e. using pread/pwrite instead of maintaining
-- an offset in the handle).

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array we could use
-- binary search if we have an Ord instance or Ordering returning function. The
-- time complexity would then become (m x log n).

-- | Like 'cross' but emits only those tuples where @a == b@ using the
-- supplied equality predicate.
--
-- Definition:
--
-- >>> joinInnerGeneric eq s1 s2 = Stream.filter (\(a, b) -> a `eq` b) $ Stream.cross s1 s2
--
-- You should almost always prefer @joinInnerOrd@ over 'joinInnerGeneric' if
-- possible. @joinInnerOrd@ is an order of magnitude faster but may take more
-- space for caching the second stream.
--
-- See 'Streamly.Internal.Data.Unfold.joinInnerGeneric' for a much faster fused
-- alternative.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinInnerGeneric #-}
joinInnerGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerGeneric eq s1 s2 =
    -- Build the full cross product and keep only the matching pairs.
    Stream.filter (uncurry eq) (cross s1 s2)
{-
joinInnerGeneric eq s1 s2 = do
    -- ConcatMap works faster than bind
    Stream.concatMap (\a ->
        Stream.concatMap (\b ->
            if a `eq` b
            then Stream.fromPure (a, b)
            else Stream.nil
            ) s2
        ) s1
-}

-- | A more efficient 'joinInnerGeneric' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- This is a placeholder: evaluating it raises an error.
--
-- /Unimplemented/
{-# INLINE joinInnerAscBy #-}
joinInnerAscBy ::
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerAscBy = error "joinInnerAscBy: Unimplemented"

-- | A more efficient 'joinLeft' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- This is a placeholder: applying it to all its arguments raises an error.
--
-- /Unimplemented/
{-# INLINE joinLeftAscBy #-}
joinLeftAscBy :: -- Monad m =>
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftAscBy _eq _s1 _s2 = error "joinLeftAscBy: Unimplemented"

-- | A more efficient 'joinOuter' for sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- This is a placeholder: applying it to all its arguments raises an error.
--
-- /Unimplemented/
{-# INLINE joinOuterAscBy #-}
joinOuterAscBy :: -- Monad m =>
       (a -> b -> Ordering)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterAscBy _eq _s1 _s2 = error "joinOuterAscBy: Unimplemented"

------------------------------------------------------------------------------
-- Set operations (special joins)
------------------------------------------------------------------------------
--
-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only constraint
-- the best would be to use an Array with linear search. If the second stream
-- is sorted we can also use a binary search, using Ord constraint.

-- | Keep only those elements in the second stream that are present in the
-- first stream too. The first stream is folded to a container using the
-- supplied fold and then the elements in the container are looked up using the
-- supplied lookup function.
--
-- The first stream is folded completely before any element of the second
-- stream is examined, therefore the first stream must be finite and must not
-- block.
{-# INLINE filterStreamWith #-}
filterStreamWith :: Monad m =>
       Fold m a (f a)
    -> (a -> f a -> Bool)
    -> Stream m a
    -> Stream m a
    -> Stream m a
filterStreamWith fld member s1 s2 =
    Stream.concatEffect $ do
        -- Materialize the first stream into the lookup container.
        container <- Stream.fold fld s1
        return $ Stream.filter (`member` container) s2

-- | 'filterInStreamGenericBy' retains only those elements in the second stream that
-- are present in the first stream.
--
-- >>> Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [2,1,1]
--
-- >>> Stream.fold Fold.toList $ Stream.filterInStreamGenericBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
-- [1,2,2]
--
-- Similar to the list intersectBy operation but with the stream argument order
-- flipped.
--
-- The first stream must be finite and must not block. Second stream is
-- processed only after the first stream is fully realized.
--
-- Space: O(m) where @m@ is the number of elements in the first stream; the
-- first stream is the one that is collected into a list for lookups.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE filterInStreamGenericBy #-}
filterInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
filterInStreamGenericBy eq =
    -- XXX Use an (unboxed) array instead.
    filterStreamWith
        -- Collect the first stream into a list, passing it through
        -- 'Fold.uniqBy' on the way.
        (Fold.scanMaybe (Fold.uniqBy eq) Fold.toListRev)
        -- Membership is a linear search using the supplied equality.
        (List.any . eq)

-- | Like 'filterInStreamGenericBy' but assumes that the input streams are sorted in
-- ascending order. To use it on streams sorted in descending order pass an
-- inverted comparison function returning GT for less than and LT for greater
-- than.
--
-- Space: O(1)
--
-- Time: O(m+n)
--
-- /Pre-release/
{-# INLINE filterInStreamAscBy #-}
filterInStreamAscBy :: Monad m =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
filterInStreamAscBy cmp s1 s2 =
    -- Note the swapped stream order in the call to 'Stream.intersectBySorted'.
    Stream.intersectBySorted cmp s2 s1

-- | Delete all elements of the first stream from the second stream. If an
-- element occurs multiple times in the first stream as many occurrences of it
-- are deleted from the second stream.
--
-- >>> Stream.fold Fold.toList $ Stream.deleteInStreamGenericBy (==) (Stream.fromList [1,2,3]) (Stream.fromList [1,2,2])
-- [2]
--
-- The following laws hold:
--
-- > deleteInStreamGenericBy (==) s1 (s1 `append` s2) === s2
-- > deleteInStreamGenericBy (==) s1 (s1 `interleave` s2) === s2
--
-- Same as the list 'Data.List.\\' operation but with argument order flipped.
--
-- Both streams must be finite and must not block: the second stream is first
-- collected into a list, then the first stream is folded over that list
-- deleting one matching element at a time, and only then is the remaining
-- list emitted as a stream.
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE deleteInStreamGenericBy #-}
deleteInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamGenericBy eq s1 s2 =
    Stream.concatEffect $ do
        -- This may work well if s1 is small
        -- If s1 is big we can go through s1, deleting elements from s2 and
        -- not emitting an element if it was successfully deleted from s2.
        -- we will need a deleteBy that can return whether the element was
        -- deleted or not.
        cached <- Stream.fold Fold.toList s2
        -- Starting from the cached second stream, remove the first match of
        -- every element of s1.
        let deleteAll = Fold.foldl' (flip (List.deleteBy eq)) cached
        Stream.fromList <$> Stream.fold deleteAll s1

-- | A more efficient 'deleteInStreamGenericBy' for streams sorted in ascending order.
--
-- Space: O(1)
--
-- This is a placeholder: applying it to all its arguments raises an error.
--
-- /Unimplemented/
{-# INLINE deleteInStreamAscBy #-}
deleteInStreamAscBy :: -- (Monad m) =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamAscBy _eq _s1 _s2 = error "deleteInStreamAscBy: Unimplemented"

-- XXX Remove the MonadIO constraint. We can just cache one stream and then
-- implement using differenceEqBy.

-- | This essentially appends to the second stream the elements of the first
-- stream that are not already present in the second stream. The first stream
-- is collected into a list and de-duplicated using 'Data.List.nubBy' before
-- the second stream is processed.
--
-- Similar to the following, except that @s2@ is evaluated only once and
-- duplicates within @s1@ are dropped:
--
-- >>> unionWithStreamGenericBy eq s1 s2 = s2 `Stream.append` (Stream.deleteInStreamGenericBy eq s2 s1)
--
-- Example:
--
-- >>> Stream.fold Fold.toList $ Stream.unionWithStreamGenericBy (==) (Stream.fromList [1,1,2,3]) (Stream.fromList [1,2,2,4])
-- [1,2,2,4,3]
--
-- The first stream must be finite as it is fully consumed before any output
-- is produced.
--
-- Space: O(m) where @m@ is the number of elements in the first stream.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE unionWithStreamGenericBy #-}
unionWithStreamGenericBy :: MonadIO m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamGenericBy eq s1 s2 =
    Stream.concatEffect $ do
        xs <- Stream.fold Fold.toList s1
        -- XXX we can use postscanlMAfter' instead of IORef
        -- The ref holds the de-duplicated elements of s1 that have not been
        -- matched by an element of s2 so far.
        ref <- liftIO $ newIORef $! List.nubBy eq xs
        -- passThrough: emit each element of s2 unchanged, striking its first
        -- match (if any) from the pending list.
        -- leftover: the pending list is read inside the stream's effect, so
        -- the read happens when this stream is run rather than when it is
        -- constructed.
        let passThrough x = do
                liftIO $ modifyIORef' ref (List.deleteBy eq x)
                return x
            leftover = Stream.concatEffect $ do
                pending <- liftIO $ readIORef ref
                return $ Stream.fromList pending
        return $ Stream.mapM passThrough s2 `Stream.append` leftover

-- | A more efficient 'unionWithStreamGenericBy' for sorted streams.
--
-- Space: O(1)
--
-- This is a placeholder: applying it to all its arguments raises an error.
--
-- /Unimplemented/
{-# INLINE unionWithStreamAscBy #-}
unionWithStreamAscBy :: -- (Monad m) =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamAscBy _eq _s1 _s2 = error "unionWithStreamAscBy: Unimplemented"