{- Copyright 2008-2010 Mario Blazevic This file is part of the Streaming Component Combinators (SCC) project. The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SCC. If not, see . -} {-# LANGUAGE ScopedTypeVariables, Rank2Types, KindSignatures, EmptyDataDecls, MultiParamTypeClasses, FlexibleContexts, FlexibleInstances, FunctionalDependencies, TypeFamilies #-} -- | The "Combinators" module defines combinators applicable to values of the 'Transducer' and 'Splitter' types defined -- in the "Control.Concurrent.SCC.Types" module. module Control.Concurrent.SCC.Combinators (-- * Consumer, producer, and transducer combinators splitterToMarker, consumeBy, prepend, append, substitute, PipeableComponentPair (compose), JoinableComponentPair (join, sequence), -- * Pseudo-logic splitter combinators -- | Combinators 'sAnd' and 'sOr' are only /pseudo/-logic. While the laws of double negation and De Morgan's laws -- hold, 'sAnd' and 'sOr' are in general not commutative, associative, nor idempotent. In the special case when all -- argument splitters are stateless, such as those produced by 'Control.Concurrent.SCC.Types.statelessSplitter', -- these combinators do satisfy all laws of Boolean algebra. sNot, sAnd, sOr, -- ** Zipping logic combinators -- | The 'pAnd' and 'pOr' combinators run the argument splitters in parallel and combine their logical outputs using -- the corresponding logical operation on each output pair, in a manner similar to 'Data.List.zipWith'. They fully -- satisfy the laws of Boolean algebra. pAnd, pOr, -- * Flow-control combinators -- | The following combinators resemble the common flow-control programming language constructs. Combinators -- 'wherever', 'unless', and 'select' are just the special cases of the combinator 'ifs'. -- -- * /transducer/ ``wherever`` /splitter/ = 'ifs' /splitter/ /transducer/ 'Control.Category.id' -- -- * /transducer/ ``unless`` /splitter/ = 'ifs' /splitter/ 'Control.Category.id' /transducer/ -- -- * 'select' /splitter/ = 'ifs' /splitter/ 'Control.Category.id' -- 'Control.Concurrent.SCC.Primitives.suppress' -- ifs, wherever, unless, select, -- ** Recursive while, nestedIn, -- * Section-based combinators -- | All combinators in this section use their 'Control.Concurrent.SCC.Splitter' argument to determine the structure -- of the input. Every contiguous portion of the input that gets passed to one or the other sink of the splitter is -- treated as one section in the logical structure of the input stream. What is done with the section depends on the -- combinator, but the sections, and therefore the logical structure of the input stream, are determined by the -- argument splitter alone. foreach, having, havingOnly, followedBy, even, -- ** first and its variants first, uptoFirst, prefix, -- ** last and its variants last, lastAndAfter, suffix, -- ** positional splitters startOf, endOf, -- ** input ranges between, -- * parser support parseRegions, parseNestedRegions, -- * helper functions groupMarks, findsTrueIn, findsFalseIn, teeConsumers) where import Control.Monad.Coroutine import Control.Monad.Parallel (MonadParallel(..)) import Control.Concurrent.SCC.Streams import Control.Concurrent.SCC.Types import Prelude hiding (even, last, sequence) import Control.Category ((>>>)) import Control.Monad (liftM, when) import qualified Control.Monad as Monad import Control.Monad.Trans (lift) import Data.Maybe (isJust, isNothing, fromJust) import qualified Data.Foldable as Foldable import qualified Data.Sequence as Seq import Data.Sequence (Seq, (|>), (><), ViewL (EmptyL, (:<))) import qualified Control.Category import qualified Data.List -- | Converts a 'Consumer' into a 'Transducer' with no output. consumeBy :: forall m x y r. (Monad m) => Consumer m x r -> Transducer m x y consumeBy c = Transducer $ \ source _sink -> consume c source >> return () -- | Class 'PipeableComponentPair' applies to any two components that can be combined into a third component with the -- following properties: -- -- * The input of the result, if any, becomes the input of the first component. -- -- * The output produced by the first child component is consumed by the second child component. -- -- * The result output, if any, is the output of the second component. class PipeableComponentPair (m :: * -> *) w c1 c2 c3 | c1 c2 -> c3, c1 c3 -> c2, c2 c3 -> c2, c1 -> m w, c2 -> m w, c3 -> m where compose :: Bool -> c1 -> c2 -> c3 instance forall m x. (MonadParallel m) => PipeableComponentPair m x (Producer m x ()) (Consumer m x ()) (Performer m ()) where compose parallel p c = let performPipe :: Coroutine Naught m ((), ()) performPipe = pipePS parallel (produce p) (consume c) in Performer (runCoroutine performPipe >> return ()) instance (MonadParallel m) => PipeableComponentPair m y (Transducer m x y) (Consumer m y r) (Consumer m x r) where compose parallel t c = isolateConsumer $ \source-> liftM snd $ pipePS parallel (transduce t source) (consume c) instance (MonadParallel m) => PipeableComponentPair m x (Producer m x r) (Transducer m x y) (Producer m y r) where compose parallel p t = isolateProducer $ \sink-> liftM fst $ pipePS parallel (produce p) (\source-> transduce t source sink) instance MonadParallel m => PipeableComponentPair m y (Transducer m x y) (Transducer m y z) (Transducer m x z) where compose parallel t1 t2 = if parallel then t1 >|> t2 else t1 >>> t2 class CompatibleSignature c cons (m :: * -> *) input output | c -> cons m class AnyListOrUnit c instance AnyListOrUnit [x] instance AnyListOrUnit () instance (AnyListOrUnit x, AnyListOrUnit y) => CompatibleSignature (Performer m r) (PerformerType r) m x y instance AnyListOrUnit y => CompatibleSignature (Consumer m x r) (ConsumerType r) m [x] y instance AnyListOrUnit y => CompatibleSignature (Producer m x r) (ProducerType r) m y [x] instance CompatibleSignature (Transducer m x y) TransducerType m [x] [y] data PerformerType r data ConsumerType r data ProducerType r data TransducerType -- | Class 'JoinableComponentPair' applies to any two components that can be combined into a third component with the -- following properties: -- -- * if both argument components consume input, the input of the combined component gets distributed to both -- components in parallel, -- -- * if both argument components produce output, the output of the combined component is a concatenation of the -- complete output from the first component followed by the complete output of the second component, and -- -- * the 'join' method may apply the components in any order, the 'sequence' method makes sure its first argument -- has completed before using the second one. class (Monad m, CompatibleSignature c1 t1 m x y, CompatibleSignature c2 t2 m x y, CompatibleSignature c3 t3 m x y) => JoinableComponentPair t1 t2 t3 m x y c1 c2 c3 | c1 c2 -> c3, c1 -> t1 m, c2 -> t2 m, c3 -> t3 m x y, t1 m x y -> c1, t2 m x y -> c2, t3 m x y -> c3 where join :: Bool -> c1 -> c2 -> c3 sequence :: c1 -> c2 -> c3 join = const sequence instance forall m x r1 r2. Monad m => JoinableComponentPair (ProducerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Producer m x r2) (Producer m x r2) where sequence p1 p2 = Producer $ \sink-> produce p1 sink >> produce p2 sink instance forall m x. MonadParallel m => JoinableComponentPair (ConsumerType ()) (ConsumerType ()) (ConsumerType ()) m [x] () (Consumer m x ()) (Consumer m x ()) (Consumer m x ()) where join parallel c1 c2 = Consumer (liftM (const ()) . teeConsumers parallel (consume c1) (consume c2)) sequence c1 c2 = Consumer $ \source-> teeConsumers False (consume c1) getList source >>= \((), list)-> pipe (putList list) (consume c2) >> return () instance forall m x y. (MonadParallel m) => JoinableComponentPair TransducerType TransducerType TransducerType m [x] [y] (Transducer m x y) (Transducer m x y) (Transducer m x y) where join parallel t1 t2 = isolateTransducer $ \source sink-> pipe (\buffer-> teeConsumers parallel (\source-> transduce t1 source sink) (\source-> transduce t2 source buffer) source) getList >>= \(_, list)-> putList list sink sequence t1 t2 = isolateTransducer $ \source sink-> teeConsumers False (flip (transduce t1) sink) getList source >>= \(_, list)-> pipe (putList list) (\source-> transduce t2 source sink) >> return () instance forall m r1 r2. MonadParallel m => JoinableComponentPair (PerformerType r1) (PerformerType r2) (PerformerType r2) m () () (Performer m r1) (Performer m r2) (Performer m r2) where join parallel p1 p2 = Performer $ if parallel then bindM2 (const return) (perform p1) (perform p2) else perform p1 >> perform p2 sequence p1 p2 = Performer $ perform p1 >> perform p2 instance forall m x r1 r2. (MonadParallel m) => JoinableComponentPair (PerformerType r1) (ProducerType r2) (ProducerType r2) m () [x] (Performer m r1) (Producer m x r2) (Producer m x r2) where join parallel pe pr = Producer $ \sink-> if parallel then bindM2 (const return) (lift (perform pe)) (produce pr sink) else lift (perform pe) >> produce pr sink sequence pe pr = Producer $ \sink-> lift (perform pe) >> produce pr sink instance forall m x r1 r2. (MonadParallel m) => JoinableComponentPair (ProducerType r1) (PerformerType r2) (ProducerType r2) m () [x] (Producer m x r1) (Performer m r2) (Producer m x r2) where join parallel pr pe = Producer $ \sink-> if parallel then bindM2 (const return) (produce pr sink) (lift (perform pe)) else produce pr sink >> lift (perform pe) sequence pr pe = Producer $ \sink-> produce pr sink >> lift (perform pe) instance forall m x r1 r2. (MonadParallel m) => JoinableComponentPair (PerformerType r1) (ConsumerType r2) (ConsumerType r2) m [x] () (Performer m r1) (Consumer m x r2) (Consumer m x r2) where join parallel p c = Consumer $ \source-> if parallel then bindM2 (const return) (lift (perform p)) (consume c source) else lift (perform p) >> consume c source sequence p c = Consumer $ \source-> lift (perform p) >> consume c source instance forall m x r1 r2. (MonadParallel m) => JoinableComponentPair (ConsumerType r1) (PerformerType r2) (ConsumerType r2) m [x] () (Consumer m x r1) (Performer m r2) (Consumer m x r2) where join parallel c p = Consumer $ \source-> if parallel then bindM2 (const return) (consume c source) (lift (perform p)) else consume c source >> lift (perform p) sequence c p = Consumer $ \source-> consume c source >> lift (perform p) instance forall m x y r. (MonadParallel m) => JoinableComponentPair (PerformerType r) TransducerType TransducerType m [x] [y] (Performer m r) (Transducer m x y) (Transducer m x y) where join parallel p t = Transducer $ \ source sink -> if parallel then bindM2 (const return) (lift (perform p)) (transduce t source sink) else lift (perform p) >> transduce t source sink sequence p t = Transducer $ \ source sink -> lift (perform p) >> transduce t source sink instance forall m x y r. (MonadParallel m) => JoinableComponentPair TransducerType (PerformerType r) TransducerType m [x] [y] (Transducer m x y) (Performer m r) (Transducer m x y) where join parallel t p = Transducer $ \ source sink -> if parallel then bindM2 (const . return) (transduce t source sink) (lift (perform p)) else do result <- transduce t source sink lift (perform p) return result sequence t p = Transducer $ \ source sink -> do result <- transduce t source sink lift (perform p) return result instance forall m x y. (MonadParallel m) => JoinableComponentPair (ProducerType ()) TransducerType TransducerType m [x] [y] (Producer m y ()) (Transducer m x y) (Transducer m x y) where join parallel p t = if parallel then isolateTransducer $ \source sink-> do (rest, out) <- pipe (\buffer-> bindM2 (const return) (produce p sink) (transduce t source buffer)) getList putList out sink return rest else sequence p t sequence p t = Transducer $ \ source sink -> produce p sink >> transduce t source sink instance forall m x y. (MonadParallel m) => JoinableComponentPair TransducerType (ProducerType ()) TransducerType m [x] [y] (Transducer m x y) (Producer m y ()) (Transducer m x y) where join parallel t p = if parallel then isolateTransducer $ \source sink-> do (rest, out) <- pipe (\buffer-> bindM2 (const . return) (transduce t source sink) (produce p buffer)) getList putList out sink return rest else sequence t p sequence t p = Transducer $ \ source sink -> do result <- transduce t source sink produce p sink return result instance forall m x y. (MonadParallel m) => JoinableComponentPair (ConsumerType ()) TransducerType TransducerType m [x] [y] (Consumer m x ()) (Transducer m x y) (Transducer m x y) where join parallel c t = isolateTransducer $ \source sink-> teeConsumers parallel (consume c) (\source-> transduce t source sink) source >> return () sequence c t = isolateTransducer $ \source sink-> teeConsumers False (consume c) getList source >>= \(_, list)-> pipe (putList list) (\source-> transduce t source sink) >> return () instance forall m x y. MonadParallel m => JoinableComponentPair TransducerType (ConsumerType ()) TransducerType m [x] [y] (Transducer m x y) (Consumer m x ()) (Transducer m x y) where join parallel t c = join parallel c t sequence t c = isolateTransducer $ \source sink-> teeConsumers False (\source-> transduce t source sink) getList source >>= \(_, list)-> pipe (putList list) (consume c) >> return () instance forall m x y. (MonadParallel m) => JoinableComponentPair (ProducerType ()) (ConsumerType ()) TransducerType m [x] [y] (Producer m y ()) (Consumer m x ()) (Transducer m x y) where join parallel p c = Transducer $ \ source sink -> if parallel then bindM2 (\ _ _ -> return ()) (produce p sink) (consume c source) else produce p sink >> consume c source sequence p c = Transducer $ \ source sink -> produce p sink >> consume c source instance forall m x y. (MonadParallel m) => JoinableComponentPair (ConsumerType ()) (ProducerType ()) TransducerType m [x] [y] (Consumer m x ()) (Producer m y ()) (Transducer m x y) where join parallel c p = join parallel p c sequence c p = Transducer $ \ source sink -> consume c source >> produce p sink -- | Combinator 'prepend' converts the given producer to a 'Control.Concurrent.SCC.Types.Transducer' that passes all its -- input through unmodified, except for prepending the output of the argument producer to it. The following law holds: @ -- 'prepend' /prefix/ = 'join' ('substitute' /prefix/) 'Control.Category.id' @ prepend :: forall m x r. (Monad m) => Producer m x r -> Transducer m x x prepend prefix = Transducer $ \ source sink -> produce prefix sink >> pour source sink -- | Combinator 'append' converts the given producer to a 'Control.Concurrent.SCC.Types.Transducer' that passes all its -- input through unmodified, finally appending the output of the argument producer to it. The following law holds: @ -- 'append' /suffix/ = 'join' 'Control.Category.id' ('substitute' /suffix/) @ append :: forall m x r. (Monad m) => Producer m x r -> Transducer m x x append suffix = Transducer $ \ source sink -> pour source sink >> produce suffix sink >> return () -- | The 'substitute' combinator converts its argument producer to a 'Control.Concurrent.SCC.Types.Transducer' that -- produces the same output, while consuming its entire input and ignoring it. substitute :: forall m x y r. (Monad m) => Producer m y r -> Transducer m x y substitute feed = Transducer $ \ source sink -> mapMStream_ (const $ return ()) source >> produce feed sink >> return () -- | The 'sNot' (streaming not) combinator simply reverses the outputs of the argument splitter. In other words, data -- that the argument splitter sends to its /true/ sink goes to the /false/ sink of the result, and vice versa. sNot :: forall m x b. Monad m => Splitter m x b -> Splitter m x b sNot splitter = isolateSplitter $ \ source true false edge -> suppressProducer (split splitter source false true) -- | The 'sAnd' combinator sends the /true/ sink output of its left operand to the input of its right operand for -- further splitting. Both operands' /false/ sinks are connected to the /false/ sink of the combined splitter, but any -- input value to reach the /true/ sink of the combined component data must be deemed true by both splitters. sAnd :: forall m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2) sAnd parallel s1 s2 = isolateSplitter $ \ source true false edge -> liftM (fst . fst) $ pipe (\edges-> pipePS parallel (\true-> split s1 source true false (mapSink Left edges)) (\source-> split s2 source true false (mapSink Right edges))) (flip intersectRegions edge) intersectRegions source sink = next Nothing Nothing where next lastLeft lastRight = getWith (either (flip pair lastRight . Just) (pair lastLeft . Just)) source pair l@(Just x) r@(Just y) = put sink (x, y) >> next Nothing Nothing pair l r = next l r -- | A 'sOr' combinator's input value can reach its /false/ sink only by going through both argument splitters' /false/ -- sinks. sOr :: forall m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2) sOr parallel s1 s2 = isolateSplitter $ \ source true false edge -> liftM fst $ pipePS parallel (\false-> split s1 source true false (mapSink Left edge)) (\source-> split s2 source true false (mapSink Right edge)) -- | Combinator 'pAnd' is a pairwise logical conjunction of two splitters run in parallel on the same input. pAnd :: forall m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2) pAnd parallel s1 s2 = isolateSplitter $ \ source true false edge -> pipePS parallel (transduce (splittersToPairMarker parallel s1 s2) source) (\source-> let split l r = getWith (test l r) source test l r (Left (x, t1, t2)) = (if t1 && t2 then put true x else put false x) >> split (if t1 then l else Nothing) (if t2 then r else Nothing) test _ Nothing (Right (Left l)) = split (Just l) Nothing test _ (Just r) (Right (Left l)) = put edge (l, r) >> split (Just l) (Just r) test Nothing _ (Right (Right r)) = split Nothing (Just r) test (Just l) _ (Right (Right r)) = put edge (l, r) >> split (Just l) (Just r) in split Nothing Nothing) >> return () -- | Combinator 'pOr' is a pairwise logical disjunction of two splitters run in parallel on the same input. pOr :: forall c m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (Either b1 b2) pOr = zipSplittersWith (||) pour ifs :: forall c m x b. (MonadParallel m, Branching c m x ()) => Bool -> Splitter m x b -> c -> c -> c ifs parallel s c1 c2 = combineBranches if' parallel c1 c2 where if' :: forall d. Bool -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) -> forall a. OpenConsumer m a d x () if' parallel c1 c2 source = splitInputToConsumers parallel s source c1 c2 wherever :: forall m x b. MonadParallel m => Bool -> Transducer m x x -> Splitter m x b -> Transducer m x x wherever parallel t s = isolateTransducer wherever' where wherever' :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m () wherever' source sink = pipePS parallel (\true-> split s source true sink (nullSink :: Sink m d b)) (flip (transduce t) sink) >> return () unless :: forall m x b. MonadParallel m => Bool -> Transducer m x x -> Splitter m x b -> Transducer m x x unless parallel t s = wherever parallel t (sNot s) select :: forall m x b. Monad m => Splitter m x b -> Transducer m x x select s = isolateTransducer $ \source sink-> suppressProducer (suppressProducer . split s source sink) -- | Converts a splitter into a parser. parseRegions :: forall m x b. Monad m => Splitter m x b -> Parser m x b parseRegions s = isolateTransducer $ \source sink-> pipe (transduce (splitterToMarker s) source) (\source-> wrapRegions source sink) >> return () where wrapRegions source sink = let wrap Nothing (Left (x, _)) = put sink (Content x) >> return Nothing wrap (Just p) (Left (x, False)) = flush p >> put sink (Content x) >> return Nothing wrap (Just (b, t)) (Left (x, True)) = do Monad.unless t (put sink (Markup (Start b))) put sink (Content x) return (Just (b, True)) wrap (Just p) (Right b') = flush p >> return (Just (b', False)) wrap Nothing (Right b) = return (Just (b, False)) flush (b, t) = put sink $ Markup $ (if t then End else Point) b in foldMStream wrap Nothing source >>= maybe (return ()) flush -- | Converts a boundary-marking splitter into a parser. parseNestedRegions :: forall m x b. Monad m => Splitter m x (Boundary b) -> Parser m x b parseNestedRegions s = isolateTransducer $ \source sink-> split s source (mapSink Content sink) (mapSink Content sink) (mapSink Markup sink) -- | The recursive combinator 'while' feeds the true sink of the argument splitter back to itself, modified by the -- argument transducer. Data fed to the splitter's false sink is passed on unmodified. while :: forall m x b. MonadParallel m => [(Bool, (Transducer m x x, Splitter m x b))] -> Transducer m x x while ((parallel, (t, s)) : rest) = isolateTransducer while' where while' :: forall d. Functor d => Source m d x -> Sink m d x -> Coroutine d m () while' source sink = pipePS parallel (\true-> split s source true sink (nullSink :: Sink m d b)) (\source-> getWith (\x-> liftM fst $ pipe (\sink-> put sink x >> pour source sink) (\source-> transduce while'' source sink)) source) >> return () while'' = compose parallel t (while rest) -- | The recursive combinator 'nestedIn' combines two splitters into a mutually recursive loop acting as a single -- splitter. The true sink of one of the argument splitters and false sink of the other become the true and false sinks -- of the loop. The other two sinks are bound to the other splitter's source. The use of 'nestedIn' makes sense only -- on hierarchically structured streams. If we gave it some input containing a flat sequence of values, and assuming -- both component splitters are deterministic and stateless, an input value would either not loop at all or it would -- loop forever. nestedIn :: forall m x b. MonadParallel m => [(Bool, (Splitter m x b, Splitter m x b))] -> Splitter m x b nestedIn ((parallel, (s1, s2)) : rest) = isolateSplitter $ \ source true false edge -> liftM fst $ pipePS parallel (\false-> split s1 source true false edge) (\source-> pipe (\true-> split s2 source true false (filterMSink (const $ return False) edge)) (\source-> get source >>= maybe (return ((), ())) (\x-> pipe (\sink-> put sink x >> pour source sink) (\source-> split (nestedIn rest) source true false edge)))) -- | The 'foreach' combinator is similar to the combinator 'ifs' in that it combines a splitter and two transducers into -- another transducer. However, in this case the transducers are re-instantiated for each consecutive portion of the -- input as the splitter chunks it up. Each contiguous portion of the input that the splitter sends to one of its two -- sinks gets transducered through the appropriate argument transducer as that transducer's whole input. As soon as the -- contiguous portion is finished, the transducer gets terminated. foreach :: forall m x b c. (MonadParallel m, Branching c m x ()) => Bool -> Splitter m x b -> c -> c -> c foreach parallel s c1 c2 = combineBranches foreach' parallel c1 c2 where foreach' :: forall d. Bool -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) -> (forall a d'. AncestorFunctor d d' => OpenConsumer m a d' x ()) -> forall a. OpenConsumer m a d x () foreach' parallel c1 c2 source = liftM fst $ pipePS parallel (transduce (splitterToMarker s) (liftSource source :: Source m d x)) (\source-> groupMarks source (maybe c2 (const c1))) -- | The 'having' combinator combines two pure splitters into a pure splitter. One splitter is used to chunk the input -- into contiguous portions. Its /false/ sink is routed directly to the /false/ sink of the combined splitter. The -- second splitter is instantiated and run on each portion of the input that goes to first splitter's /true/ sink. If -- the second splitter sends any output at all to its /true/ sink, the whole input portion is passed on to the /true/ -- sink of the combined splitter, otherwise it goes to its /false/ sink. having :: forall m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1 having parallel s1 s2 = isolateSplitter s where s source true false edge = pipePS parallel (transduce (splitterToMarker s1) source) (flip groupMarks test) >> return () where test Nothing chunk = pour chunk false test (Just mb) chunk = teeConsumers False getList (findsTrueIn s2) chunk >>= \(chunk, maybeFound)-> if isJust maybeFound then maybe (return ()) (put edge) mb >> putList chunk true else putList chunk false -- | The 'havingOnly' combinator is analogous to the 'having' combinator, but it succeeds and passes each chunk of the -- input to its /true/ sink only if the second splitter sends no part of it to its /false/ sink. havingOnly :: forall m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1 havingOnly parallel s1 s2 = isolateSplitter s where s source true false edge = pipePS parallel (transduce (splitterToMarker s1) source) (flip groupMarks test) >> return () where test Nothing chunk = pour chunk false test (Just mb) chunk = teeConsumers False getList (findsFalseIn s2) chunk >>= \(chunk, anyFalse)-> if anyFalse then putList chunk false else maybe (return ()) (put edge) mb >> putList chunk true -- | The result of combinator 'first' behaves the same as the argument splitter up to and including the first portion of -- the input which goes into the argument's /true/ sink. All input following the first true portion goes into the -- /false/ sink. first :: forall m x b. Monad m => Splitter m x b -> Splitter m x b first splitter = wrapMarkedSplitter splitter $ \source true false edge-> let split 1 (Left (x, False)) = put false x >> return 1 split 1 (Left (x, True)) = put true x >> return 2 split 1 (Right b) = put edge b >> return 2 split 2 b@Right{} = return 3 split 2 (Left (x, True)) = put true x >> return 2 split 2 (Left (x, False)) = put false x >> return 3 split 3 (Left (x, _)) = put false x >> return 3 split 3 (Right _) = return 3 in foldMStream_ split 1 source -- | The result of combinator 'uptoFirst' takes all input up to and including the first portion of the input which goes -- into the argument's /true/ sink and feeds it to the result splitter's /true/ sink. All the rest of the input goes -- into the /false/ sink. The only difference between 'first' and 'uptoFirst' combinators is in where they direct the -- /false/ portion of the input preceding the first /true/ part. uptoFirst :: forall m x b. Monad m => Splitter m x b -> Splitter m x b uptoFirst splitter = wrapMarkedSplitter splitter $ \source true false edge-> let split (Left q) (Left (x, False)) = return (Left (q |> x)) split (Left q) (Left (x, True)) = putQueue q true >> put true x >> return (Right True) split (Left q) (Right b) = putQueue q true >> put edge b >> return (Right True) split (Right True) Right{} = return (Right False) split (Right True) (Left (x, True)) = put true x >> return (Right True) split (Right True) (Left (x, False)) = put false x >> return (Right False) split (Right False) (Left (x, _)) = put false x >> return (Right False) split (Right False) (Right _) = return (Right False) in foldMStream split (Left Seq.empty) source >>= either (flip putQueue false) (const $ return ()) -- | The result of the combinator 'last' is a splitter which directs all input to its /false/ sink, up to the last -- portion of the input which goes to its argument's /true/ sink. That portion of the input is the only one that goes to -- the resulting component's /true/ sink. The splitter returned by the combinator 'last' has to buffer the previous two -- portions of its input, because it cannot know if a true portion of the input is the last one until it sees the end of -- the input or another portion succeeding the previous one. last :: forall m x b. Monad m => Splitter m x b -> Splitter m x b last splitter = wrapMarkedSplitter splitter $ \source true false edge-> let get1 (Left (x, False)) = put false x >> getWith get1 source get1 p@(Left (x, True)) = get2 Nothing Seq.empty p get1 (Right b) = getWith (get2 (Just b) Seq.empty) source get2 mb q (Left (x, True)) = let q' = q |> x in get source >>= maybe (flush mb q') (get2 mb q') get2 mb q p = get3 mb q Seq.empty p get3 mb qt qf (Left (x, False)) = let qf' = qf |> x in get source >>= maybe (flush mb qt >> putQueue qf' false) (get3 mb qt qf') get3 mb qt qf p = do putQueue qt false putQueue qf false get1 p flush mb q = maybe (return ()) (put edge) mb >> putQueue q true in getWith get1 source -- | The result of the combinator 'lastAndAfter' is a splitter which directs all input to its /false/ sink, up to the -- last portion of the input which goes to its argument's /true/ sink. That portion and the remainder of the input is -- fed to the resulting component's /true/ sink. The difference between 'last' and 'lastAndAfter' combinators is where -- they feed the /false/ portion of the input, if any, remaining after the last /true/ part. lastAndAfter :: forall m x b. Monad m => Splitter m x b -> Splitter m x b lastAndAfter splitter = wrapMarkedSplitter splitter $ \source true false edge-> let get1 (Left (x, False)) = put false x >> getWith get1 source get1 p@(Left (x, True)) = get2 Nothing Seq.empty p get1 (Right b) = getWith (get2 (Just b) Seq.empty) source get2 mb q (Left (x, True)) = let q' = q |> x in get source >>= maybe (flush mb q') (get2 mb q') get2 mb q p = get3 mb q p get3 mb q (Left (x, False)) = let q' = q |> x in get source >>= maybe (flush mb q') (get3 mb q') get3 _ q p@(Left (x, True)) = putQueue q false >> get1 p get3 _ q b'@Right{} = putQueue q false >> get1 b' flush mb q = maybe (return ()) (put edge) mb >> putQueue q true in getWith get1 source -- | The 'prefix' combinator feeds its /true/ sink only the prefix of the input that its argument feeds to its /true/ -- sink. All the rest of the input is dumped into the /false/ sink of the result. prefix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b prefix splitter = wrapMarkedSplitter splitter $ \source true false edge-> let split 0 p@Left{} = split 1 p split 0 (Right b) = put edge b >> return 1 split 1 (Left (x, False)) = put false x >> return 2 split 1 (Left (x, True)) = put true x >> return 1 split 1 (Right b) = return 2 split 2 (Left (x, _)) = put false x >> return 2 split 2 Right{} = return 2 in foldMStream_ split 0 source -- | The 'suffix' combinator feeds its /true/ sink only the suffix of the input that its argument feeds to its /true/ -- sink. All the rest of the input is dumped into the /false/ sink of the result. suffix :: forall m x b. Monad m => Splitter m x b -> Splitter m x b suffix splitter = wrapMarkedSplitter splitter $ \source true false edge-> let split Nothing (Left (x, False)) = put false x >> return Nothing split Nothing (Left (x, True)) = return (Just (Nothing, Seq.singleton x)) split Nothing (Right b) = return (Just (Just b, Seq.empty)) split (Just (mb, q)) (Left (x, True)) = return (Just (mb, q |> x)) split (Just (mb, q)) (Left (x, False)) = putQueue q false >> put false x >> return Nothing split (Just (mb, q)) (Right b) = putQueue q false >> return (Just (Just b, Seq.empty)) in foldMStream split Nothing source >>= \r-> case r of Nothing -> return () Just (Nothing, q) -> putQueue q true Just (Just b, q) -> put edge b >> putQueue q true -- | The 'even' combinator takes every input section that its argument /splitter/ deems /true/, and feeds even ones into -- its /true/ sink. The odd sections and parts of input that are /false/ according to its argument splitter are fed to -- 'even' splitter's /false/ sink. even :: forall m x b. Monad m => Splitter m x b -> Splitter m x b even splitter = wrapMarkedSplitter splitter $ \source true false edge-> let split 1 (Left (x, False)) = put false x >> return 1 split 1 p@(Left (x, True)) = split 2 p split 1 (Right b) = return 2 split 2 (Left (x, True)) = put false x >> return 2 split 2 p@(Left (x, False)) = split 3 p split 2 (Right b) = put edge b >> return 4 split 3 (Left (x, False)) = put false x >> return 3 split 3 p@(Left (x, True)) = split 4 p split 3 (Right b) = put edge b >> return 4 split 4 (Left (x, True)) = put true x >> return 4 split 4 p@(Left (x, False)) = split 1 p split 4 (Right b) = return 2 in foldMStream_ split 1 source -- | Splitter 'startOf' issues an empty /true/ section at the beginning of every section considered /true/ by its -- argument splitter, otherwise the entire input goes into its /false/ sink. startOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b) startOf splitter = wrapMarkedSplitter splitter $ \source true false edge-> let split 1 (Left (x, False)) = put false x >> return 1 split 1 p@(Left (x, True)) = put edge Nothing >> split 2 p split 1 (Right b) = put edge (Just b) >> return 2 split 2 (Left (x, True)) = put false x >> return 2 split 2 p = split 1 p in foldMStream_ split 1 source -- | Splitter 'endOf' issues an empty /true/ section at the end of every section considered /true/ by its argument -- splitter, otherwise the entire input goes into its /false/ sink. endOf :: forall m x b. Monad m => Splitter m x b -> Splitter m x (Maybe b) endOf splitter = wrapMarkedSplitter splitter $ \source true false edge-> let split Nothing (Left (x, False)) = put false x >> return Nothing split Nothing p@(Left (x, True)) = split (Just Nothing) p split Nothing (Right b) = return (Just (Just b)) split (Just mb) (Left (x, True)) = put false x >> return (Just mb) split (Just mb) p@(Left (x, False)) = put edge mb >> split Nothing p split (Just mb) (Right b) = put edge mb >> return (Just $ Just b) in foldMStream split Nothing source >>= maybe (return ()) (put edge) -- | Combinator 'followedBy' treats its argument 'Splitter's as patterns components and returns a 'Splitter' that -- matches their concatenation. A section of input is considered /true/ by the result iff its prefix is considered -- /true/ by argument /s1/ and the rest of the section is considered /true/ by /s2/. The splitter /s2/ is started anew -- after every section split to /true/ sink by /s1/. followedBy :: forall m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x (b1, b2) followedBy parallel s1 s2 = isolateSplitter $ \ source true false edge -> pipePS parallel (transduce (splitterToMarker s1) source) (\source-> let get0 q = case Seq.viewl q of Seq.EmptyL -> getWith get1 source (Left (x, False)) :< rest -> put false x >> get0 rest (Left (x, True)) :< rest -> get2 Nothing Seq.empty q (Right b) :< rest -> get2 (Just b) Seq.empty rest get1 (Left (x, False)) = put false x >> getWith get1 source get1 p@(Left (x, True)) = get2 Nothing Seq.empty (Seq.singleton p) get1 (Right b) = get2 (Just b) Seq.empty Seq.empty get2 mb q q' = case Seq.viewl q' of Seq.EmptyL -> get source >>= maybe (testEnd mb q) (get2 mb q . Seq.singleton) (Left (x, True)) :< rest -> get2 mb (q |> x) rest (Left (x, False)) :< rest -> get3 mb q q' Right{} :< rest -> get3 mb q q' get3 mb q q' = do ((q1, q2), n) <- pipe (get7 Seq.empty q') (test mb q) case n of Nothing -> putQueue q false >> get0 (q1 >< q2) Just 0 -> get0 (q1 >< q2) Just n -> get8 (Just mb) n (q1 >< q2) get7 q1 q2 sink = case Seq.viewl q2 of Seq.EmptyL -> get source >>= maybe (return (q1, q2)) (\p-> either (put sink . fst) (const $ return ()) p >> get7 (q1 |> p) q2 sink) p :< rest -> either (put sink . fst) (const $ return ()) p >> get7 (q1 |> p) rest sink testEnd mb q = do ((), n) <- pipe (const $ return ()) (test mb q) case n of Nothing -> putQueue q false _ -> return () test mb q source = liftM snd $ pipe (transduce (splitterToMarker s2) source) (\source-> let get4 (Left (_, False)) = return Nothing get4 p@(Left (_, True)) = putQueue q true >> get5 0 p get4 p@(Right b) = maybe (return ()) (\b1-> put edge (b1, b)) mb >> putQueue q true >> get6 0 get5 n (Left (x, True)) = put true x >> get6 (succ n) get5 n _ = return (Just n) get6 n = get source >>= maybe (return $ Just n) (get5 n) in get source >>= maybe (return Nothing) get4) get8 Nothing 0 q = get0 q get8 (Just mb) 0 q = get2 mb Seq.empty q get8 mmb n q = case Seq.viewl q of Left (x, False) :< rest -> get8 Nothing (pred n) rest Left (x, True) :< rest -> get8 (maybe (Just Nothing) Just mmb) (pred n) rest Right b :< rest -> get8 (Just (Just b)) n rest in get0 Seq.empty) >> return () -- | Combinator '...' tracks the running balance of difference between the number of preceding starts of sections -- considered /true/ according to its first argument and the ones according to its second argument. The combinator -- passes to /true/ all input values for which the difference balance is positive. This combinator is typically used -- with 'startOf' and 'endOf' in order to count entire input sections and ignore their lengths. between :: forall m x b1 b2. MonadParallel m => Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b1 between parallel s1 s2 = isolateSplitter $ \ source true false edge -> pipePS parallel (transduce (splittersToPairMarker parallel s1 s2) source) (let pass n x = (if n > 0 then put true x else put false x) >> return n pass' n x = (if n >= 0 then put true x else put false x) >> return n state n (Left (x, True, False)) = pass (succ n) x state n (Left (x, False, True)) = pass' (pred n) x state n (Left (x, True, True)) = pass' n x state n (Left (x, False, False)) = pass n x state 0 (Right (Left b)) = put edge b >> return 1 state n (Right (Left _)) = return (succ n) state n (Right (Right _)) = return (pred n) in foldMStream_ state 0) >> return () -- Helper functions wrapMarkedSplitter :: forall m x b1 b2. Monad m => Splitter m x b1 -> (forall a1 a2 a3 a4 d. (AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d, AncestorFunctor a4 d) => Source m a1 (Either (x, Bool) b1) -> Sink m a2 x -> Sink m a3 x -> Sink m a4 b2 -> Coroutine d m ()) -> Splitter m x b2 wrapMarkedSplitter splitter splitMarked = isolateSplitter $ \ source true false edge -> pipe (transduce (splitterToMarker splitter) source) (\source-> splitMarked source true false edge) >> return () splitterToMarker :: forall m x b. Monad m => Splitter m x b -> Transducer m x (Either (x, Bool) b) splitterToMarker s = isolateTransducer $ \source sink-> split s source (mapSink (\x-> Left (x, True)) sink) (mapSink (\x-> Left (x, False)) sink) (mapSink Right sink) splittersToPairMarker :: forall m x b1 b2. (MonadParallel m) => Bool -> Splitter m x b1 -> Splitter m x b2 -> Transducer m x (Either (x, Bool, Bool) (Either b1 b2)) splittersToPairMarker parallel s1 s2 = let t source sink = pipe (\sync-> teeConsumers parallel (\source1-> split s1 source1 (mapSink (\x-> Left ((x, True), True)) sync) (mapSink (\x-> Left ((x, False), True)) sync) (mapSink (Right. Left) sync)) (\source2-> split s2 source2 (mapSink (\x-> Left ((x, True), False)) sync) (mapSink (\x-> Left ((x, False), False)) sync) (mapSink (Right . Right) sync)) source) (synchronizeMarks sink) >> return () synchronizeMarks :: forall m a1 a2 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d) => Sink m a1 (Either (x, Bool, Bool) (Either b1 b2)) -> Source m a2 (Either ((x, Bool), Bool) (Either b1 b2)) -> Coroutine d m () synchronizeMarks sink source = foldMStream handleMark Nothing source >>= \Nothing-> return () where handleMark Nothing (Right b) = put sink (Right b) >> return Nothing handleMark Nothing (Left (p, first)) = return (Just (Seq.singleton (Left p), first)) handleMark state@(Just (q, first)) (Left (p, first')) | first == first' = return (Just (q |> Left p, first)) handleMark state@(Just (q, True)) (Right b@Left{}) = return (Just (q |> Right b, True)) handleMark state@(Just (q, False)) (Right b@Right{}) = return (Just (q |> Right b, False)) handleMark state (Right b) = put sink (Right b) >> return state handleMark state@(Just (q, pos')) mark@(Left ((x, t), pos)) = case Seq.viewl q of Seq.EmptyL -> return (Just (Seq.singleton (Left (x, t)), pos)) Right b :< rest -> put sink (Right b) >> handleMark (if Seq.null rest then Nothing else Just (rest, pos')) mark Left (y, t') :< rest -> put sink (Left $ if pos then (y, t, t') else (y, t', t)) >> return (if Seq.null rest then Nothing else Just (rest, pos')) returnQueuedList q = return $ concatMap (either ((:[]) . fst) (const [])) $ Foldable.toList $ Seq.viewl q in isolateTransducer t zipSplittersWith :: forall m x b1 b2 b. MonadParallel m => (Bool -> Bool -> Bool) -> (forall a1 a2 d. (AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 (Either b1 b2) -> Sink m a2 b -> Coroutine d m ()) -> Bool -> Splitter m x b1 -> Splitter m x b2 -> Splitter m x b zipSplittersWith f boundaries parallel s1 s2 = isolateSplitter $ \ source true false edge -> pipe (\edge-> pipePS parallel (transduce (splittersToPairMarker parallel s1 s2) source) (mapMStream_ (either (\(x, t1, t2)-> if f t1 t2 then put true x else put false x) (put edge)))) (flip boundaries edge) >> return () -- | Runs the second argument on every contiguous region of input source (typically produced by 'splitterToMarker') -- whose all values either match @Left (_, True)@ or @Left (_, False)@. groupMarks :: (Monad m, AncestorFunctor a d, AncestorFunctor a (SinkFunctor d x)) => Source m a (Either (x, Bool) b) -> (Maybe (Maybe b) -> Source m (SourceFunctor d x) x -> Coroutine (SourceFunctor d x) m r) -> Coroutine d m () groupMarks source getConsumer = start where start = getWith (either startContent startRegion) source startContent (x, False) = pipe (\sink-> pass False sink x) (getConsumer Nothing) >>= maybe (return ()) (either startContent startRegion) . fst startContent (x, True) = pipe (\sink-> pass True sink x) (getConsumer $ Just Nothing) >>= maybe (return ()) (either startContent startRegion) . fst startRegion b = pipe (next True) (getConsumer (Just $ Just b)) >>= maybe (return ()) (either startContent startRegion) . fst pass t sink x = put sink x >> next t sink next t sink = get source >>= maybe (return Nothing) (continue t sink) continue t sink (Left (x, t')) | t == t' = pass t sink x continue t sink p = return (Just p) -- | 'suppressProducer' runs the /producer/ argument with a new sink, suppressing everything 'put' in the sink. suppressProducer :: forall m a x r. (Functor a, Monad m) => (Sink m a x -> Coroutine a m r) -> Coroutine a m r suppressProducer producer = producer (nullSink :: Sink m a x) findsTrueIn :: forall m a d x b. (Monad m, AncestorFunctor a d) => Splitter m x b -> Source m a x -> Coroutine d m (Maybe (Maybe b)) findsTrueIn splitter source = pipe (\testTrue-> pipe (split splitter (liftSource source :: Source m d x) testTrue (nullSink :: Sink m d x)) get) get >>= \(((), maybeEdge), maybeTrue)-> return $ case maybeEdge of Nothing -> fmap (const Nothing) maybeTrue _ -> Just maybeEdge findsFalseIn :: forall m a d x b. (Monad m, AncestorFunctor a d) => Splitter m x b -> Source m a x -> Coroutine d m Bool findsFalseIn splitter source = pipe (\testFalse-> split splitter (liftSource source :: Source m d x) (nullSink :: Sink m d x) testFalse (nullSink :: Sink m d b)) get >>= \((), maybeFalse)-> return (isJust maybeFalse) teeConsumers :: forall m a d x r1 r2. MonadParallel m => Bool -> (forall a. OpenConsumer m a (SinkFunctor d x) x r1) -> (forall a. OpenConsumer m a (SourceFunctor d x) x r2) -> OpenConsumer m a d x (r1, r2) teeConsumers parallel c1 c2 source = pipePS parallel consume1 c2 where consume1 sink = c1 (teeSource sink source' :: Source m (SinkFunctor d x) x) source' :: Source m d x source' = liftSource source