{-# LANGUAGE CPP #-}
module Streamly.Internal.Data.Stream.StreamD.Container
(
nub
, joinLeftGeneric
, joinOuterGeneric
, joinInner
, joinLeft
, joinOuter
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.State.Strict (get, put)
import Data.Function ((&))
import Data.Maybe (isJust)
import Streamly.Internal.Data.Stream.StreamD.Step (Step(..))
import Streamly.Internal.Data.Stream.StreamD.Type
(Stream(..), mkCross, unCross)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Array.Generic as Array
import qualified Streamly.Internal.Data.Array.Mut.Type as MA
import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transform as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transformer as Stream
#include "DocTestDataStream.hs"
-- | Drop every element that has already appeared earlier in the stream,
-- keeping only the first occurrence of each value.
--
-- The elements seen so far are kept in a 'Set.Set' that is threaded through
-- the stream state, so the memory held grows with the number of distinct
-- elements that have been emitted.
{-# INLINE_NORMAL nub #-}
nub :: (Monad m, Ord a) => Stream m a -> Stream m a
nub (Stream step1 state1) = Stream step (Set.empty, state1)

    where

    -- The state pairs the set of already-emitted elements with the state of
    -- the underlying stream.
    step gst (seen, st) = do
        r <- step1 gst st
        return $ case r of
            Yield x s
                -- A repeated element produces no output, only a state change.
                | Set.member x seen -> Skip (seen, s)
                | otherwise -> Yield x (Set.insert x seen, s)
            Skip s -> Skip (seen, s)
            Stop -> Stop
-- | Fold a stream of key-value pairs into a strict 'Map.Map'.
--
-- Pairs are inserted in stream order with 'Map.insert', so when a key occurs
-- more than once the value that arrives last is the one retained.
toMap :: (Monad m, Ord k) => Stream m (k, v) -> m (Map.Map k v)
toMap = Stream.fold (Fold.foldl' insertPair Map.empty)

    where

    insertPair kv (k, v) = Map.insert k v kv
-- | Inner join of two keyed streams using a 'Map.Map' built from the second
-- stream.
--
-- The second stream is folded into a map (via 'toMap') before the first
-- stream is traversed. Each @(k, a)@ of the first stream whose key is present
-- in that map yields @(k, a, b)@; elements whose key is absent are dropped.
-- Output follows the order of the first stream.
{-# INLINE joinInner #-}
joinInner :: (Monad m, Ord k) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b)
joinInner s1 s2 =
    Stream.concatEffect $ do
        km <- toMap s2
        pure $ Stream.mapMaybe (joinAB km) s1

    where

    -- Pair the left value with the right value stored under the same key.
    joinAB kvm (k, a) = (\b -> (k, a, b)) <$> Map.lookup k kvm
-- | Left join of two streams using an arbitrary equality predicate.
--
-- For each @a@ of the first stream, every @b@ of the second stream with
-- @a \`eq\` b@ yields @(a, Just b)@. If no @b@ matched, a single
-- @(a, Nothing)@ is emitted for that @a@.
--
-- The second stream is bound inside the loop over the first one, so it is
-- evaluated again for each @a@ (assuming 'mkCross' gives the usual
-- nested-loop product -- NOTE(review): confirm against the CrossStream
-- instance).
{-# INLINE joinLeftGeneric #-}
joinLeftGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftGeneric eq s1 s2 = Stream.evalStateT (return False) $ unCross $ do
    a <- mkCross (Stream.liftInner s1)
    -- The Bool state records whether the current @a@ has matched anything;
    -- reset it at the start of every outer iteration.
    mkCross (Stream.fromEffect $ put False)
    -- Appended after the second stream: contributes a single 'Nothing' only
    -- when the flag is still False, i.e. no @b@ matched this @a@.
    let final = Stream.concatEffect $ do
            matched <- get
            pure $ if matched then Stream.nil else Stream.fromPure Nothing
    b <- mkCross (fmap Just (Stream.liftInner s2) `Stream.append` final)
    case b of
        Just b1
            | a `eq` b1 -> do
                mkCross (Stream.fromEffect $ put True)
                return (a, Just b1)
            -- A non-matching pair produces no output.
            | otherwise -> mkCross Stream.nil
        Nothing -> return (a, Nothing)
-- | Left join of two keyed streams using a 'Map.Map' built from the second
-- stream.
--
-- The second stream is folded into a map (via 'toMap') before the first
-- stream is traversed. Every @(k, a)@ of the first stream produces exactly
-- one output: @(k, a, Just b)@ when the key is present in the map, and
-- @(k, a, Nothing)@ otherwise.
{-# INLINE joinLeft #-}
joinLeft :: (Ord k, Monad m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b)
joinLeft s1 s2 =
    Stream.concatEffect $ do
        km <- toMap s2
        return $ fmap (joinAB km) s1

    where

    -- The lookup is scrutinised here rather than stored unevaluated in the
    -- tuple.
    joinAB km (k, a) =
        case Map.lookup k km of
            Just b -> (k, a, Just b)
            Nothing -> (k, a, Nothing)
-- | Full outer join of two streams using an arbitrary equality predicate.
--
-- The second stream is first collected into an array, together with a
-- mutable array of flags (one per element, initially False). The output is
-- then produced in two phases:
--
-- 1. For each @a@ of the first stream, every cached @b@ with @a \`eq\` b@
--    yields @(Just a, Just b)@ and flags that @b@ as matched. If no @b@
--    matched, a single @(Just a, Nothing)@ is emitted.
-- 2. Every cached @b@ whose flag is still False yields @(Nothing, Just b)@.
{-# INLINE joinOuterGeneric #-}
joinOuterGeneric :: MonadIO m =>
       (a -> b -> Bool)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterGeneric eq s1 s =
    Stream.concatEffect $ do
        inputArr <- Array.fromStream s
        let len = Array.length inputArr
        -- One "matched" flag per element of the cached second stream.
        foundArr <-
            Stream.fold
                (MA.writeN len)
                (Stream.fromList (Prelude.replicate len False))
        return $ go inputArr foundArr `Stream.append` leftOver inputArr foundArr

    where

    -- Phase 2: walk the cached elements alongside their flags and keep the
    -- ones that were never matched.
    leftOver inputArr foundArr =
        Stream.zipWith
            unmatched
            (Array.read inputArr)
            (Stream.unfold MA.reader foundArr)
            & Stream.filter isJust
            & Stream.catMaybes

    unmatched x found =
        if found
            then Nothing
            else Just (Nothing, Just x)

    evalState = Stream.evalStateT (return False) . unCross

    -- Phase 1: nested loop over the first stream and the cached array.
    go inputArr foundArr = evalState $ do
        a <- mkCross (Stream.liftInner s1)
        -- The Bool state records whether the current @a@ has matched
        -- anything; reset it at the start of every outer iteration.
        mkCross (Stream.fromEffect $ put False)
        -- Appended after the cached elements: contributes a single 'Nothing'
        -- only when the flag is still False.
        let final = Stream.concatEffect $ do
                matched <- get
                pure $ if matched then Stream.nil else Stream.fromPure Nothing
        (i, b) <-
            mkCross
                $ Stream.indexed
                $ fmap Just (Stream.liftInner (Array.read inputArr))
                    `Stream.append` final
        case b of
            Just b1
                | a `eq` b1 -> do
                    mkCross (Stream.fromEffect $ put True)
                    -- Remember that the element at index i has a partner so
                    -- that phase 2 skips it.
                    MA.putIndex i foundArr True
                    return (Just a, Just b1)
                -- A non-matching pair produces no output.
                | otherwise -> mkCross Stream.nil
            Nothing -> return (Just a, Nothing)
-- | Full outer join of two keyed streams using 'Map.Map's.
--
-- Both streams are folded into maps before any output is produced; when a
-- key is repeated within one stream the value that arrives last is the one
-- kept ('Map.insert'). The result is the concatenation of:
--
-- 1. one @(k, Just a, mb)@ for every key of the first map, where @mb@ is the
--    lookup of @k@ in the second map;
-- 2. one @(k, Nothing, Just b)@ for every key of the second map that is
--    absent from the first map.
--
-- Each part is generated from 'Map.toList' of the corresponding map.
{-# INLINE joinOuter #-}
joinOuter ::
    (Ord k, MonadIO m) =>
    Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b)
joinOuter s1 s2 =
    Stream.concatEffect $ do
        km1 <- kvFold s1
        km2 <- kvFold s2
        let res1 = fmap (withLeft km2) $ Stream.fromList $ Map.toList km1
            res2 =
                Stream.mapMaybe (onlyRight km1)
                    $ Stream.fromList $ Map.toList km2
        return $ Stream.append res1 res2

    where

    -- Entry of the first map, paired with its partner from the second map
    -- when there is one.
    withLeft km (k, a) =
        case Map.lookup k km of
            Just b -> (k, Just a, Just b)
            Nothing -> (k, Just a, Nothing)

    -- Entry of the second map, kept only when the first map lacks the key
    -- (keys present in both were already emitted via 'withLeft').
    onlyRight km (k, b)
        | k `Map.member` km = Nothing
        | otherwise = Just (k, Nothing, Just b)

    -- Same fold as the module-level 'toMap'.
    kvFold =
        let f = Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty
         in Stream.fold f