{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-

This file is part of the Haskell package cassava-streams. It is
subject to the license terms in the LICENSE file found in the
top-level directory of this distribution and at
git://pmade.com/cassava-streams/LICENSE. No part of cassava-streams
package, including this file, may be copied, modified, propagated, or
distributed except according to the terms contained in the LICENSE
file.

-}

--------------------------------------------------------------------------------
module System.IO.Streams.Csv.Decode
       ( StreamDecodingError (..)
       , decodeStream
       , decodeStreamWith
       , decodeStreamByName
       , decodeStreamByNameWith
       , onlyValidRecords
       ) where

--------------------------------------------------------------------------------
import Control.Exception
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Csv hiding (Parser, decodeWith, decodeByNameWith)
import Data.Csv.Incremental
import Data.IORef
import Data.Typeable
import System.IO.Streams (InputStream, makeInputStream)
import qualified System.IO.Streams as Streams

--------------------------------------------------------------------------------
-- | Exception raised when a CSV stream can no longer be decoded.  The
-- wrapped 'String' is a human-readable description of the failure.
newtype StreamDecodingError = StreamDecodingError String
  deriving (Show, Typeable)

-- | Relies entirely on the default 'Exception' methods, so values can
-- be thrown with 'throwIO' and caught with 'catch' or 'try'.
instance Exception StreamDecodingError

--------------------------------------------------------------------------------
-- | Build an @InputStream@ of decoded CSV records, reading raw CSV
-- bytes from the given upstream source.
--
-- This is 'decodeStreamWith' applied to 'defaultDecodeOptions'.
decodeStream :: (FromRecord a)
             => HasHeader
             -- ^ Whether the first line is a header to be skipped.
             -> InputStream ByteString
             -- ^ Upstream source of raw CSV data.
             -> IO (InputStream (Either String a))
             -- ^ A stream of records; @Left@ carries the parser's
             -- message for a record that could not be converted.
decodeStream hdr input = decodeStreamWith defaultDecodeOptions hdr input

--------------------------------------------------------------------------------
-- | Build an @InputStream@ of decoded CSV records, reading raw CSV
-- bytes from the given upstream source and decoding them with the
-- supplied options.
--
-- Nothing is read from upstream when the stream is created; reading
-- happens only as records are pulled downstream.
decodeStreamWith :: (FromRecord a)
                 => DecodeOptions
                 -- ^ CSV decoding options.
                 -> HasHeader
                 -- ^ Whether the first line is a header to be skipped.
                 -> InputStream ByteString
                 -- ^ Upstream source of raw CSV data.
                 -> IO (InputStream (Either String a))
                 -- ^ An @InputStream@ which produces records.
decodeStreamWith ops hdr input = do
  -- The parser state starts as a fresh incremental parser and the
  -- record queue starts empty; both are shared with 'dispatch'.
  parserState <- newIORef (Just (decodeWith ops hdr))
  pending     <- newIORef []
  makeInputStream (dispatch pending parserState input)

--------------------------------------------------------------------------------
-- | Build an @InputStream@ of decoded CSV records from the given
-- upstream source.  The data must start with a header line, which is
-- used to decode records by column name.
--
-- This is 'decodeStreamByNameWith' applied to 'defaultDecodeOptions'.
decodeStreamByName :: (FromNamedRecord a)
                   => InputStream ByteString
                   -- ^ Upstream source of raw CSV data.
                   -> IO (InputStream (Either String a))
                   -- ^ An @InputStream@ which produces records.
decodeStreamByName input = decodeStreamByNameWith defaultDecodeOptions input

--------------------------------------------------------------------------------
-- | Create an @InputStream@ which decodes CSV records from the given
-- upstream data source.  Data should be preceded by a header.
--
-- The header is parsed eagerly.  This action reads from upstream until
-- the header is complete, and throws 'StreamDecodingError' if the
-- header cannot be parsed.  Empty chunks from upstream are skipped
-- rather than being mistaken for end of input.
decodeStreamByNameWith :: (FromNamedRecord a)
                       => DecodeOptions
                       -- ^ CSV decoding options.
                       -> InputStream ByteString
                       -- ^ Upstream.
                       -> IO (InputStream (Either String a))
                       -- ^ An @InputStream@ which produces records.
