{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Container
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Stream operations that require transformers or containers like Set or Map.

module Streamly.Internal.Data.Stream.StreamD.Container
    (
      nub

    -- * Joins for unconstrained types
    , joinLeftGeneric
    , joinOuterGeneric

    -- * Joins with Ord constraint
    , joinInner
    , joinLeft
    , joinOuter
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.State.Strict (get, put)
import Data.Function ((&))
import Data.Maybe (isJust)
import Streamly.Internal.Data.Stream.StreamD.Step (Step(..))
import Streamly.Internal.Data.Stream.StreamD.Type
    (Stream(..), mkCross, unCross)

import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Array.Generic as Array
import qualified Streamly.Internal.Data.Array.Mut.Type as MA
import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transform as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transformer as Stream

#include "DocTestDataStream.hs"

-- | Remove duplicate elements from a stream, keeping only the first
-- occurrence of each element. The relative order of the retained elements is
-- unchanged.
--
-- The elements seen so far are remembered in a 'Set.Set', therefore the memory
-- used is proportional to the number of unique elements in the stream. If we
-- want to limit the memory we can just use @take@ to limit the unique elements
-- in the stream.
{-# INLINE_NORMAL nub #-}
nub :: (Monad m, Ord a) => Stream m a -> Stream m a
nub (Stream step1 state1) = Stream step (Set.empty, state1)

    where

    -- The state pairs the set of elements emitted so far with the state of
    -- the underlying stream.
    step gst (seen, st) = do
        r <- step1 gst st
        return
            $ case r of
                Yield x s ->
                    if Set.member x seen
                    -- Already emitted earlier: drop it but keep going.
                    then Skip (seen, s)
                    else Yield x (Set.insert x seen, s)
                Skip s -> Skip (seen, s)
                Stop -> Stop

-- Fold a stream of key-value pairs into a strict 'Map.Map'.
--
-- Pairs are inserted in stream order using 'Map.insert', so when a key occurs
-- more than once the value that appears last in the stream is the one that is
-- retained.
--
-- XXX Generate error if a duplicate insertion is attempted?
toMap ::  (Monad m, Ord k) => Stream m (k, v) -> m (Map.Map k v)
toMap = Stream.fold (Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty)

-- If the second stream is too big it can be partitioned based on hashes and
-- then we can process one partition at a time.
--
-- XXX An IntMap may be faster when the keys are Int.
-- XXX Use hashmap instead of map?
--
-- | Inner join of two streams of key-value pairs. The second stream is first
-- collected into a 'Map.Map'; then, for every @(k, a)@ in the first stream
-- whose key is present in that map with value @b@, @(k, a, b)@ is emitted.
-- Elements of the first stream whose key is absent from the map are dropped.
--
-- If the input streams have duplicate keys, the behavior is undefined.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m + n) map operations
--
-- /Pre-release/
{-# INLINE joinInner #-}
joinInner :: (Monad m, Ord k) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b)
joinInner s1 s2 =
    Stream.concatEffect $ do
        -- The whole of the second stream is consumed before any output is
        -- produced.
        km <- toMap s2
        pure $ Stream.mapMaybe (joinAB km) s1

    where

    -- Pair an element of the first stream with the value stored under the
    -- same key, if there is one.
    joinAB kvm (k, a) = fmap (\b -> (k, a, b)) (Map.lookup k kvm)

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array or a seek capable
-- stream then we could use binary search if we have an Ord instance or
-- Ordering returning function. The time complexity would then become (m x log
-- n).

-- XXX Check performance of StreamD vs StreamK

-- | Like 'joinInner' but emit @(a, Just b)@, and additionally, for those @a@'s
-- that are not equal to any @b@ emit @(a, Nothing)@.
--
-- The second stream is evaluated multiple times. If the stream is a
-- consume-once stream then the caller should cache it in an 'Data.Array.Array'
-- before calling this function. Caching may also improve performance if the
-- stream is expensive to evaluate.
--
-- >>> joinRightGeneric eq = flip (Stream.joinLeftGeneric eq)
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinLeftGeneric #-}
joinLeftGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftGeneric eq s1 s2 = Stream.evalStateT (return False) $ unCross $ do
    a <- mkCross (Stream.liftInner s1)
    -- XXX should we use StreamD monad here?
    -- XXX Is there a better way to perform some action at the end of a loop
    -- iteration?
    --
    -- The Bool state records whether the current @a@ has matched any @b@ so
    -- far; reset it before scanning the second stream for this @a@.
    mkCross (Stream.fromEffect $ put False)
    -- A sentinel appended after the second stream: it yields a single Nothing
    -- only if no match was recorded for the current @a@.
    let final = Stream.concatEffect $ do
            matched <- get
            pure $ if matched then Stream.nil else Stream.fromPure Nothing
    b <- mkCross (fmap Just (Stream.liftInner s2) `Stream.append` final)
    case b of
        Just b1 ->
            if a `eq` b1
            then do
                mkCross (Stream.fromEffect $ put True)
                return (a, Just b1)
            else mkCross Stream.nil
        -- The sentinel was reached: @a@ had no partner in the second stream.
        Nothing -> return (a, Nothing)

-- XXX rename to joinLeftOrd?

-- | A more efficient 'joinLeftGeneric' for key-value streams, using a
-- 'Map.Map' built from the second stream. Every @(k, a)@ of the first stream
-- is emitted exactly once, as @(k, a, Just b)@ when the key @k@ is present in
-- the second stream with value @b@ and as @(k, a, Nothing)@ otherwise.
--
-- Space: O(n)
--
-- Time: O(m + n) map operations
--
-- /Pre-release/
{-# INLINE joinLeft #-}
joinLeft :: (Ord k, Monad m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b)
joinLeft s1 s2 =
    Stream.concatEffect $ do
        -- The whole of the second stream is consumed before any output is
        -- produced.
        km <- toMap s2
        return $ fmap (joinAB km) s1

    where

    -- Attach the value stored under the same key, if any. The lookup is
    -- performed as soon as the resulting tuple is demanded.
    joinAB km (k, a) =
        case Map.lookup k km of
            Just b -> (k, a, Just b)
            Nothing -> (k, a, Nothing)

-- XXX We can do this concurrently.

-- XXX Check performance of StreamD vs StreamK

-- | Like 'joinLeft' but emits a @(Just a, Just b)@. Like 'joinLeft', for those
-- @a@'s that are not equal to any @b@ emit @(Just a, Nothing)@, but
-- additionally, for those @b@'s that are not equal to any @a@ emit @(Nothing,
-- Just b)@.
--
-- The second stream is buffered in an array, along with an array of flags
-- recording which of its elements have been matched. The unmatched @b@'s are
-- emitted at the end, after the entire first stream has been processed, in
-- the order in which they occur in the second stream.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinOuterGeneric #-}
joinOuterGeneric :: MonadIO m =>
       (a -> b -> Bool)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterGeneric eq s1 s2 =
    Stream.concatEffect $ do
        inputArr <- Array.fromStream s2
        let len = Array.length inputArr
        -- One flag per element of the second stream, all initially False.
        foundArr <-
            Stream.fold
            (MA.writeN len)
            (Stream.fromList (Prelude.replicate len False))
        return $ go inputArr foundArr `Stream.append` leftOver inputArr foundArr

    where

    -- The elements of the second stream whose flag is still False, i.e. the
    -- ones that were never paired with any element of the first stream.
    leftOver inputArr foundArr =
        Stream.zipWith
            (\x found -> if found then Nothing else Just (Nothing, Just x))
            (Array.read inputArr)
            (Stream.unfold MA.reader foundArr)
            & Stream.filter isJust
            & Stream.catMaybes

    evalState = Stream.evalStateT (return False) . unCross

    -- Same nested loop as in joinLeftGeneric, but over the buffered second
    -- stream, additionally flagging every @b@ that gets matched.
    go inputArr foundArr = evalState $ do
        a <- mkCross (Stream.liftInner s1)
        -- XXX should we use StreamD monad here?
        -- XXX Is there a better way to perform some action at the end of a loop
        -- iteration?
        --
        -- The Bool state records whether the current @a@ has matched any @b@
        -- so far; reset it before scanning the buffered elements.
        mkCross (Stream.fromEffect $ put False)
        -- A sentinel appended after the buffered elements: it yields a single
        -- Nothing only if no match was recorded for the current @a@.
        let final = Stream.concatEffect $ do
                matched <- get
                pure $ if matched then Stream.nil else Stream.fromPure Nothing
        (i, b) <-
            let stream = Array.read inputArr
             in mkCross
                (Stream.indexed
                    $ fmap Just (Stream.liftInner stream) `Stream.append` final)

        case b of
            Just b1 ->
                if a `eq` b1
                then do
                    mkCross (Stream.fromEffect $ put True)
                    -- Remember that the element at index i found a partner.
                    MA.putIndex i foundArr True
                    return (Just a, Just b1)
                else mkCross Stream.nil
            -- The sentinel was reached: @a@ had no partner.
            Nothing -> return (Just a, Nothing)

-- Put the b's that have been paired, in another hash or mutate the hash to set
-- a flag. At the end go through @Stream m b@ and find those that are not in that
-- hash to return (Nothing, b).

-- | Like 'joinOuterGeneric' but for key-value streams, using a 'Map.Map' for
-- efficiency.
--
-- Both streams are first collected into maps. The output consists of one
-- entry for every key of the first map, @(k, Just a, Just b)@ or
-- @(k, Just a, Nothing)@ depending on whether the key also occurs in the
-- second map, followed by @(k, Nothing, Just b)@ for every key that occurs
-- only in the second map. Within each of the two parts the entries are in
-- the order produced by 'Map.toList'.
--
-- Space: O(m + n)
--
-- Time: O(m + n) map operations
--
-- /Pre-release/
{-# INLINE joinOuter #-}
joinOuter ::
    (Ord k, MonadIO m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b)
joinOuter s1 s2 =
    Stream.concatEffect $ do
        km1 <- toMap s1
        km2 <- toMap s2

        -- XXX Not sure if toList/fromList would fuse optimally. We may have to
        -- create a fused Map.toStream function.
        let res1 = fmap (withRight km2) $ Stream.fromList $ Map.toList km1

        -- XXX We can take advantage of the lookups in the first pass above to
        -- reduce the number of lookups in this pass. If we keep mutable cells
        -- in the second Map, we can flag it in the first pass and not do any
        -- lookup in the second pass if it is flagged.
        let res2 =
                Stream.mapMaybe (rightOnly km1)
                    $ Stream.fromList $ Map.toList km2

        return $ Stream.append res1 res2

    where

    -- An entry of the first map together with its partner, if any, from the
    -- second map.
    withRight km (k, a) =
        case Map.lookup k km of
            Just b -> (k, Just a, Just b)
            Nothing -> (k, Just a, Nothing)

    -- An entry of the second map is emitted only when its key is absent from
    -- the first map; the shared keys have already been covered by res1.
    rightOnly km (k, b)
        | Map.member k km = Nothing
        | otherwise = Just (k, Nothing, Just b)