module Potoki.Core.Consume
import Potoki.Core.Prelude hiding (sum, head, fold, concat, last)
import Potoki.Core.Types
import qualified Potoki.Core.Fetch as A
import qualified Acquire.IO as B
import qualified Potoki.Core.Transform as J
import qualified Potoki.Core.IO.Fetch as L
import qualified Data.ByteString as C
import qualified Data.Attoparsec.ByteString as E
import qualified Data.Attoparsec.Text as F
import qualified Data.Attoparsec.Types as I
import qualified Data.Text.IO as K
import qualified Control.Foldl as D
import qualified System.Directory as G
import qualified Potoki.Core.Transform.Concurrency as B
import qualified Control.Monad.Trans.State.Strict as O
import qualified Control.Exception as X
-- | A consumer is contravariant in its input and covariant in its output.
instance Profunctor Consume where
  {-# INLINE dimap #-}
  -- Pre-process every fetched element with @inputMapping@ (by mapping over
  -- the fetch) and post-process the final result with @outputMapping@.
  dimap inputMapping outputMapping (Consume consumeIO) =
    Consume $ \ fetch ->
      outputMapping <$> consumeIO (inputMapping <$> fetch)
-- | Lift a consumer of @a@ to a consumer of @Either c a@.
--
-- 'Right' elements are passed through to the wrapped consumer. When a 'Left'
-- element is fetched it is recorded and the wrapped consumer is told that the
-- stream has ended. The overall result is the recorded 'Left' if there is one,
-- otherwise the wrapped consumer's result in 'Right'.
instance Choice Consume where
  right' :: Consume a b -> Consume (Either c a) (Either c b)
  right' (Consume consumeRights) =
    Consume $ \ (Fetch fetchEither) -> do
      -- Holds the most recently fetched 'Left' element, if any.
      leftRef <- newIORef Nothing
      let
        fetchRight =
          fetchEither >>= \case
            Just (Right rightElement) ->
              return (Just rightElement)
            Just (Left leftElement) -> do
              -- Remember the 'Left' and signal end-of-stream downstream.
              writeIORef leftRef (Just leftElement)
              return Nothing
            Nothing ->
              return Nothing
      rightOutput <- consumeRights (Fetch fetchRight)
      readIORef leftRef >>= \case
        Just leftElement -> return (Left leftElement)
        Nothing -> return (Right rightOutput)
-- | Maps over the consumer's output; defined via the 'Profunctor' instance.
instance Functor (Consume input) where
  fmap outputMapping consume = rmap outputMapping consume
-- | Runs two consumers one after the other on the same fetch and applies the
-- function produced by the first to the value produced by the second.
instance Applicative (Consume a) where
  -- Produces the value without touching the fetch.
  pure output = Consume (\ _fetch -> pure output)
  Consume leftConsumeIO <*> Consume rightConsumeIO =
    Consume $ \ fetch -> do
      -- The left consumer runs first; the right one then continues on the
      -- very same fetch.
      function <- leftConsumeIO fetch
      argument <- rightConsumeIO fetch
      return (function argument)
-- | Runs the left consumer, then uses its output to choose the consumer that
-- continues on the same fetch.
instance Monad (Consume a) where
  Consume leftConsumeIO >>= toRightConsume =
    Consume $ \ fetch -> do
      leftOutput <- leftConsumeIO fetch
      case toRightConsume leftOutput of
        Consume rightConsumeIO -> rightConsumeIO fetch
-- | Lifts an 'IO' action into a consumer that ignores its fetch entirely.
instance MonadIO (Consume a) where
  liftIO action = Consume (\ _fetch -> action)
-- | A variant of '<*>' which runs the right consumer in a forked thread while
-- the left consumer runs in the calling thread.
--
-- The input fetch is split in two with @A.duplicate@, one half per consumer
-- (presumably each half observes the same elements; confirm against
-- "Potoki.Core.Fetch").
--
-- An exception raised by the right consumer, or by forcing its result, is
-- captured in the forked thread and rethrown in the calling thread once the
-- left consumer has finished. Without that the calling thread would wait on
-- an 'MVar' that is never filled and the original error would be lost.
--
-- NOTE(review): if the left consumer throws, the forked thread is not
-- cancelled. That matches the previous behaviour.
apConcurrently :: Consume a (b -> c) -> Consume a b -> Consume a c
apConcurrently (Consume leftConsumeIO) (Consume rightConsumeIO) =
  Consume $ \ fetch -> do
    (leftFetch, rightFetch) <- A.duplicate fetch
    rightResultVar <- newEmptyMVar
    _ <- forkIO $ do
      -- 'X.evaluate' forces the output to WHNF inside the 'X.try', so an
      -- exception hidden in the result is captured as well.
      rightResult <- X.try (rightConsumeIO rightFetch >>= X.evaluate)
      putMVar rightResultVar rightResult
    !leftOutput <- leftConsumeIO leftFetch
    takeMVar rightResultVar >>= \case
      Left exception -> X.throwIO (exception :: X.SomeException)
      Right rightOutput -> return (leftOutput rightOutput)
{-# INLINABLE list #-}
-- | Collects every fetched element into a list, in the order fetched.
list :: Consume input [input]
list =
  Consume $ \ (Fetch fetchIO) ->
    let
      -- The accumulator is a difference list: a function that prepends all
      -- the elements gathered so far to whatever list it is given.
      build !acc =
        fetchIO >>= \case
          Nothing -> pure (acc [])
          Just !element -> build (acc . (element :))
      in build id
{-# INLINE sum #-}
-- | Adds up all fetched elements, starting from @0@.
sum :: Num num => Consume num num
sum =
  Consume $ \ (Fetch fetchIO) ->
    let
      -- The running total is kept strict to avoid building a chain of thunks.
      build !acc =
        fetchIO >>= \case
          Nothing -> pure acc
          Just !element -> build (element + acc)
      in build 0
{-# INLINABLE transform #-}
-- | Puts a 'Transform' in front of a consumer, producing a consumer of the
-- transform's input type.
--
-- The transformed fetch is obtained through @B.acquire@, which is handed the
-- consumer as the action to run on it (presumably releasing any resources the
-- transform acquired afterwards; confirm against "Acquire.IO").
transform :: Transform input1 input2 -> Consume input2 output -> Consume input1 output
transform (Transform transformAcquire) (Consume consumeIO) =
  Consume $ \ fetch ->
    let acquireTransformedFetch = transformAcquire fetch
    in B.acquire acquireTransformedFetch consumeIO
{-# INLINABLE head #-}
-- | Performs a single fetch and returns its result: the first element, or
-- 'Nothing' if the fetch yields nothing.
head :: Consume input (Maybe input)
head =
  Consume $ \ fetch ->
    case fetch of
      A.Fetch fetchIO -> fetchIO
{-# INLINABLE last #-}
-- | Consumes the whole stream with the @D.last@ fold, yielding the final
-- element, or 'Nothing' when there were none.
last :: Consume input (Maybe input)
last = fold D.last
{-# INLINABLE reverseList #-}
-- | Collects every fetched element into a list in reverse fetch order: the
-- last element fetched becomes the head of the result.
reverseList :: Consume input [input]
reverseList =
  Consume $ \ (A.Fetch fetchIO) -> build fetchIO []
  where
    -- Each new element is consed onto the front of the accumulator.
    build fetchIO !acc =
      fetchIO >>= \case
        Nothing -> pure acc
        Just element -> build fetchIO (element : acc)
{-# INLINABLE vector #-}
-- | Collects every fetched element into a 'Vector' by running the monadic
-- @D.vectorM@ fold in 'IO'.
vector :: Consume input (Vector input)
vector = foldInIO D.vectorM
{-# INLINABLE count #-}
-- | Counts the fetched elements, ignoring their values.
count :: Consume input Int
count =
  Consume $ \ (A.Fetch fetchIO) -> build fetchIO 0
  where
    -- The counter is kept strict so no thunk chain builds up.
    build fetchIO !acc =
      fetchIO >>= \case
        Nothing -> pure acc
        Just _ -> build fetchIO (succ acc)
{-# INLINABLE concat #-}
-- | Combines all fetched elements with '<>' in fetch order, starting from
-- 'mempty'.
concat :: Monoid monoid => Consume monoid monoid
concat =
  Consume $ \ (A.Fetch fetchIO) -> build fetchIO mempty
  where
    -- New elements are appended on the right, which preserves fetch order.
    build fetchIO !acc =
      fetchIO >>= \case
        Nothing -> pure acc
        Just element -> build fetchIO (acc <> element)
{-# INLINABLE processInIO #-}
-- | Builds a consumer from two 'IO' callbacks by delegating to
-- @L.fetchAndHandleAll@.
--
-- Presumably @process@ is run on every fetched element and @stop@ once the
-- fetch is exhausted; confirm against "Potoki.Core.IO.Fetch".
processInIO :: IO () -> (element -> IO ()) -> Consume element ()
processInIO stop process =
  Consume $ \ fetch ->
    L.fetchAndHandleAll fetch stop process
{-# INLINABLE printBytes #-}
-- | Writes each 'ByteString' chunk to standard output with @C.putStr@, using
-- a newline-printing action as the stop callback of 'processInIO'.
printBytes :: Consume ByteString ()
printBytes = processInIO printNewline C.putStr
  where
    printNewline = putChar '\n'
{-# INLINABLE printText #-}
-- | Writes each 'Text' chunk to standard output with @K.putStr@, using a
-- newline-printing action as the stop callback of 'processInIO'.
printText :: Consume Text ()
printText = processInIO printNewline K.putStr
  where
    printNewline = putChar '\n'
{-# INLINABLE printString #-}
-- | Writes each 'String' to standard output with 'putStr', using a
-- newline-printing action as the stop callback of 'processInIO'.
printString :: Consume String ()
printString = processInIO printNewline putStr
  where
    printNewline = putChar '\n'
{-# INLINABLE writeBytesToFile #-}
-- | Writes every fetched chunk to the file at the given path, which is opened
-- in 'WriteMode'.
--
-- An 'IOException' raised while opening or writing is caught and returned as
-- 'Left'.
writeBytesToFile :: FilePath -> Consume ByteString (Either IOException ())
writeBytesToFile path =
  Consume $ \ fetch ->
    let
      -- Hand every chunk to 'C.hPut' on the open handle; nothing extra is
      -- done when the fetch ends.
      writeChunks fileHandle =
        L.fetchAndHandleAll fetch (return ()) (C.hPut fileHandle)
      in try (withFile path WriteMode writeChunks)
{-# INLINABLE appendBytesToFile #-}
-- | Appends every fetched chunk to the file at the given path, which is
-- opened in 'AppendMode'.
--
-- An 'IOException' raised while opening or writing is caught and returned as
-- 'Left'.
appendBytesToFile :: FilePath -> Consume ByteString (Either IOException ())
appendBytesToFile path =
  Consume $ \ fetch ->
    let
      -- Hand every chunk to 'C.hPut' on the open handle; nothing extra is
      -- done when the fetch ends.
      appendChunks fileHandle =
        L.fetchAndHandleAll fetch (return ()) (C.hPut fileHandle)
      in try (withFile path AppendMode appendChunks)
{-# INLINABLE deleteFiles #-}
-- | Passes every fetched path to @G.removeFile@.
--
-- An 'IOException' raised along the way is caught and returned as 'Left';
-- the 'try' wraps the whole traversal, so it ends at the first such failure.
deleteFiles :: Consume FilePath (Either IOException ())
deleteFiles =
  Consume $ \ fetch ->
    let removeAll = L.fetchAndHandleAll fetch (return ()) G.removeFile
    in try removeAll
{-# INLINABLE fold #-}
-- | Runs a pure @D.Fold@ over all fetched elements.
--
-- The fold's state starts at its initial value, is stepped once per element,
-- and is passed to the fold's extraction function once the fetch yields
-- 'Nothing'.
fold :: D.Fold input output -> Consume input output
fold (D.Fold step initVal finish) =
  Consume $ \ (A.Fetch fetch) -> build fetch initVal
  where
    -- The state is forced on every iteration to prevent thunk build-up.
    build fetch !acc =
      fetch >>= \case
        Nothing -> pure (finish acc)
        Just input -> build fetch (step acc input)
{-# INLINABLE foldInIO #-}
-- | Runs a monadic @D.FoldM@ in 'IO' over all fetched elements.
--
-- The initial state is produced by running the fold's initialisation action,
-- stepped once per element, and passed to the fold's finishing action once
-- the fetch yields 'Nothing'.
foldInIO :: D.FoldM IO input output -> Consume input output
foldInIO (D.FoldM step initVal finish) =
  Consume $ \ (A.Fetch fetch) -> initVal >>= build fetch
  where
    -- The state is forced on every iteration to prevent thunk build-up.
    build fetch !acc =
      fetch >>= \case
        Nothing -> finish acc
        Just input -> step acc input >>= build fetch
{-# INLINABLE folding #-}
-- | Runs a consumer while also folding, with a pure @D.Fold@, over the
-- elements handled through the instrumented fetch.
--
-- The fetch given to the consumer is wrapped with @A.handlingElements@
-- (presumably it invokes the handler on each element passing through; confirm
-- against "Potoki.Core.Fetch"). The result pairs the extracted fold result
-- with the consumer's own result.
folding :: D.Fold a b -> Consume a c -> Consume a (b, c)
folding (D.Fold step initVal extract) (Consume consumeIO) =
  Consume $ \ fetch -> do
    stateRef <- newIORef initVal
    let
      -- Step the fold state strictly, then store it back.
      advance element = do
        currentState <- readIORef stateRef
        let !nextState = step currentState element
        writeIORef stateRef nextState
    consumptionResult <- consumeIO (A.handlingElements advance fetch)
    finalState <- readIORef stateRef
    return (extract finalState, consumptionResult)
{-# INLINABLE foldingInIO #-}
-- | Runs a consumer while also folding, with a monadic @D.FoldM@ in 'IO',
-- over the elements handled through the instrumented fetch.
--
-- The fetch given to the consumer is wrapped with @A.handlingElements@
-- (presumably it invokes the handler on each element passing through; confirm
-- against "Potoki.Core.Fetch"). The result pairs the extracted fold result
-- with the consumer's own result.
foldingInIO :: D.FoldM IO a b -> Consume a c -> Consume a (b, c)
foldingInIO (D.FoldM step initVal extract) (Consume consumeIO) =
  Consume $ \ fetch -> do
    stateRef <- initVal >>= newIORef
    let
      -- Run the monadic step, force the new state, then store it back.
      advance element = do
        currentState <- readIORef stateRef
        !nextState <- step currentState element
        writeIORef stateRef nextState
    consumptionResult <- consumeIO (A.handlingElements advance fetch)
    foldResult <- readIORef stateRef >>= extract
    return (foldResult, consumptionResult)
{-# INLINE execState #-}
-- | Threads a state through the stream: for every element the given stateful
-- computation is run on the current state, its result value is discarded and
-- the new state is kept. Yields the final state.
execState :: (a -> O.State s b) -> s -> Consume a s
execState stateFn initialState =
  fold (D.Fold advance initialState id)
  where
    advance currentState input = O.execState (stateFn input) currentState
{-# INLINABLE runParseResult #-}
-- | Drives an incremental attoparsec-style parser over the fetched chunks.
--
-- The argument is the parser's initial "feed me input" function. Each
-- non-empty chunk is fed to the current continuation:
--
-- * on @I.Partial@ the new continuation is kept and fed the next chunk;
-- * on @I.Done@ the parsed value is returned in 'Right';
-- * on @I.Fail@ the message is returned in 'Left', prefixed by the context
--   list joined with @" > "@ when there is one.
--
-- When the fetch is exhausted, 'mempty' is fed to the current continuation,
-- which is how attoparsec is told that no more input is coming. For the same
-- reason empty chunks in the middle of the stream are skipped rather than
-- fed: an empty chunk would end the parse early.
runParseResult :: (Monoid input, Eq input) => (input -> I.IResult input output) -> Consume input (Either Text output)
runParseResult inputToResult =
  Consume $ \ (A.Fetch fetchInput) ->
    let
      -- Feed one chunk to the current continuation and act on the outcome.
      feed continue !input =
        case continue input of
          I.Partial newContinue -> consume newContinue
          I.Done _ parsed -> return (Right parsed)
          I.Fail _ contexts message -> return (Left resultMessage)
            where
              resultMessage =
                if null contexts
                  then fromString message
                  else fromString (showString (intercalate " > " contexts) (showString ": " message))
      -- Fetch the next chunk for the given continuation. The continuation
      -- must be threaded through here: restarting from the initial parser
      -- would throw away everything parsed so far.
      consume continue =
        fetchInput >>= \case
          Nothing -> feed continue mempty
          Just !input ->
            if input == mempty
              then consume continue
              else feed continue input
      in consume inputToResult
{-# INLINABLE parseBytes #-}
-- | Runs an attoparsec 'ByteString' parser over the fetched chunks via
-- 'runParseResult', yielding the parsed value or an error message.
parseBytes :: E.Parser output -> Consume ByteString (Either Text output)
parseBytes parser = runParseResult (E.parse parser)
{-# INLINABLE parseText #-}
-- | Runs an attoparsec 'Text' parser over the fetched chunks via
-- 'runParseResult', yielding the parsed value or an error message.
parseText :: F.Parser output -> Consume Text (Either Text output)
parseText parser = runParseResult (F.parse parser)
{-# INLINABLE concurrently #-}
-- | Chains two consumers: the first is turned into a transform with
-- @J.consume@, wrapped with @B.concurrently amount@, and placed in front of
-- the second via 'transform'.
--
-- Presumably @amount@ is the number of concurrent workers running the first
-- consumer; confirm against "Potoki.Core.Transform.Concurrency".
concurrently :: Int -> Consume a b -> Consume b c -> Consume a c
concurrently amount consume1 consume2 =
  transform concurrentStage consume2
  where
    concurrentStage = B.concurrently amount (J.consume consume1)