{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -w #-}

module Data.Hadoop.SequenceFile.Parser
    ( header
    , recordBlock
    ) where

import           Control.Applicative ((<$>), (<*>))
import           Control.Monad (when, unless, replicateM)
import           Data.Attoparsec.ByteString (Parser)
import qualified Data.Attoparsec.ByteString as A
import           Data.Bits ((.|.), xor, shiftL)
import           Data.ByteString (ByteString)
import qualified Data.ByteString as B
import           Data.ByteString.Internal (ByteString(..))
import           Data.Int
import           Data.Monoid ((<>))
import           Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import           Data.Word
import           Foreign.ForeignPtr (withForeignPtr)
import           Foreign.Storable (Storable, peekByteOff)
import           System.IO.Unsafe (unsafeDupablePerformIO)
import           Text.Printf (printf)

import           Data.Hadoop.SequenceFile.Types
import           Data.Hadoop.Unsafe
import           Data.Hadoop.Writable

------------------------------------------------------------------------

#include "MachDeps.h"

------------------------------------------------------------------------

-- | Attoparsec 'Parser' for sequence file headers.
header :: Parser Header
header = do
    magic <- A.take 3
    when (magic /= "SEQ")
         (fail "not a sequence file")

    version <- A.anyWord8
    when (version /= 6)
         (fail $ "unknown version: " ++ show version)

    hdKeyType   <- textWritable
    hdValueType <- textWritable

    hdCompression      <- anyBool
    hdBlockCompression <- anyBool

    unless (hdCompression && hdBlockCompression)
           (fail "only block compressed files supported")

    hdCompressionType <- textWritable

    unless (hdCompressionType == "org.apache.hadoop.io.compress.SnappyCodec")
           (fail "only snappy compressed files supported")

    hdMetadata <- metadata
    hdSync     <- md5

    return Header{..}

metadata :: Parser [(Text, Text)]
metadata = do
    n <- fromIntegral <$> anyWord32le
    replicateM n $ (,) <$> textWritable <*> textWritable

md5 :: Parser MD5
md5 = MD5 <$> A.take 16

------------------------------------------------------------------------

-- | Attoparsec 'Parser' for sequence file record blocks.
recordBlock :: forall ck k cv v. (Writable ck k, Writable cv v) => Header -> Parser (RecordBlock k v)
recordBlock Header{..} = do
    when (hdKeyType /= keyType)
         (fail $ "expected keys of type <" <> T.unpack keyType <> "> "
              <> "but file contains <" <> T.unpack hdKeyType <> ">")

    when (hdValueType /= valueType)
         (fail $ "expected values of type <" <> T.unpack valueType <> "> "
              <> "but file contains <" <> T.unpack hdValueType <> ">")

    escape <- anyWord32le
    when (escape /= 0xffffffff)
         (fail $ "file corrupt, expected to find sync escape " ++
                 "<0xffffffff> but was " ++ printf "<0x%0x>" escape)

    sync <- md5
    when (hdSync /= sync)
         (fail $ "file corrupt, expected to find sync marker " ++
                 "<" ++ show hdSync ++ "> but was <" ++ show sync ++ ">")

    rbCount      <- anyVInt
    keyLengths   <- vintPrefixedBytes
    keys         <- vintPrefixedBytes
    valueLengths <- vintPrefixedBytes
    values       <- vintPrefixedBytes

    let rbKeys   = decodeSnappyBlock rbCount keyLengths   keys
        rbValues = decodeSnappyBlock rbCount valueLengths values

    return RecordBlock{..}
  where
    keyType   = javaType (undefined :: k)
    valueType = javaType (undefined :: v)

------------------------------------------------------------------------

textWritable :: Parser Text
textWritable = T.decodeUtf8 <$> vintPrefixedBytes
{-# INLINE textWritable #-}

vintPrefixedBytes :: Parser ByteString
vintPrefixedBytes = A.take =<< anyVInt
{-# INLINE vintPrefixedBytes #-}

anyBool :: Parser Bool
anyBool = (/= 0) <$> A.anyWord8
{-# INLINE anyBool #-}

anyVInt :: Parser Int
anyVInt = fromIntegral <$> anyVInt64
{-# INLINE anyVInt #-}

anyVInt64 :: Parser Int64
anyVInt64 = withFirst . fromIntegral =<< A.anyWord8
  where
    withFirst :: Int8 -> Parser Int64
    withFirst x | size == 1 = return (fromIntegral x)
                | otherwise = fixupSign . B.foldl' go 0 <$> A.take (size - 1)
      where
        go :: Int64 -> Word8 -> Int64
        go i b = (i `shiftL` 8) .|. fromIntegral b

        size | x >= -112 = 1
             | x <  -120 = fromIntegral (-119 - x)
             | otherwise = fromIntegral (-111 - x)

        fixupSign v = if isNegative then v `xor` (-1) else v

        isNegative = x < -120 || (x >= -112 && x < 0)
{-# INLINE anyVInt64 #-}

------------------------------------------------------------------------

#ifdef WORDS_BIGENDIAN

anyWord32le :: Parser Word32
anyWord32le = anyWord32swap
{-# INLINE anyWord32le #-}

anyWord32be :: Parser Word32
anyWord32be = anyWord32host
{-# INLINE anyWord32be #-}

#else

anyWord32le :: Parser Word32
anyWord32le = anyWord32host
{-# INLINE anyWord32le #-}

anyWord32be :: Parser Word32
anyWord32be = anyWord32swap
{-# INLINE anyWord32be #-}

#endif

anyWord32host :: Parser Word32
anyWord32host = peekBS <$> A.take 4
{-# INLINE anyWord32host #-}

anyWord32swap :: Parser Word32
anyWord32swap = byteSwap32 <$> anyWord32host
{-# INLINE anyWord32swap #-}

peekBS :: Storable a => ByteString -> a
peekBS (PS fp off _) = unsafeDupablePerformIO $ withForeignPtr fp $ \ptr -> peekByteOff ptr off
{-# INLINE peekBS #-}