decodeStreamByNameWith ops input = go (decodeByNameWith ops) where
  -- Dispatch on the HeaderParser type.
  go (FailH _ e)  = bomb e
  go (PartialH f) = Streams.read input >>= \chunk -> case chunk of
    -- Upstream is exhausted: an empty string tells the parser that no
    -- more input is coming.
    Nothing -> go (f BS.empty)
    -- An empty chunk is not end of input.  Feeding it to the parser
    -- would be read as EOF, so skip it and read again.
    Just bs | BS.null bs -> go (PartialH f)
            | otherwise  -> go (f bs)
  go (DoneH _ p)  = do
    queue  <- newIORef []
    parser <- newIORef (Just p)
    makeInputStream (dispatch queue parser input)

--------------------------------------------------------------------------------
-- | Wrap a stream of decoded records so that only successfully
-- converted records are passed downstream.  Reading the first @Left@
-- from upstream throws a 'StreamDecodingError'.
onlyValidRecords :: InputStream (Either String a)
                 -- ^ Upstream.
                 -> IO (InputStream a)
                 -- ^ An @InputStream@ which only produces valid
                 -- records.
onlyValidRecords input = makeInputStream (Streams.read input >>= validate)
  where
    -- End of upstream is passed through unchanged; a failed record is
    -- fatal.
    validate Nothing           = return Nothing
    validate (Just (Left err)) = bomb ("invalid CSV row: " ++ err)
    validate (Just (Right x))  = return (Just x)

--------------------------------------------------------------------------------
-- | Internal function which feeds data to the CSV parser.
--
-- Each call yields the next decoded record.  Records already produced
-- by the parser are served from the queue first.  When the queue is
-- empty, the current parser state decides what happens next:
--
--   * 'Nothing': the parser has finished, so signal end of stream.
--
--   * 'Fail': throw a 'StreamDecodingError'.
--
--   * 'Many' with no records: read more input from upstream.
--
--   * 'Many' or 'Done' with records: queue them and serve the first.
dispatch :: forall a. IORef [Either String a]
         -- ^ List of queued CSV records.
         -> IORef (Maybe (Parser a))
         -- ^ Current CSV parser state.
         -> InputStream ByteString
         -- ^ Upstream.
         -> IO (Maybe (Either String a))
         -- ^ Data feed downstream.
dispatch queueRef parserRef input = do
  queue <- readIORef queueRef

  case queue of
    [] -> do
      parser <- readIORef parserRef
      case parser of
        Nothing          -> return Nothing
        Just (Fail _  e) -> bomb ("input data malformed: " ++ e)
        Just (Many [] f) -> more f
        Just (Many xs f) -> writeIORef parserRef (Just $ Many [] f) >> feed xs
        Just (Done xs  ) -> writeIORef parserRef Nothing            >> feed xs

    (x:xs) -> do
      writeIORef queueRef xs
      return (Just x)

  where
    -- Send more data to the CSV parser.  If there is no more data
    -- from upstream then send an empty @ByteString@, which tells the
    -- parser that the input has ended.  Empty chunks that arrive
    -- before upstream is exhausted are skipped.  Passing them on would
    -- make the parser treat them as end of input and silently drop the
    -- rest of the data.
    more :: (ByteString -> Parser a) -> IO (Maybe (Either String a))
    more f = do
      chunk <- Streams.read input
      case chunk of
        Just bs | BS.null bs -> more f
        _ -> do
          writeIORef parserRef (Just $ maybe (f BS.empty) f chunk)
          dispatch queueRef parserRef input

    -- Feed records downstream.
    feed :: [Either String a] -> IO (Maybe (Either String a))
    feed xs = do writeIORef queueRef xs
                 dispatch queueRef parserRef input

--------------------------------------------------------------------------------
-- | Abort the current 'IO' action by throwing a 'StreamDecodingError'
-- that carries the given message.
bomb :: String -> IO a
bomb msg = throwIO (StreamDecodingError msg)