-- |
-- Module      : Streamly.Internal.Data.Fold.Container
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--

module Streamly.Internal.Data.Fold.Container
    (
    -- * Set operations
      toSet
    , toIntSet
    , countDistinct
    , countDistinctInt
    , nub
    , nubInt

    -- * Map operations
    , frequency

    -- ** Demultiplexing
    -- | Direct values in the input stream to different folds using an n-ary
    -- fold selector. 'demux' is a generalization of 'classify' (and
    -- 'partition') where each key of the classifier can use a different fold.
    --
    -- If you want to understand the capabilities of these combinators, you
    -- only need to look at 'demux'; all the others are variants of it.

    -- *** Output is a container
    -- | The fold state snapshot returns the key-value container of in-progress
    -- folds.
    , demuxToContainer
    , demuxToContainerIO
    , demuxToMap
    , demuxToMapIO

    -- *** Input is explicit key-value tuple
    -- | Like above but inputs are in explicit key-value pair form.
    , demuxKvToContainer
    , demuxKvToMap

    -- *** Scan of finished fold results
    -- | Like above, but the resulting fold state snapshot contains the key
    -- value container as well as the finished key result if a fold in the
    -- container finished.
    , demuxGeneric
    , demux
    , demuxGenericIO
    , demuxIO

    -- TODO: These can be implemented using the above operations
    -- , demuxSel -- Stop when the fold for the specified key stops
    -- , demuxMin -- Stop when any of the folds stop
    -- , demuxAll -- Stop when all the folds stop (run once)

    -- ** Classifying
    -- | In an input stream of key value pairs fold values for different keys
    -- in individual output buckets using the given fold. 'classify' is a
    -- special case of 'demux' where all the branches of the demultiplexer use
    -- the same fold.
    --
    -- Different types of maps can be used with these combinators via the
    -- 'IsMap' type class. A HashMap performs better when there are more
    -- collisions; a trie Map performs better otherwise. A trie also has the
    -- advantage of keeping the keys sorted. For example, if we want to store a
    -- dictionary of words and their meanings, a trie Map would be better if we
    -- also want to display them in sorted order.

    , kvToMap

    , toContainer
    , toContainerIO
    , toMap
    , toMapIO

    , classifyGeneric
    , classify
    , classifyGenericIO
    , classifyIO
    -- , toContainerSel
    -- , toContainerMin
    )
where

#include "inline.hs"
#include "ArrayMacros.h"

import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Map.Strict (Map)
import Data.IntSet (IntSet)
import Data.Set (Set)
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))

import qualified Data.IntSet as IntSet
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.IsMap as IsMap

import Prelude hiding (Foldable(..))
import Streamly.Internal.Data.Fold.Type
import Streamly.Internal.Data.Fold.Combinators

-- $setup
-- >>> :m
-- >>> :set -XFlexibleContexts
-- >>> import qualified Data.Map as Map
-- >>> import qualified Data.Set as Set
-- >>> import qualified Data.IntSet as IntSet
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Fold.Container as Fold

