{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
module System.IO.Streams.Csv.Decode
( StreamDecodingError (..)
, decodeStream
, decodeStreamWith
, decodeStreamByName
, decodeStreamByNameWith
, onlyValidRecords
) where
import Control.Exception
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Csv hiding (Parser, decodeWith, decodeByNameWith)
import Data.Csv.Incremental
import Data.IORef
import Data.Typeable
import System.IO.Streams (InputStream, makeInputStream)
import qualified System.IO.Streams as Streams
-- | Exception raised by this module when CSV data cannot be decoded.
-- The wrapped 'String' carries the human-readable error message.
newtype StreamDecodingError = StreamDecodingError String
  deriving (Typeable, Show)

-- | Uses the default 'Exception' methods, so the error can be thrown
-- with 'throwIO' and recovered with 'catch' / 'try'.
instance Exception StreamDecodingError
-- | Create an input stream of decoded CSV records from a stream of raw
-- 'ByteString' chunks, using 'defaultDecodeOptions'.
--
-- This is 'decodeStreamWith' partially applied to the default options;
-- the remaining arguments (header flag and upstream) are passed through
-- unchanged.
decodeStream :: (FromRecord a)
             => HasHeader                            -- ^ header flag handed to the parser
             -> InputStream ByteString               -- ^ upstream of raw CSV bytes
             -> IO (InputStream (Either String a))   -- ^ one 'Either' per decoded record
decodeStream = decodeStreamWith defaultDecodeOptions
-- | Create an input stream of decoded CSV records from a stream of raw
-- 'ByteString' chunks, using the supplied 'DecodeOptions'.
--
-- Two mutable cells are allocated per stream: a queue of records that
-- have been decoded but not yet handed downstream, and the current
-- state of the incremental parser ('Nothing' once it has finished).
-- Each read of the resulting stream runs 'dispatch' over those cells.
--
-- Records that fail to convert are delivered as 'Left'; a failure of
-- the parser itself is raised as a 'StreamDecodingError' by 'dispatch'.
decodeStreamWith :: (FromRecord a)
                 => DecodeOptions                        -- ^ CSV decoding options
                 -> HasHeader                            -- ^ header flag handed to the parser
                 -> InputStream ByteString               -- ^ upstream of raw CSV bytes
                 -> IO (InputStream (Either String a))   -- ^ one 'Either' per decoded record
decodeStreamWith ops hdr input = do
  queue  <- newIORef []
  parser <- newIORef (Just (decodeWith ops hdr))
  makeInputStream (dispatch queue parser input)
-- | Create an input stream of decoded named CSV records from a stream
-- of raw 'ByteString' chunks, using 'defaultDecodeOptions'.
--
-- This is 'decodeStreamByNameWith' partially applied to the default
-- options.
decodeStreamByName :: (FromNamedRecord a)
                   => InputStream ByteString               -- ^ upstream of raw CSV bytes
                   -> IO (InputStream (Either String a))   -- ^ one 'Either' per decoded record
decodeStreamByName = decodeStreamByNameWith defaultDecodeOptions
-- | Create an input stream of decoded named CSV records from a stream
-- of raw 'ByteString' chunks, using the supplied 'DecodeOptions'.
--
-- The header is parsed eagerly, before the resulting stream is created:
-- upstream is read until the header parser either fails or completes.
-- A header failure is raised as a 'StreamDecodingError'. Once the header
-- is done, the record parser it yields is driven lazily by 'dispatch',
-- exactly as in 'decodeStreamWith'.
decodeStreamByNameWith :: (FromNamedRecord a)
                       => DecodeOptions                        -- ^ CSV decoding options
                       -> InputStream ByteString               -- ^ upstream of raw CSV bytes
                       -> IO (InputStream (Either String a))   -- ^ one 'Either' per decoded record
decodeStreamByNameWith ops input = go (decodeByNameWith ops)
  where
    -- Drive the header parser to completion.
    go :: HeaderParser (Parser a) -> IO (InputStream (Either String a))
    go (FailH _ e)  = bomb e
    go (PartialH f) = nextChunk >>= go . f
    go (DoneH _ p)  = do
      queue  <- newIORef []
      parser <- newIORef (Just p)
      makeInputStream (dispatch queue parser input)

    -- Next non-empty upstream chunk, or 'BS.empty' once upstream is
    -- exhausted. The incremental parser treats an empty chunk as the
    -- end-of-input signal, so empty chunks that appear mid-stream are
    -- skipped instead of being forwarded to the continuation.
    nextChunk :: IO ByteString
    nextChunk = do
      upstream <- Streams.read input
      case upstream of
        Nothing -> return BS.empty
        Just bs
          | BS.null bs -> nextChunk
          | otherwise  -> return bs
-- | Unwrap a stream of decoding results into a stream of records.
--
-- Each read pulls one element from upstream:
--
-- * end of stream is passed through as end of stream;
-- * a 'Right' record is yielded as-is;
-- * a 'Left' error is /not/ skipped: it is raised as a
--   'StreamDecodingError' whose message is prefixed with
--   @"invalid CSV row: "@.
onlyValidRecords :: InputStream (Either String a)   -- ^ upstream of decoding results
                 -> IO (InputStream a)              -- ^ stream of successfully decoded records
onlyValidRecords input = makeInputStream $ do
  upstream <- Streams.read input
  case upstream of
    Nothing         -> return Nothing
    Just (Left err) -> bomb ("invalid CSV row: " ++ err)
    Just (Right x)  -> return (Just x)
-- | Produce the next element for a decoding stream.
--
-- The first cell holds records that were decoded in a batch but not yet
-- handed downstream; the second holds the current incremental parser
-- state, or 'Nothing' once the parser has finished.
--
-- * If the queue is non-empty, its head is returned and the tail stored.
-- * Otherwise the parser state decides: 'Nothing' ends the stream,
--   'Fail' raises a 'StreamDecodingError' (message prefixed with
--   @"input data malformed: "@), 'Many' either asks upstream for more
--   input (no records) or queues its records, and 'Done' queues its
--   final records and marks the parser as finished.
dispatch :: forall a. IORef [Either String a]   -- ^ queue of decoded, undelivered records
         -> IORef (Maybe (Parser a))            -- ^ parser state; 'Nothing' when finished
         -> InputStream ByteString              -- ^ upstream of raw CSV bytes
         -> IO (Maybe (Either String a))        -- ^ next record, or 'Nothing' at end of stream
dispatch queueRef parserRef input = do
  queue <- readIORef queueRef
  case queue of
    [] -> do
      parser <- readIORef parserRef
      case parser of
        Nothing          -> return Nothing
        Just (Fail _ e)  -> bomb ("input data malformed: " ++ e)
        Just (Many [] f) -> more f
        -- Keep the continuation but drop the records from the stored
        -- state, so they are delivered only once via the queue.
        Just (Many xs f) -> writeIORef parserRef (Just (Many [] f)) >> feed xs
        Just (Done xs)   -> writeIORef parserRef Nothing >> feed xs
    (x:xs) -> do
      writeIORef queueRef xs
      return (Just x)
  where
    -- Feed the next upstream chunk to the parser continuation, store the
    -- resulting state and try again.
    more :: (ByteString -> Parser a) -> IO (Maybe (Either String a))
    more f = do
      chunk <- nextChunk
      writeIORef parserRef (Just (f chunk))
      dispatch queueRef parserRef input

    -- Replace the queue with a freshly decoded batch and try again.
    feed :: [Either String a] -> IO (Maybe (Either String a))
    feed xs = do
      writeIORef queueRef xs
      dispatch queueRef parserRef input

    -- Next non-empty upstream chunk, or 'BS.empty' once upstream is
    -- exhausted. The incremental parser treats an empty chunk as the
    -- end-of-input signal, so an empty chunk appearing mid-stream must
    -- be skipped; forwarding it would end decoding early and silently
    -- discard the rest of the input.
    nextChunk :: IO ByteString
    nextChunk = do
      upstream <- Streams.read input
      case upstream of
        Nothing -> return BS.empty
        Just bs
          | BS.null bs -> nextChunk
          | otherwise  -> return bs
-- | Abort the current 'IO' action by throwing a 'StreamDecodingError'
-- carrying the given message. Uses 'throwIO' so the exception is raised
-- when the action is executed, not when its result is forced.
bomb :: String -> IO a
bomb = throwIO . StreamDecodingError