module Potoki.Core.Consume where

import Potoki.Core.Prelude
import Potoki.Core.Types
import qualified Control.Exception as E
import qualified Potoki.Core.Fetch as A


-- | Maps the consumed input contravariantly (by mapping the fetch)
-- and the produced output covariantly.
instance Profunctor Consume where
  {-# INLINE dimap #-}
  dimap inputMapping outputMapping (Consume consume) =
    Consume (\ fetch -> fmap outputMapping (consume (fmap inputMapping fetch)))

-- | Lifts a consumer so that it only sees the 'Right' elements of a stream of
-- 'Either' values.
instance Choice Consume where
  right' (Consume rightConsumeIO) =
    Consume $ \ (Fetch eitherFetchIO) -> do
      -- Holds the 'Left' value most recently pulled from the upstream, if any.
      fetchedLeftMaybeRef <- newIORef Nothing
      consumedRight <-
        rightConsumeIO $ Fetch $ \ nil just ->
          join $ eitherFetchIO (return nil) $ \case
            -- A 'Right' element is handed over to the wrapped consumer.
            Right !fetchedRight -> return (just fetchedRight)
            -- A 'Left' element is remembered and presented to the wrapped
            -- consumer the same way as "no element fetched".
            Left !fetchedLeft -> writeIORef fetchedLeftMaybeRef (Just fetchedLeft) >> return nil
      fetchedLeftMaybe <- readIORef fetchedLeftMaybeRef
      -- A remembered 'Left' takes precedence over the wrapped consumer's result.
      case fetchedLeftMaybe of
        Nothing -> return (Right consumedRight)
        Just fetchedLeft -> return (Left fetchedLeft)

-- | Maps the result of the consumer.
instance Functor (Consume input) where
  fmap = rmap

-- | Runs the left consumer and then the right consumer, both on the same
-- fetch, and applies the result of the former to the result of the latter.
instance Applicative (Consume a) where
  pure x = Consume $ \_ -> pure x
  Consume leftConsumeIO <*> Consume rightConsumeIO =
    Consume $ \ fetch -> leftConsumeIO fetch <*> rightConsumeIO fetch

-- | Runs the first consumer, picks the next consumer based on its result
-- and runs that one on the same fetch.
instance Monad (Consume a) where
  Consume leftConsumeIO >>= toRightConsumeIO =
    Consume $ \ fetch -> do
      Consume rightConsumeIO <- toRightConsumeIO <$> leftConsumeIO fetch
      rightConsumeIO fetch

-- | Lifts an 'IO' action into a consumer which ignores the fetch.
instance MonadIO (Consume a) where
  liftIO a = Consume $ \_ -> a

-- | Applicative-style composition which runs the two consumers concurrently.
--
-- The fetch is split in two using 'A.duplicate'. The right consumer runs in a
-- forked thread on one of the resulting fetches, while the left consumer runs
-- in the calling thread on the other. Both results are forced to WHNF before
-- being combined.
--
-- An exception raised by the right consumer (or by forcing its result) is
-- captured in the forked thread and rethrown in the calling thread once the
-- left consumer has finished. Without that the calling thread would wait
-- forever on a variable which never gets filled.
--
-- NOTE(review): if the left consumer throws, the forked thread is not
-- cancelled; whether it then terminates depends on 'A.duplicate'.
apConcurrently :: Consume a (b -> c) -> Consume a b -> Consume a c
apConcurrently (Consume leftConsumeIO) (Consume rightConsumeIO) =
  Consume $ \ fetch -> do
    (leftFetch, rightFetch) <- A.duplicate fetch
    rightResultVar <- newEmptyMVar
    -- 'E.try' guarantees that the variable gets filled even on failure,
    -- and 'E.evaluate' makes failures of forcing the result get caught too.
    _ <- forkIO $
      E.try (rightConsumeIO rightFetch >>= E.evaluate) >>= putMVar rightResultVar
    !leftOutput <- leftConsumeIO leftFetch
    rightResult <- takeMVar rightResultVar
    case rightResult of
      Left exception -> E.throwIO (exception :: E.SomeException)
      Right rightOutput -> return (leftOutput rightOutput)

-- | Collects all the fetched elements into a list, preserving the order in
-- which they were fetched.
{-# INLINABLE list #-}
list :: Consume input [input]
list =
  Consume $ \ (Fetch fetchIO) ->
    let
      -- The accumulator is a difference list: a function prepending the
      -- elements collected so far. It gets applied to @[]@ once the fetch
      -- yields no element.
      build !acc =
        join (fetchIO (pure (acc [])) (\ !element -> build (acc . (:) element)))
      in build id

-- | Adds up all the fetched elements, starting from @0@.
{-# INLINE sum #-}
sum :: Num num => Consume num num
sum =
  Consume $ \ (Fetch fetchIO) ->
    let
      -- The running total is kept evaluated by the bang pattern.
      build !acc =
        join (fetchIO (pure acc) (\ !element -> build (element + acc)))
      in build 0

-- | Preprocesses the input of a consumer with a transform.
--
-- The transform is applied to the incoming fetch inside of its managed
-- context, and the consumer is run on the resulting fetch using 'with'.
{-# INLINABLE transform #-}
transform :: Transform input output -> Consume output sinkOutput -> Consume input sinkOutput
transform (Transform transformManaged) (Consume sink) =
  Consume (\ fetch -> with (fmap ($ fetch) transformManaged) sink)