module Potoki.Core.Consume

import Potoki.Core.Prelude hiding (sum, head, fold, concat, last)
import Potoki.Core.Types
import qualified Potoki.Core.Fetch as A
import qualified Acquire.IO as B
import qualified Potoki.Core.Transform as J
import qualified Potoki.Core.IO.Fetch as L
import qualified Data.ByteString as C
import qualified Data.Attoparsec.ByteString as E
import qualified Data.Attoparsec.Text as F
import qualified Data.Attoparsec.Types as I
import qualified Data.Text.IO as K
import qualified Control.Foldl as D
import qualified System.Directory as G
import qualified Potoki.Core.Transform.Concurrency as B
import qualified Control.Monad.Trans.State.Strict as O

instance Profunctor Consume where
  {-# INLINE dimap #-}
  dimap inputMapping outputMapping (Consume consume) =
    Consume (\ fetch -> fmap outputMapping (consume $ fmap inputMapping fetch))

instance Choice Consume where
  right' :: Consume a b -> Consume (Either c a) (Either c b)
  right' (Consume rightConsumeIO) =
     Consume $ \ (Fetch eitherFetchIO) -> do
       fetchedLeftMaybeRef <- newIORef Nothing
       consumedRight <-
         rightConsumeIO $ Fetch $ do
           eitherFetch <- eitherFetchIO
           case eitherFetch of
             Nothing      -> return Nothing
             Just element -> case element of
               Right fetchedRight -> return $ Just fetchedRight
               Left  fetchedLeft  -> do
                 writeIORef fetchedLeftMaybeRef $ Just fetchedLeft
                 return Nothing
       fetchedLeftMaybe <- readIORef fetchedLeftMaybeRef
       case fetchedLeftMaybe of
         Nothing          -> return $ Right consumedRight
         Just fetchedLeft -> return $ Left fetchedLeft

instance Functor (Consume input) where
  fmap = rmap

instance Applicative (Consume a) where
  pure x = Consume $ \ _ -> pure x

  Consume leftConsumeIO <*> Consume rightConsumeIO =
    Consume $ \ fetch -> leftConsumeIO fetch <*> rightConsumeIO fetch

instance Monad (Consume a) where
  Consume leftConsumeIO >>= toRightConsumeIO = Consume $ \ fetch -> do
    Consume rightConsumeIO <- toRightConsumeIO <$> leftConsumeIO fetch
    rightConsumeIO fetch

instance MonadIO (Consume a) where
  liftIO a = Consume $ \ _ -> a

apConcurrently :: Consume a (b -> c) -> Consume a b -> Consume a c
apConcurrently (Consume leftConsumeIO) (Consume rightConsumeIO) =
  Consume $ \ fetch -> do
    (leftFetch, rightFetch) <- A.duplicate fetch
    rightOutputVar <- newEmptyMVar
    _ <- forkIO $ do
      !rightOutput <- rightConsumeIO rightFetch
      putMVar rightOutputVar rightOutput
    !leftOutput <- leftConsumeIO leftFetch
    rightOutput <- takeMVar rightOutputVar
    return (leftOutput rightOutput)

{-# INLINABLE list #-}
list :: Consume input [input]
list =
  Consume $ \ (Fetch fetchIO) ->
      build !acc = do
        fetch <- fetchIO
        case fetch of
          Nothing       -> pure $ acc []
          Just !element -> build $ acc . (:) element
     in build id

{-# INLINE sum #-}
sum :: Num num => Consume num num
sum =
  Consume $ \ (Fetch fetchIO) ->
      build !acc = do
        fetch <- fetchIO
        case fetch of
          Nothing       -> pure acc
          Just !element -> build $ element + acc
     in build 0

{-# INLINABLE transform #-}
transform :: Transform input1 input2 -> Consume input2 output -> Consume input1 output
transform (Transform transformAcquire) (Consume consumeIO) =
  Consume $ \ fetch -> B.acquire (transformAcquire fetch) consumeIO

{-# INLINABLE head #-}
head :: Consume input (Maybe input)
head =
  Consume (\ (A.Fetch fetchIO) -> fetchIO)

{-# INLINABLE last #-}
last :: Consume input (Maybe input)
last =
  fold D.last

A faster alternative to "list",
which however constructs the list in the reverse order.
{-# INLINABLE reverseList #-}
reverseList :: Consume input [input]
reverseList =
  Consume $ \ (A.Fetch fetchIO) -> build fetchIO []
    build fetchIO !acc =
      fetchIO >>= \case
        Nothing -> pure acc
        Just element -> build fetchIO (element : acc)

{-# INLINABLE vector #-}
vector :: Consume input (Vector input)
vector =
  foldInIO D.vectorM

{-# INLINABLE count #-}
count :: Consume input Int
count =
  Consume $ \ (A.Fetch fetchIO) -> let
    iterate !count = do
      fetchResult <- fetchIO
      case fetchResult of
        Just _ -> iterate (succ count)
        Nothing -> return count
    in iterate 0

{-# INLINABLE concat #-}
concat :: (Semigroup monoid, Monoid monoid) => Consume monoid monoid
concat =
  Consume $ \ (A.Fetch fetchIO) -> build fetchIO mempty
    build fetchIO !acc =
      fetchIO >>= \case
        Nothing -> pure acc
        Just element -> build fetchIO (acc <> element)

{-# INLINABLE processInIO #-}
processInIO :: IO () -> (element -> IO ()) -> Consume element ()
processInIO stop process =
  Consume (\ fetch -> L.fetchAndHandleAll fetch stop process)

{-# INLINABLE printBytes #-}
printBytes :: Consume ByteString ()
printBytes =
  processInIO (putChar '\n') C.putStr

{-# INLINABLE printText #-}
printText :: Consume Text ()
printText =
  processInIO (putChar '\n') K.putStr

{-# INLINABLE printString #-}
printString :: Consume String ()
printString =
  processInIO (putChar '\n') putStr

Overwrite a file.

* Exception-free
* Automatic resource management
{-# INLINABLE writeBytesToFile #-}
writeBytesToFile :: FilePath -> Consume ByteString (Either IOException ())
writeBytesToFile =
  writeBytesToFileWithBuffering (BlockBuffering Nothing)

Overwrite a file.

* Exception-free
* Automatic resource management
{-# INLINABLE writeBytesToFileWithBuffering #-}
writeBytesToFileWithBuffering :: BufferMode -> FilePath -> Consume ByteString (Either IOException ())
writeBytesToFileWithBuffering bufferMode path =
  Consume $ \ fetch ->
  try $ withFile path WriteMode $ \ handle ->
    hSetBuffering handle bufferMode
    L.fetchAndHandleAll fetch (return ()) (C.hPut handle)

A more efficient implementation than just writing to file without buffering.
It uses an explicit buffer of input chunks and flushes all the chunks that have been so far aggregated at once.
{-# INLINABLE writeBytesToFileWithoutBuffering #-}
writeBytesToFileWithoutBuffering :: FilePath -> Consume ByteString (Either IOException ())
writeBytesToFileWithoutBuffering =
  transform (arr mconcat . B.bufferizeFlushing 64) . writeBytesToFileWithBuffering NoBuffering

Append to a file.

* Exception-free
* Automatic resource management
{-# INLINABLE appendBytesToFile #-}
appendBytesToFile :: FilePath -> Consume ByteString (Either IOException ())
appendBytesToFile path =
  Consume $ \ fetch ->
  try $ withFile path AppendMode $ \ handleVal ->
    L.fetchAndHandleAll fetch (return ()) (C.hPut handleVal)

{-# INLINABLE deleteFiles #-}
deleteFiles :: Consume FilePath (Either IOException ())
deleteFiles =
  Consume $ \ fetch ->
  try $ L.fetchAndHandleAll fetch (return ()) G.removeFile

{-# INLINABLE fold #-}
fold :: D.Fold input output -> Consume input output
fold (D.Fold step initVal finish) =
  Consume $ \ (A.Fetch fetch) -> build fetch initVal
    build fetch !acc =
      fetch >>= \case
        Nothing -> pure $ finish acc
        Just input -> build fetch (step acc input)

{-# INLINABLE foldInIO #-}
foldInIO :: D.FoldM IO input output -> Consume input output
foldInIO (D.FoldM step initVal finish) =
  Consume $ \ (A.Fetch fetch) -> build fetch =<< initVal
    build fetch !acc =
      fetch >>= \case
        Nothing -> finish acc
        Just input -> step acc input >>= build fetch

{-# INLINABLE folding #-}
folding :: D.Fold a b -> Consume a c -> Consume a (b, c)
folding (D.Fold step initVal extract) (Consume consumeIO) =
  Consume $ \ fetch -> do
    foldStateRef <- newIORef initVal
    consumptionResult <-
      consumeIO (A.handlingElements (\ element -> do
        !newState <- flip step element <$> readIORef foldStateRef
        writeIORef foldStateRef newState) fetch)
    foldResult <- extract <$> readIORef foldStateRef
    return (foldResult, consumptionResult)

{-# INLINABLE foldingInIO #-}
foldingInIO :: D.FoldM IO a b -> Consume a c -> Consume a (b, c)
foldingInIO (D.FoldM step initVal extract) (Consume consumeIO) =
  Consume $ \ fetch -> do
    foldStateRef <- newIORef =<< initVal
    consumptionResult <-
      consumeIO (A.handlingElements (\ element -> do
        !newState <- flip step element =<< readIORef foldStateRef
        writeIORef foldStateRef newState) fetch)
    foldResult <- extract =<< readIORef foldStateRef
    return (foldResult, consumptionResult)

{-# INLINE execState #-}
execState :: (a -> O.State s b) -> s -> Consume a s
execState stateFn initialState =
  fold $ D.Fold (\currentState input -> snd $ O.runState (stateFn input) currentState) initialState id

{-# INLINABLE runParseResult #-}
runParseResult :: (Monoid input, Eq input) => (input -> I.IResult input output) -> Consume input (Either Text output)
runParseResult inputToResult =
  Consume $ \ (A.Fetch fetchInput) ->
    just !input =
      case inputToResult input of
        I.Partial newInputToResult -> consume newInputToResult
        I.Done _ parsed -> return (Right parsed)
        I.Fail _ contexts message -> return (Left resultMessage)
            resultMessage =
              if null contexts
                then fromString message
                else fromString (showString (intercalate " > " contexts) (showString ": " message))
    consume _ =
      fetchInput >>= \case
        Nothing -> just mempty
        Just !input -> just input
    in consume inputToResult

{-# INLINABLE parseBytes #-}
parseBytes :: E.Parser output -> Consume ByteString (Either Text output)
parseBytes =
  runParseResult . E.parse

{-# INLINABLE parseText #-}
parseText :: F.Parser output -> Consume Text (Either Text output)
parseText =
  runParseResult . F.parse

Execute a Consume concurrently and consume its results.
{-# INLINABLE concurrently #-}
concurrently :: Int -> Consume a b -> Consume b c -> Consume a c
concurrently amount consume1 consume2 =
  transform (B.concurrently amount (J.consume consume1)) consume2