{-| This module allows constant-space CSV parsing.

    It feeds 'ByteString's into cassava's incremental CSV parser to attain true
    constant-space record streaming.
-}

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}

module Pipes.Csv (
  feedParser,
  feedHeaderParser,
  decode,
  decodeWith,
  decodeByName,
  decodeByNameWith
) where

import qualified Data.Csv.Incremental as CI
import qualified Data.ByteString as B

import Data.Csv.Incremental (Parser(..), HeaderParser(..))
import Data.Csv (DecodeOptions, FromRecord,
                 FromNamedRecord, defaultDecodeOptions)
import Data.ByteString (ByteString)
import Pipes

-- | Drive an incremental 'Parser' with the chunks of a 'ByteString'
-- 'Producer', yielding every result it hands back (a 'Left' error message or
-- a 'Right' record) downstream.
--
-- * 'Fail' yields its error message as a single 'Left' and stops.
-- * 'Done' yields the final batch of results and stops.
-- * 'Some' yields the batch decoded so far, then pulls more input.
-- * 'Partial' has nothing to yield yet and just pulls more input.
feedParser :: Monad m
           => Parser a
           -> Producer ByteString m ()
           -> Producer (Either String a) m ()
feedParser (Fail _ err)     _      = yield (Left err)
feedParser (Done results)   _      = each results
feedParser (Some results k) source = do
    each results
    continue feedParser k source
feedParser (Partial k)      source = continue feedParser k source


-- | Create a NamedRecord 'Producer' by feeding 'ByteString's into a
-- 'HeaderParser'.
--
-- Input chunks are pulled from the source until the header parser either
-- fails or finishes:
--
-- * 'FailH' yields its error message as a single 'Left' and stops.
-- * 'PartialH' needs more input, so the next chunk is fed to it.
-- * 'DoneH' hands over the record 'Parser', which is then driven by
--   'feedParser' with whatever remains of the source.
feedHeaderParser :: (Monad m, FromNamedRecord a)
                 => HeaderParser (Parser a)
                 -> Producer ByteString m ()
                 -> Producer (Either String a) m ()
feedHeaderParser headerParser source = case headerParser of
    -- The first field of 'FailH' is not needed here; only the message is
    -- reported downstream.
    FailH _ e  -> yield (Left e)
    PartialH k -> cont k source
    -- The first field of 'DoneH' is ignored; only the record parser is used.
    DoneH _ p  -> feedParser p source
  where
    cont = continue feedHeaderParser


-- | Resume a parser continuation with the next chunk drawn from a
-- 'ByteString' 'Producer'.
--
-- The continuation @k@ is given the next non-empty chunk and the resulting
-- parser state is passed to @feed@ together with the rest of the producer.
-- Empty chunks are skipped rather than fed, because an empty 'ByteString' is
-- what this function passes to @k@ once the producer is exhausted.
continue :: (Monad (t m), Monad m, MonadTrans t)
         => (a -> Producer ByteString m () -> t m b)
         -> (ByteString -> a)
         -> Producer ByteString m ()
         -> t m b
continue feed k = pull
  where
    -- Step the upstream producer once in the base monad and dispatch.
    pull upstream = lift (next upstream) >>= either exhausted gotChunk

    -- Upstream is finished: feed the empty chunk and leave an empty producer.
    exhausted () = feed (k B.empty) (return ())

    gotChunk (chunk, rest)
      | B.null chunk = pull rest
      | otherwise    = feed (k chunk) rest



-- | Decode records from a 'ByteString' 'Producer' using
-- 'defaultDecodeOptions'; shorthand for @'decodeWith' 'defaultDecodeOptions'@.
--
-- The 'Bool' argument is passed straight through to 'decodeWith'.
decode :: (Monad m, FromRecord a)
       => Bool
       -> Producer ByteString m ()
       -> Producer (Either String a) m ()
decode skipHeader source = decodeWith defaultDecodeOptions skipHeader source


-- | Turn a 'Producer' of 'ByteString' chunks into a 'Producer' of decoded
-- records, where each element is either an error message or a value of a
-- 'FromRecord' type.
--
-- The 'DecodeOptions' and the skip-header flag are handed to the incremental
-- decoder, whose parser is then driven by 'feedParser'.
decodeWith :: (Monad m, FromRecord a)
           => DecodeOptions
           -> Bool
           -> Producer ByteString m ()
           -> Producer (Either String a) m ()
decodeWith opts skipHeader = feedParser parser
  where
    parser = CI.decodeWith opts skipHeader


-- | Decode named records from a 'ByteString' 'Producer' using
-- 'defaultDecodeOptions'; shorthand for
-- @'decodeByNameWith' 'defaultDecodeOptions'@.
decodeByName :: (Monad m, FromNamedRecord a)
             => Producer ByteString m ()
             -> Producer (Either String a) m ()
decodeByName source = decodeByNameWith defaultDecodeOptions source


-- | Turn a 'Producer' of 'ByteString' chunks into a 'Producer' of decoded
-- named records, where each element is either an error message or a value of
-- a 'FromNamedRecord' type.
--
-- The 'DecodeOptions' are handed to the incremental by-name decoder, whose
-- header parser is then driven by 'feedHeaderParser'.
decodeByNameWith :: (Monad m, FromNamedRecord a)
                 => DecodeOptions
                 -> Producer ByteString m ()
                 -> Producer (Either String a) m ()
decodeByNameWith = feedHeaderParser . CI.decodeByNameWith