-- | Fold the input to a set. Duplicate elements are stored only once, so the
-- memory used is proportional to the number of distinct elements.
--
-- Definition:
--
-- >>> toSet = Fold.foldl' (flip Set.insert) Set.empty
--
{-# INLINE toSet #-}
toSet :: (Monad m, Ord a) => Fold m a (Set a)
toSet = foldl' (flip Set.insert) Set.empty

-- | Fold the input to an int set. For integer inputs this performs better than
-- 'toSet'.
--
-- Definition:
--
-- >>> toIntSet = Fold.foldl' (flip IntSet.insert) IntSet.empty
--
{-# INLINE toIntSet #-}
toIntSet :: Monad m => Fold m Int IntSet
toIntSet = foldl' (flip IntSet.insert) IntSet.empty

-- XXX Name as nubOrd? Or write a nubGeneric

-- | Used as a scan. Returns 'Just' for the first occurrence of an element,
-- returns 'Nothing' for any other occurrences.
--
-- Example:
--
-- >>> stream = Stream.fromList [1::Int,1,2,3,4,4,5,1,5,7]
-- >>> Stream.fold Fold.toList $ Stream.scanMaybe Fold.nub stream
-- [1,2,3,4,5,7]
--
-- The set of elements seen so far is kept in the fold state, so the memory
-- used is proportional to the number of distinct elements.
--
-- /Pre-release/
{-# INLINE nub #-}
nub :: (Monad m, Ord a) => Fold m a (Maybe a)
nub = fmap (\(Tuple' _ x) -> x) $ foldl' step initial

    where

    -- Nothing seen yet and nothing to emit.
    initial = Tuple' Set.empty Nothing

    -- Emit the element only if it has not been seen before; the previous
    -- output is discarded on every step.
    step (Tuple' set _) x =
        if Set.member x set
        then Tuple' set Nothing
        else Tuple' (Set.insert x set) (Just x)

-- | Like 'nub' but specialized to a stream of 'Int', for better performance.
--
-- Returns 'Just' for the first occurrence of an element and 'Nothing' for any
-- later occurrence.
--
-- /Pre-release/
{-# INLINE nubInt #-}
nubInt :: Monad m => Fold m Int (Maybe Int)
nubInt = fmap (\(Tuple' _ x) -> x) $ foldl' step initial

    where

    -- Nothing seen yet and nothing to emit.
    initial = Tuple' IntSet.empty Nothing

    -- Emit the element only if it has not been seen before; the previous
    -- output is discarded on every step.
    step (Tuple' set _) x =
        if IntSet.member x set
        then Tuple' set Nothing
        else Tuple' (IntSet.insert x set) (Just x)

-- XXX Try Hash set
-- XXX Add a countDistinct window fold
-- XXX Add a bloom filter fold

-- | Count the distinct elements in the stream: every distinct value is counted
-- once, however many times it occurs.
--
-- Definition:
--
-- >>> countDistinct = fmap Set.size Fold.toSet
-- >>> countDistinct = Fold.postscan Fold.nub $ Fold.catMaybes $ Fold.length
--
-- The memory used is proportional to the number of distinct elements in the
-- stream. To guard against using too much memory, use it as a scan and
-- terminate if the count exceeds a threshold.
--
-- /Space/: \(\mathcal{O}(n)\)
--
-- /Pre-release/
--
{-# INLINE countDistinct #-}
countDistinct :: (Monad m, Ord a) => Fold m a Int
-- countDistinct = postscan nub $ catMaybes length
countDistinct = fmap Set.size toSet
{-
countDistinct = fmap (\(Tuple' _ n) -> n) $ foldl' step initial

    where

    initial = Tuple' Set.empty 0

    step (Tuple' set n) x = do
        if Set.member x set
        then
            Tuple' set n
        else
            let cnt = n + 1
             in Tuple' (Set.insert x set) cnt
-}

-- | Like 'countDistinct' but specialized to a stream of 'Int', for better
-- performance. Every distinct value is counted once, however many times it
-- occurs.
--
-- Definition:
--
-- >>> countDistinctInt = fmap IntSet.size Fold.toIntSet
-- >>> countDistinctInt = Fold.postscan Fold.nubInt $ Fold.catMaybes $ Fold.length
--
-- /Pre-release/
{-# INLINE countDistinctInt #-}
countDistinctInt :: Monad m => Fold m Int Int
-- countDistinctInt = postscan nubInt $ catMaybes length
countDistinctInt = fmap IntSet.size toIntSet
{-
countDistinctInt = fmap (\(Tuple' _ n) -> n) $ foldl' step initial

    where

    initial = Tuple' IntSet.empty 0

    step (Tuple' set n) x = do
        if IntSet.member x set
        then
            Tuple' set n
        else
            let cnt = n + 1
             in Tuple' (IntSet.insert x set) cnt
-}

------------------------------------------------------------------------------
-- demux: in a key value stream fold each key sub-stream with a different fold
------------------------------------------------------------------------------

-- TODO Demultiplex an input element into a number of typed variants. We want
-- to statically restrict the target values within a set of predefined types,
-- an enumeration of a GADT.
--
-- This is the consumer side dual of the producer side 'mux' operation (XXX to
-- be implemented).
--
-- XXX If we use Refold in it, it can perhaps fuse/be more efficient. For
-- example we can store just the result rather than storing the whole fold in
-- the Map.
--
-- Note: There are separate functions to determine Key and Fold from the input
-- because key is to be determined on each input whereas fold is to be
-- determined only once for a key.

-- | This is the most general of all demux, classify operations.
--
-- The fold state is a container of in-progress folds indexed by key, paired
-- with the result of the fold (if any) that finished on the latest input.
--
-- See 'demux' for documentation.
{-# INLINE demuxGeneric #-}
demuxGeneric :: (Monad m, IsMap f, Traversable f) =>
       (a -> Key f)
    -> (a -> m (Fold m a b))
    -> Fold m a (m (f b), Maybe (Key f, b))
demuxGeneric getKey getFold =
    Fold (\s a -> Partial <$> step s a) (Partial <$> initial) extract final

    where

    -- No in-progress folds and no finished result.
    initial = return $ Tuple' IsMap.mapEmpty Nothing

    -- Feed one input to the fold for key @k@. A fold that is still running is
    -- stored back in the container with its new state captured in its
    -- initial action (always a 'Partial'); a fold that finishes is removed
    -- and its result is reported.
    {-# INLINE runFold #-}
    runFold kv (Fold step1 initial1 extract1 final1) (k, a) = do
         res <- initial1
         case res of
            Partial s -> do
                res1 <- step1 s a
                return
                    $ case res1 of
                        Partial _ ->
                            let fld = Fold step1 (return res1) extract1 final1
                             in Tuple' (IsMap.mapInsert k fld kv) Nothing
                        Done b -> Tuple' (IsMap.mapDelete k kv) (Just (k, b))
            -- Only a freshly generated fold can be done before any input;
            -- its key is not in the container yet.
            Done b -> return $ Tuple' kv (Just (k, b))

    -- Route the input to the fold for its key, generating a new fold with
    -- getFold if the key has no in-progress fold.
    step (Tuple' kv _) a = do
        let k = getKey a
        case IsMap.mapLookup k kv of
            Nothing -> do
                fld <- getFold a
                runFold kv fld (k, a)
            Just f -> runFold kv f (k, a)

    -- Snapshot action extracting the current result of every stored fold.
    extract (Tuple' kv x) = return (Prelude.mapM f kv, x)

        where

        f (Fold _ i e _) = do
            r <- i
            case r of
                Partial s -> e s
                -- Stored folds always have a 'Partial' initial action.
                _ -> error "demuxGeneric: unreachable code"

    -- Like extract but runs the finalizer of every stored fold.
    final (Tuple' kv x) = return (Prelude.mapM f kv, x)

        where

        f (Fold _ i _ fin) = do
            r <- i
            case r of
                Partial s -> fin s
                -- Stored folds always have a 'Partial' initial action.
                _ -> error "demuxGeneric: unreachable code"

-- | @demux getKey getFold@: In a key value stream, fold values corresponding
-- to each key using a key specific fold. @getFold@ is invoked to generate a
-- key specific fold when a key is encountered for the first time in the
-- stream.
--
-- The first component of the output tuple is an action that, when run,
-- returns a key-value Map of the current results of the in-progress folds.
-- Whenever a fold terminates, its key and result are returned as the second
-- component of the output tuple; otherwise the second component is 'Nothing'.
--
-- If a fold terminates, it is removed from the Map, and another instance of
-- the fold is started upon receiving an input with that key: @getFold@ is
-- invoked again whenever the key is encountered again.
--
-- This can be used to scan a stream and collect the results from the scan
-- output.
--
-- Since the fold generator function is monadic we can add folds dynamically.
-- For example, we can maintain a Map of keys to folds in an IORef and lookup
-- the fold from that corresponding to a key. This Map can be changed
-- dynamically, folds for new keys can be added or folds for old keys can be
-- deleted or modified.
--
-- Compare with 'classify', the fold in 'classify' is a static fold.
--
-- /Pre-release/
--
{-# INLINE demux #-}
demux :: (Monad m, Ord k) =>
       (a -> k)
    -> (a -> m (Fold m a b))
    -> Fold m a (m (Map k b), Maybe (k, b))
demux = demuxGeneric

-- | This is a specialized version of 'demuxGeneric' that uses mutable IO cells
-- as fold accumulators for better performance.
--
-- Each in-progress fold is kept in an 'IORef' that is overwritten in place
-- after every step, so the container only changes when a key is added or a
-- fold finishes.
{-# INLINE demuxGenericIO #-}
demuxGenericIO :: (MonadIO m, IsMap f, Traversable f) =>
       (a -> Key f)
    -> (a -> m (Fold m a b))
    -> Fold m a (m (f b), Maybe (Key f, b))
demuxGenericIO getKey getFold =
    Fold (\s a -> Partial <$> step s a) (Partial <$> initial) extract final

    where

    -- No in-progress folds and no finished result.
    initial = return $ Tuple' IsMap.mapEmpty Nothing

    -- Start a freshly generated fold for key @k@ and feed it the input. If it
    -- is still running afterwards, store it in a new IORef under @k@.
    {-# INLINE initFold #-}
    initFold kv (Fold step1 initial1 extract1 final1) (k, a) = do
         res <- initial1
         case res of
            Partial s -> do
                res1 <- step1 s a
                case res1 of
                    Partial _ -> do
                        let fld = Fold step1 (return res1) extract1 final1
                        ref <- liftIO $ newIORef fld
                        return $ Tuple' (IsMap.mapInsert k ref kv) Nothing
                    Done b -> return $ Tuple' kv (Just (k, b))
            Done b -> return $ Tuple' kv (Just (k, b))

    -- Feed the input to an already stored fold. A still-running fold is
    -- written back to its IORef; a finished one is removed from the
    -- container and its result reported.
    {-# INLINE runFold #-}
    runFold kv ref (Fold step1 initial1 extract1 final1) (k, a) = do
         res <- initial1
         case res of
            Partial s -> do
                res1 <- step1 s a
                case res1 of
                        Partial _ -> do
                            let fld = Fold step1 (return res1) extract1 final1
                            liftIO $ writeIORef ref fld
                            return $ Tuple' kv Nothing
                        Done b ->
                            let kv1 = IsMap.mapDelete k kv
                             in return $ Tuple' kv1 (Just (k, b))
            -- Stored folds always have a 'Partial' initial action.
            Done _ -> error "demuxGenericIO: unreachable"

    -- Route the input to the fold for its key, generating a new fold with
    -- getFold if the key has no in-progress fold.
    step (Tuple' kv _) a = do
        let k = getKey a
        case IsMap.mapLookup k kv of
            Nothing -> do
                f <- getFold a
                initFold kv f (k, a)
            Just ref -> do
                f <- liftIO $ readIORef ref
                runFold kv ref f (k, a)

    -- Snapshot action reading every IORef and extracting its fold's current
    -- result.
    extract (Tuple' kv x) = return (Prelude.mapM f kv, x)

        where

        f ref = do
            Fold _ i e _ <- liftIO $ readIORef ref
            r <- i
            case r of
                Partial s -> e s
                _ -> error "demuxGenericIO: unreachable code"

    -- Like extract but runs the finalizer of every stored fold.
    final (Tuple' kv x) = return (Prelude.mapM f kv, x)

        where

        f ref = do
            Fold _ i _ fin <- liftIO $ readIORef ref
            r <- i
            case r of
                Partial s -> fin s
                _ -> error "demuxGenericIO: unreachable code"

-- | This is a specialized version of 'demux' that uses mutable IO cells as
-- fold accumulators for better performance.
--
-- Keep in mind that the values in the returned Map may be changed by the
-- ongoing fold if you are using those concurrently in another thread.
--
{-# INLINE demuxIO #-}
demuxIO :: (MonadIO m, Ord k) =>
       (a -> k)
    -> (a -> m (Fold m a b))
    -> Fold m a (m (Map k b), Maybe (k, b))
demuxIO = demuxGenericIO

-- | Fold a key value stream to a key-value Map. If the same key appears
-- multiple times, only the last value is retained.
{-# INLINE kvToMapOverwriteGeneric #-}
kvToMapOverwriteGeneric :: (Monad m, IsMap f) => Fold m (Key f, a) (f a)
kvToMapOverwriteGeneric =
    foldl' (\kv (k, v) -> IsMap.mapInsert k v kv) IsMap.mapEmpty

-- | Run 'demuxGeneric' and collect its results in a single container.
--
-- The container is the union of the current results of the in-progress folds
-- (the latest snapshot) with the last result of each key whose fold has
-- finished. The in-progress snapshot is the left argument of
-- 'IsMap.mapUnion', so it takes precedence for a key present in both,
-- assuming 'IsMap.mapUnion' is left-biased like 'Data.Map.union'.
{-# INLINE demuxToContainer #-}
demuxToContainer :: (Monad m, IsMap f, Traversable f) =>
    (a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (f b)
demuxToContainer getKey getFold =
    let
        classifier = demuxGeneric getKey getFold

        -- An empty stream produces no snapshot action.
        getMap Nothing = pure IsMap.mapEmpty
        getMap (Just action) = action

        aggregator =
            teeWith IsMap.mapUnion
                (rmapM getMap $ lmap fst latest)
                (lmap snd $ catMaybes kvToMapOverwriteGeneric)
    in postscan classifier aggregator

-- | This collects all the results of 'demux' in a Map: the current results of
-- the in-progress folds together with the last result of each finished fold.
--
{-# INLINE demuxToMap #-}
demuxToMap :: (Monad m, Ord k) =>
    (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (Map k b)
demuxToMap = demuxToContainer

-- | Like 'demuxToContainer' but uses 'demuxGenericIO', which keeps the
-- in-progress folds in mutable IO cells, for better performance.
--
-- The container is the union of the current results of the in-progress folds
-- (the latest snapshot) with the last result of each key whose fold has
-- finished.
{-# INLINE demuxToContainerIO #-}
demuxToContainerIO :: (MonadIO m, IsMap f, Traversable f) =>
    (a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (f b)
demuxToContainerIO getKey getFold =
    let
        classifier = demuxGenericIO getKey getFold

        -- An empty stream produces no snapshot action.
        getMap Nothing = pure IsMap.mapEmpty
        getMap (Just action) = action

        aggregator =
            teeWith IsMap.mapUnion
                (rmapM getMap $ lmap fst latest)
                (lmap snd $ catMaybes kvToMapOverwriteGeneric)
    in postscan classifier aggregator

-- | Same as 'demuxToMap' but uses 'demuxIO' for better performance.
--
{-# INLINE demuxToMapIO #-}
demuxToMapIO :: (MonadIO m, Ord k) =>
    (a -> k) -> (a -> m (Fold m a b)) -> Fold m a (Map k b)
demuxToMapIO = demuxToContainerIO

-- | Fold a stream of explicit key-value pairs into a container.
--
-- The first component of each input is its key. The supplied function
-- creates the fold for a key, and that fold is fed only the second (value)
-- component of the inputs carrying that key.
--
{-# INLINE demuxKvToContainer #-}
demuxKvToContainer :: (Monad m, IsMap f, Traversable f) =>
    (Key f -> m (Fold m a b)) -> Fold m (Key f, a) (f b)
demuxKvToContainer mkFold = demuxToContainer fst foldFor

    where

    -- Create the fold for this input's key, adapted to consume only values.
    foldFor (key, _) = lmap snd <$> mkFold key

-- | Fold a stream of key value pairs using a function that maps keys to folds.
--
-- Definition:
--
-- >>> demuxKvToMap f = Fold.demuxToContainer fst (\(k, _) -> Fold.lmap snd <$> f k)
--
-- Example:
--
-- >>> import Data.Map (Map)
-- >>> :{
--  let f "SUM" = return Fold.sum
--      f _ = return Fold.product
--      input = Stream.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)]
--   in Stream.fold (Fold.demuxKvToMap f) input :: IO (Map String Int)
-- :}
-- fromList [("PRODUCT",8),("SUM",4)]
--
-- /Pre-release/
{-# INLINE demuxKvToMap #-}
demuxKvToMap :: (Monad m, Ord k) =>
    (k -> m (Fold m a b)) -> Fold m (k, a) (Map k b)
-- 'demuxKvToContainer' specialised to 'Map'.
demuxKvToMap = demuxKvToContainer

------------------------------------------------------------------------------
-- Classify: Like demux but uses the same fold for all keys.
------------------------------------------------------------------------------

-- XXX Change these to make the behavior similar to the demux* variants. We
-- can implement this using classifyScanManyWith: maintain a set of done folds
-- in the underlying monad and look the key up when initial is called. If the
-- fold is already done, initial would set a flag in the state to ignore the
-- input, or return an error.

-- | Generic implementation of 'classify' for any 'IsMap' container.
--
-- A separate instance of the supplied fold is started for each distinct key
-- computed by the key function. The fold state is a 'Tuple3'' of:
--
-- * the container of in-progress fold states, indexed by key,
-- * the set of keys whose fold has already terminated, and
-- * the key and result of the fold that terminated on the latest input, if
--   any.
--
-- Once the fold for a key has terminated, the key is recorded in the done set
-- and all later inputs with that key are ignored.
--
-- The output is a pair of an action that extracts the current results of the
-- in-progress folds, and the just-finished @(key, result)@, if any.
{-# INLINE classifyGeneric #-}
classifyGeneric :: (Monad m, IsMap f, Traversable f, Ord (Key f)) =>
    -- Note: we need to return the Map itself to display the in-progress values
    -- e.g. to implement top. We could possibly create a separate abstraction
    -- for that use case. We return an action because we want it to be lazy so
    -- that the downstream consumers can choose to process or discard it.
    (a -> Key f) -> Fold m a b -> Fold m a (m (f b), Maybe (Key f, b))
classifyGeneric f (Fold step1 initial1 extract1 final1) =
    Fold (\s a -> Partial <$> step s a) (Partial <$> initial) extract final

    where

    initial = return $ Tuple3' IsMap.mapEmpty Set.empty Nothing

    -- Start a new fold for key @k@ (not yet seen, not done) and feed it @a@.
    {-# INLINE initFold #-}
    initFold kv set k a = do
        x <- initial1
        case x of
              Partial s -> do
                r <- step1 s a
                return
                    $ case r of
                          Partial s1 ->
                            Tuple3' (IsMap.mapInsert k s1 kv) set Nothing
                          Done b ->
                            -- The fold finished on its very first input. Mark
                            -- the key as done, as the other Done branches do,
                            -- so that later inputs for it are ignored instead
                            -- of starting a fresh fold.
                            Tuple3' kv (Set.insert k set) (Just (k, b))
              -- The fold terminated without consuming any input.
              Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b)))

    step (Tuple3' kv set _) a = do
        let k = f a
        case IsMap.mapLookup k kv of
            Nothing -> do
                if Set.member k set
                -- The fold for this key is already done: ignore the input.
                then return (Tuple3' kv set Nothing)
                else initFold kv set k a
            Just s -> do
                r <- step1 s a
                return
                    $ case r of
                          Partial s1 ->
                            Tuple3' (IsMap.mapInsert k s1 kv) set Nothing
                          Done b ->
                            -- Drop the finished state and remember the key.
                            let kv1 = IsMap.mapDelete k kv
                             in Tuple3' kv1 (Set.insert k set) (Just (k, b))

    extract (Tuple3' kv _ x) = return (Prelude.mapM extract1 kv, x)

    final (Tuple3' kv set x) = return (IsMap.mapTraverseWithKey f1 kv, x)

        where

        -- A key enters the done set only when its state is removed from (or
        -- never inserted into) kv, so keys of kv are not expected to be in
        -- the set; final1 is the branch normally taken.
        f1 k s = do
            if Set.member k set
            then extract1 s
            else final1 s

-- | Folds the values for each key using the supplied fold. When scanning, as
-- soon as the fold is complete, its result is available in the second
-- component of the tuple.  The first component of the tuple is a snapshot of
-- the in-progress folds.
--
-- Once the fold for a key is done, any future values of the key are ignored.
--
-- Definition:
--
-- >>> classify f fld = Fold.demux f (const (return fld))
--
{-# INLINE classify #-}
classify :: (Monad m, Ord k) =>
    (a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b))
-- 'classifyGeneric' specialised to 'Map'.
classify = classifyGeneric

-- XXX we can use a Prim IORef if we can constrain the state "s" to be Prim
--
-- The code is almost the same as classifyGeneric except the IORef operations.

-- | Like 'classifyGeneric', but the per-key fold states are kept in 'IORef'
-- cells stored in the container and updated in place with 'writeIORef'.
--
-- Once the fold for a key has terminated, the key is recorded in the done set
-- and all later inputs with that key are ignored.
--
-- The snapshot action in the first output component reads the 'IORef's when
-- it is run, so it reflects their contents at that time rather than when the
-- snapshot was produced.
{-# INLINE classifyGenericIO #-}
classifyGenericIO :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) =>
    (a -> Key f) -> Fold m a b -> Fold m a (m (f b), Maybe (Key f, b))
classifyGenericIO f (Fold step1 initial1 extract1 final1) =
    Fold (\s a -> Partial <$> step s a) (Partial <$> initial) extract final

    where

    initial = return $ Tuple3' IsMap.mapEmpty Set.empty Nothing

    -- Start a new fold for key @k@ (not yet seen, not done) and feed it @a@.
    {-# INLINE initFold #-}
    initFold kv set k a = do
        x <- initial1
        case x of
              Partial s -> do
                r <- step1 s a
                case r of
                      Partial s1 -> do
                        ref <- liftIO $ newIORef s1
                        return $ Tuple3' (IsMap.mapInsert k ref kv) set Nothing
                      Done b ->
                        -- The fold finished on its very first input. Mark the
                        -- key as done, as the other Done branches do, so that
                        -- later inputs for it are ignored instead of starting
                        -- a fresh fold.
                        return $ Tuple3' kv (Set.insert k set) (Just (k, b))
              -- The fold terminated without consuming any input.
              Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b)))

    step (Tuple3' kv set _) a = do
        let k = f a
        case IsMap.mapLookup k kv of
            Nothing -> do
                if Set.member k set
                -- The fold for this key is already done: ignore the input.
                then return (Tuple3' kv set Nothing)
                else initFold kv set k a
            Just ref -> do
                s <- liftIO $ readIORef ref
                r <- step1 s a
                case r of
                      Partial s1 -> do
                        -- Update in place; the container itself is unchanged.
                        liftIO $ writeIORef ref s1
                        return $ Tuple3' kv set Nothing
                      Done b ->
                        -- Drop the finished cell and remember the key.
                        let kv1 = IsMap.mapDelete k kv
                         in return
                                $ Tuple3' kv1 (Set.insert k set) (Just (k, b))

    extract (Tuple3' kv _ x) = return (Prelude.mapM g kv, x)

        where

        g ref = liftIO (readIORef ref) >>= extract1

    final (Tuple3' kv set x) = return (IsMap.mapTraverseWithKey g kv, x)

        where

        -- A key enters the done set only when its cell is removed from (or
        -- never inserted into) kv, so keys of kv are not expected to be in
        -- the set; final1 is the branch normally taken.
        g k ref = do
            s <- liftIO $ readIORef ref
            if Set.member k set
            then extract1 s
            else final1 s

-- | Same as 'classify' except that it uses mutable IORef cells in the
-- Map, providing better performance. Be aware that if this is used as a scan,
-- the values in the intermediate Maps would be mutable.
--
-- Definitions:
--
-- >>> classifyIO f fld = Fold.demuxIO f (const (return fld))
--
{-# INLINE classifyIO #-}
classifyIO :: (MonadIO m, Ord k) =>
    (a -> k) -> Fold m a b -> Fold m a (m (Map k b), Maybe (k, b))
-- 'classifyGenericIO' specialised to 'Map'.
classifyIO = classifyGenericIO

-- | Fold the inputs of each key with the same fold and return a container of
-- the results of all keys: those still in progress and those already done.
--
-- The 'classifyGeneric' scan is post-processed by two folds running side by
-- side. One keeps the latest snapshot of the in-progress folds. The other
-- accumulates the results of the finished folds. The two containers are then
-- merged with 'IsMap.mapUnion'.
{-# INLINE toContainer #-}
toContainer :: (Monad m, IsMap f, Traversable f, Ord (Key f)) =>
    (a -> Key f) -> Fold m a b -> Fold m a (f b)
toContainer keyOf fld = postscan (classifyGeneric keyOf fld) collect

    where

    -- Run the most recent snapshot action; use an empty container if no
    -- snapshot has been produced yet.
    runSnapshot Nothing = pure IsMap.mapEmpty
    runSnapshot (Just snapshot) = snapshot

    -- Results of the folds that are still running.
    inProgress = rmapM runSnapshot (lmap fst latest)

    -- Results of the folds that have already terminated, keyed by their key.
    finished = lmap snd (catMaybes kvToMapOverwriteGeneric)

    collect = teeWith IsMap.mapUnion inProgress finished

-- | Split the input stream by a key field and fold each split with the given
-- fold. Useful for map/reduce, bucketizing the input into bins, or generating
-- histograms.
--
-- Example:
--
-- >>> import Data.Map.Strict (Map)
-- >>> :{
--  let input = Stream.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)]
--      classify = Fold.toMap fst (Fold.lmap snd Fold.toList)
--   in Stream.fold classify input :: IO (Map String [Double])
-- :}
-- fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]
--
-- After the fold for a particular key terminates, any further inputs for that
-- key are ignored.
--
-- Space used is proportional to the number of keys seen so far. It grows
-- monotonically because the fold remembers every key it has seen.
--
-- See 'demuxToMap' for a more powerful version that can use a different fold
-- for each key. A simpler version of 'toMap', which keeps only the last value
-- for each key, can be written as:
--
-- >>> toMap = Fold.foldl' (\kv (k, v) -> Map.insert k v kv) Map.empty
--
-- /Stops: never/
--
-- /Pre-release/
--
{-# INLINE toMap #-}
toMap :: (Monad m, Ord k) =>
    (a -> k) -> Fold m a b -> Fold m a (Map k b)
toMap keyOf fld = toContainer keyOf fld

-- | Same as 'toContainer' but driven by 'classifyGenericIO', which keeps the
-- in-progress fold states in mutable cells.
--
-- One fold keeps the latest snapshot of the in-progress folds. Another
-- accumulates the results of the finished folds. The two containers are
-- merged with 'IsMap.mapUnion'.
{-# INLINE toContainerIO #-}
toContainerIO :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) =>
    (a -> Key f) -> Fold m a b -> Fold m a (f b)
toContainerIO keyOf fld = postscan (classifyGenericIO keyOf fld) collect

    where

    -- Run the most recent snapshot action; use an empty container if no
    -- snapshot has been produced yet.
    runSnapshot Nothing = pure IsMap.mapEmpty
    runSnapshot (Just snapshot) = snapshot

    -- Results of the folds that are still running.
    inProgress = rmapM runSnapshot (lmap fst latest)

    -- Results of the folds that have already terminated, keyed by their key.
    finished = lmap snd (catMaybes kvToMapOverwriteGeneric)

    collect = teeWith IsMap.mapUnion inProgress finished

-- | Same as 'toMap' but may be faster, because the fold accumulators stored
-- in the Map are mutable cells.
--
{-# INLINE toMapIO #-}
toMapIO :: (MonadIO m, Ord k) =>
    (a -> k) -> Fold m a b -> Fold m a (Map k b)
toMapIO keyOf fld = toContainerIO keyOf fld

-- | Given a stream of key value pairs and a fold for the values, fold all the
-- values belonging to each key. Useful for map/reduce, bucketizing the input
-- into bins, or generating histograms.
--
-- Definition:
--
-- >>> kvToMap = Fold.toMap fst . Fold.lmap snd
--
-- Example:
--
-- >>> :{
--  let input = Stream.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)]
--   in Stream.fold (Fold.kvToMap Fold.toList) input
-- :}
-- fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]
--
-- /Pre-release/
{-# INLINE kvToMap #-}
kvToMap :: (Monad m, Ord k) => Fold m a b -> Fold m (k, a) (Map k b)
kvToMap fld = toMap fst (lmap snd fld)

-- | Count how many times each distinct element occurs in the stream.
--
-- The keys of the resulting map are the unique elements of the stream.
--
-- Definition:
--
-- >>> frequency = Fold.toMap id Fold.length
--
{-# INLINE frequency #-}
frequency :: (Monad m, Ord a) => Fold m a (Map a Int)
-- 'toMap' is 'toContainer' at the 'Map' type: each element is its own key,
-- and the per-key fold counts the occurrences.
frequency = toContainer id length