{-# LANGUAGE CPP #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UnboxedTuples #-}
{-# LANGUAGE FlexibleContexts #-}
#include "inline.hs"
module Streamly.Internal.Memory.Array.Types
(
Array (..)
, withNewArray
, newArray
, unsafeSnoc
, snoc
, spliceWithDoubling
, spliceTwo
, fromList
, fromListN
, fromStreamDN
, fromStreamDArraysOf
, FlattenState (..)
, flattenArrays
, flattenArraysRev
, packArraysChunksOf
, lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
, groupIOVecsOf
#endif
, splitOn
, breakOn
, unsafeIndexIO
, unsafeIndex
, length
, byteLength
, byteCapacity
, foldl'
, foldr
, splitAt
, toStreamD
, toStreamDRev
, toStreamK
, toStreamKRev
, toList
, toArrayMinChunk
, writeN
, writeNUnsafe
, writeNAligned
, writeNAlignedUnmanaged
, write
, writeAligned
, defaultChunkSize
, mkChunkSize
, mkChunkSizeKB
, unsafeInlineIO
, realloc
, shrinkToFit
, memcpy
, memcmp
, bytesToElemCount
, unlines
)
where
import Control.Exception (assert)
import Control.DeepSeq (NFData(..))
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Functor.Identity (runIdentity)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Data.Word (Word8)
import Foreign.C.String (CString)
import Foreign.C.Types (CSize(..), CInt(..))
import Foreign.ForeignPtr (withForeignPtr, touchForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (plusPtr, minusPtr, castPtr, nullPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, foldr, read, unlines, splitAt)
import Text.Read (readPrec, readListPrec, readListPrecDefault)
import GHC.Base (Addr#, nullAddr#, realWorld#, build)
import GHC.Exts (IsList, IsString(..))
import GHC.ForeignPtr (ForeignPtr(..), newForeignPtr_)
import GHC.IO (IO(IO), unsafePerformIO)
import GHC.Ptr (Ptr(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Strict (Tuple'(..))
import Streamly.Internal.Data.SVar (adaptState)
#if !defined(mingw32_HOST_OS)
import Streamly.FileSystem.FDIO (IOVec(..))
#endif
import qualified Streamly.Memory.Malloc as Malloc
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified GHC.Exts as Exts
#ifdef DEVBUILD
import qualified Data.Foldable as F
#endif
#if MIN_VERSION_base(4,10,0)
import Foreign.ForeignPtr (plusForeignPtr)
#else
import GHC.Base (Int(..), plusAddr#)
import GHC.ForeignPtr (ForeignPtr(..))
-- | Compatibility definition used when @base@ is older than 4.10: build a
-- 'ForeignPtr' whose address is the original address advanced by the given
-- number of bytes, reusing the second constructor field unchanged.
plusForeignPtr :: ForeignPtr a -> Int -> ForeignPtr b
plusForeignPtr (ForeignPtr base contents) (I# offset) =
    case plusAddr# base offset of
        shifted -> ForeignPtr shifted contents
#endif
-- | A buffer of elements described by three addresses.
--
-- The bytes from the address of 'aStart' up to (excluding) 'aEnd' are the
-- elements stored so far; the bytes from 'aEnd' up to (excluding) 'aBound'
-- are allocated but unused. Several functions in this module assert
-- @aEnd <= aBound@.
--
-- In DEVBUILD builds the constructor additionally carries a 'Storable'
-- constraint.
data Array a =
#ifdef DEVBUILD
Storable a =>
#endif
Array
{ aStart :: {-# UNPACK #-} !(ForeignPtr a) -- ^ first address of the buffer
, aEnd :: {-# UNPACK #-} !(Ptr a) -- ^ first address past the stored elements
, aBound :: {-# UNPACK #-} !(Ptr a) -- ^ first address past the usable capacity
}
-- | Unsafe FFI binding to C @memcpy@ (declared in @string.h@). Arguments are
-- passed in the order the wrapper 'memcpy' uses: destination, source, byte
-- count.
foreign import ccall unsafe "string.h memcpy" c_memcpy
:: Ptr Word8 -> Ptr Word8 -> CSize -> IO (Ptr Word8)
-- | Unsafe FFI binding to C @strlen@ (declared in @string.h@).
foreign import ccall unsafe "string.h strlen" c_strlen
:: CString -> IO CSize
-- | Unsafe FFI binding to C @memchr@ (declared in @string.h@). 'breakOn'
-- treats a 'nullPtr' result as "byte not found".
foreign import ccall unsafe "string.h memchr" c_memchr
:: Ptr Word8 -> Word8 -> CSize -> IO (Ptr Word8)
-- | Copy @len@ bytes from @src@ to @dst@ using C @memcpy@. The pointer
-- returned by the C function is discarded.
memcpy :: Ptr Word8 -> Ptr Word8 -> Int -> IO ()
memcpy dst src len = do
    _ <- c_memcpy dst src (fromIntegral len)
    return ()
-- | Unsafe FFI binding to C @memcmp@ (declared in @string.h@).
foreign import ccall unsafe "string.h memcmp" c_memcmp
:: Ptr Word8 -> Ptr Word8 -> CSize -> IO CInt
-- | Compare @len@ bytes at the two addresses with C @memcmp@. The result is
-- 'True' exactly when the C function returns 0.
{-# INLINE memcmp #-}
memcmp :: Ptr Word8 -> Ptr Word8 -> Int -> IO Bool
memcmp p1 p2 len = fmap (== 0) (c_memcmp p1 p2 (fromIntegral len))
-- | Run an 'IO' action as a pure expression by applying it directly to the
-- 'realWorld#' token and discarding the state token it returns.
--
-- NOTE(review): this bypasses whatever safeguards 'unsafePerformIO' provides.
-- Within this module it is only applied to memory reads ('peek', 'memcmp')
-- followed by 'touchForeignPtr' — confirm before using it for anything else.
{-# INLINE unsafeInlineIO #-}
unsafeInlineIO :: IO a -> a
unsafeInlineIO (IO m) = case m realWorld# of (# _, r #) -> r
-- | Number of elements of the type of @x@ needed to cover at least @n@
-- bytes: @n@ divided by the element size, rounded up.
--
-- Only the type of @x@ matters to 'sizeOf'; callers in this module pass
-- 'undefined'.
{-# INLINE bytesToElemCount #-}
bytesToElemCount :: Storable a => a -> Int -> Int
bytesToElemCount x n =
    let elemSize = sizeOf x
    -- The parentheses are essential: `div` binds tighter than (+) and (-),
    -- so without them only the literal 1 would be divided by the element
    -- size and the result would be roughly @n + elemSize@.
    in (n + elemSize - 1) `div` elemSize
-- | Create an empty array with capacity for @count@ elements, obtaining the
-- memory from the supplied allocator. The allocator is called with the size
-- in bytes first and the alignment second. In the result 'aEnd' points at the
-- start of the buffer and 'aBound' at the end of the requested size.
{-# INLINE newArrayAlignedAllocWith #-}
newArrayAlignedAllocWith :: forall a. Storable a
    => (Int -> Int -> IO (ForeignPtr a)) -> Int -> Int -> IO (Array a)
newArrayAlignedAllocWith alloc alignSize count = do
    fptr <- alloc nbytes alignSize
    let base = unsafeForeignPtrToPtr fptr
    return $ Array
        { aStart = fptr
        , aEnd = base
        , aBound = base `plusPtr` nbytes
        }

    where

    nbytes = count * sizeOf (undefined :: a)
-- | Create an empty array using
-- 'Malloc.mallocForeignPtrAlignedUnmanagedBytes' as the allocator. The
-- arguments are, in order, the alignment and the capacity in elements (see
-- 'newArrayAlignedAllocWith').
--
-- NOTE(review): presumably the memory is allocated outside the GHC-managed
-- heap — confirm against "Streamly.Memory.Malloc".
{-# INLINE newArrayAlignedUnmanaged #-}
newArrayAlignedUnmanaged :: forall a. Storable a => Int -> Int -> IO (Array a)
newArrayAlignedUnmanaged =
newArrayAlignedAllocWith Malloc.mallocForeignPtrAlignedUnmanagedBytes
-- | Create an empty array using 'Malloc.mallocForeignPtrAlignedBytes' as the
-- allocator. The arguments are, in order, the alignment and the capacity in
-- elements (see 'newArrayAlignedAllocWith').
{-# INLINE newArrayAligned #-}
newArrayAligned :: forall a. Storable a => Int -> Int -> IO (Array a)
newArrayAligned = newArrayAlignedAllocWith Malloc.mallocForeignPtrAlignedBytes
-- | Create an empty array with capacity for the given number of elements,
-- aligned to the 'alignment' of the element type.
{-# INLINE newArray #-}
newArray :: forall a. Storable a => Int -> IO (Array a)
newArray = newArrayAligned (alignment (undefined :: a))
-- | Allocate a new array with capacity for @count@ elements, run the given
-- action on the address of its buffer, and return the array.
--
-- Note that 'aEnd' of the returned array is whatever 'newArray' set it to;
-- this function does not advance it.
{-# INLINE withNewArray #-}
withNewArray :: forall a. Storable a => Int -> (Ptr a -> IO ()) -> IO (Array a)
withNewArray count f = do
    arr <- newArray count
    withForeignPtr (aStart arr) f
    return arr
-- | Append one element in place, without growing the buffer. Calls 'error'
-- when the array is already full ('aEnd' equals 'aBound'). The returned
-- array shares the buffer with the input and has 'aEnd' advanced by one
-- element.
{-# INLINE unsafeSnoc #-}
unsafeSnoc :: forall a. Storable a => Array a -> a -> IO (Array a)
unsafeSnoc arr@Array{..} x = do
    let full = aEnd == aBound
    when full $
        error "BUG: unsafeSnoc: writing beyond array bounds"
    poke aEnd x
    -- Keep the buffer alive until the write above has happened.
    touchForeignPtr aStart
    let next = aEnd `plusPtr` sizeOf (undefined :: a)
    return $ arr {aEnd = next}
-- | Append one element. When there is spare capacity the element is written
-- in place and the result shares the buffer with the input. When the array
-- is full, a new buffer exactly one element larger is allocated, the old
-- contents are copied over and the element is written at the end.
{-# INLINE snoc #-}
snoc :: forall a. Storable a => Array a -> a -> IO (Array a)
snoc arr@Array {..} x
    | aEnd == aBound = do
        let oldStart = unsafeForeignPtrToPtr aStart
            used = aEnd `minusPtr` oldStart
            grown = used + elemSize
        newPtr <- Malloc.mallocForeignPtrAlignedBytes
                    grown (alignment (undefined :: a))
        withForeignPtr newPtr $ \pNew -> do
            memcpy (castPtr pNew) (castPtr oldStart) used
            poke (pNew `plusPtr` used) x
            -- The old buffer must stay alive until the copy is done.
            touchForeignPtr aStart
            return $ Array
                { aStart = newPtr
                , aEnd = pNew `plusPtr` (used + elemSize)
                , aBound = pNew `plusPtr` grown
                }
    | otherwise = do
        poke aEnd x
        touchForeignPtr aStart
        return $ arr {aEnd = aEnd `plusPtr` elemSize}

    where

    elemSize = sizeOf (undefined :: a)
-- | Copy the used part of an array into a newly allocated buffer of
-- @newSize@ bytes with alignment @alignSize@. In the result 'aEnd' is at the
-- same byte offset from the start as in the input, and 'aBound' is @newSize@
-- bytes from the start.
--
-- NOTE(review): nothing here checks that @newSize@ is at least the number of
-- bytes in use; the 'memcpy' below always copies the whole used region, so
-- callers have to guarantee that themselves.
{-# NOINLINE reallocAligned #-}
reallocAligned :: Int -> Int -> Array a -> IO (Array a)
reallocAligned alignSize newSize Array{..} = do
assert (aEnd <= aBound) (return ())
let oldStart = unsafeForeignPtrToPtr aStart
-- Number of bytes currently holding elements.
let size = aEnd `minusPtr` oldStart
newPtr <- Malloc.mallocForeignPtrAlignedBytes newSize alignSize
withForeignPtr newPtr $ \pNew -> do
memcpy (castPtr pNew) (castPtr oldStart) size
-- Keep the old buffer alive until the copy has completed.
touchForeignPtr aStart
return $ Array
{ aStart = newPtr
, aEnd = pNew `plusPtr` size
, aBound = pNew `plusPtr` newSize
}
-- | 'reallocAligned' using the 'alignment' of the element type. The first
-- argument is the new capacity in bytes.
{-# INLINABLE realloc #-}
realloc :: forall a. Storable a => Int -> Array a -> IO (Array a)
realloc = reallocAligned (alignment (undefined :: a))
-- | Drop unused capacity when it is large relative to the used size.
--
-- The array is reallocated to exactly its used size when the used bytes are
-- fewer than three times the unused bytes, i.e. when more than a quarter of
-- the capacity is unused. Otherwise (including the case where both are zero)
-- the array is returned unchanged.
shrinkToFit :: forall a. Storable a => Array a -> IO (Array a)
shrinkToFit arr@Array{..} = do
    assert (aEnd <= aBound) (return ())
    if usedBytes < 3 * wastedBytes
    then realloc usedBytes arr
    else return arr

    where

    base = unsafeForeignPtrToPtr aStart
    usedBytes = aEnd `minusPtr` base
    wastedBytes = aBound `minusPtr` aEnd
-- | Wrap the bytes at a raw address as a 'Word8' array without copying. The
-- length is determined with C @strlen@, and the resulting array has no spare
-- capacity ('aEnd' equals 'aBound'). The 'ForeignPtr' is created with
-- 'newForeignPtr_', so no finalizer is attached here.
{-# INLINE _fromCStringAddrUnsafe #-}
_fromCStringAddrUnsafe :: Addr# -> IO (Array Word8)
_fromCStringAddrUnsafe addr# = do
    fptr <- newForeignPtr_ (castPtr cstr)
    n <- fromIntegral <$> c_strlen cstr
    let limit = unsafeForeignPtrToPtr fptr `plusPtr` n
    return $ Array
        { aStart = fptr
        , aEnd = limit
        , aBound = limit
        }

    where

    cstr :: CString
    cstr = Ptr addr#
-- | Read the element at index @i@. The index is only validated by an
-- 'assert' (non-negative, and the element lies entirely before 'aEnd'), so
-- it is unchecked whenever assertions are disabled.
{-# INLINE_NORMAL unsafeIndexIO #-}
unsafeIndexIO :: forall a. Storable a => Array a -> Int -> IO a
unsafeIndexIO Array {..} i =
    withForeignPtr aStart $ \base -> do
        let width = sizeOf (undefined :: a)
            addr = base `plusPtr` (width * i)
            next = addr `plusPtr` width
        assert (i >= 0 && next <= aEnd) (return ())
        peek addr
-- | Pure version of 'unsafeIndexIO': the read is performed through
-- 'unsafeInlineIO' and forced (bang pattern) before the value is returned.
{-# INLINE_NORMAL unsafeIndex #-}
unsafeIndex :: forall a. Storable a => Array a -> Int -> a
unsafeIndex arr i = let !r = unsafeInlineIO $ unsafeIndexIO arr i in r
-- | Number of bytes occupied by the stored elements, i.e. the distance from
-- the start of the buffer to 'aEnd'. Asserts that it is non-negative.
{-# INLINE byteLength #-}
byteLength :: Array a -> Int
byteLength Array{..} = assert (used >= 0) used

    where

    used = aEnd `minusPtr` unsafeForeignPtrToPtr aStart
-- | Number of elements stored in the array: 'byteLength' divided by the size
-- of one element.
{-# INLINE length #-}
length :: forall a. Storable a => Array a -> Int
length arr = bytes `div` elemSize

    where

    bytes = byteLength arr
    elemSize = sizeOf (undefined :: a)
-- | Total usable size of the buffer in bytes, i.e. the distance from the
-- start of the buffer to 'aBound'. Asserts that it is non-negative.
{-# INLINE byteCapacity #-}
byteCapacity :: Array a -> Int
byteCapacity Array{..} = assert (capacity >= 0) capacity

    where

    capacity = aBound `minusPtr` unsafeForeignPtrToPtr aStart
-- | Stream the elements of an array from first to last. The stream state is
-- a pointer that starts at the beginning of the buffer and advances by one
-- element size per yielded element; the stream stops when it equals 'aEnd'.
{-# INLINE_NORMAL toStreamD #-}
toStreamD :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a
toStreamD Array{..} =
let p = unsafeForeignPtrToPtr aStart
in D.Stream step p
where
{-# INLINE_LATE step #-}
step _ p | p == aEnd = return D.Stop
step _ p = do
-- The read is done purely via 'unsafeInlineIO' (the monad is only 'Monad',
-- not 'MonadIO'); 'touchForeignPtr' after the 'peek' keeps the buffer alive
-- until the read has happened.
let !x = unsafeInlineIO $ do
r <- peek p
touchForeignPtr aStart
return r
return $ D.Yield x (p `plusPtr` sizeOf (undefined :: a))
-- | Build a 'K.IsStream' stream of the array elements, first to last, by
-- walking a pointer from the start of the buffer up to 'aEnd'.
{-# INLINE toStreamK #-}
toStreamK :: forall t m a. (K.IsStream t, Storable a) => Array a -> t m a
toStreamK Array{..} =
let p = unsafeForeignPtrToPtr aStart
in go p
where
go p | p == aEnd = K.nil
| otherwise =
-- Pure read; 'touchForeignPtr' keeps the buffer alive across the 'peek'.
let !x = unsafeInlineIO $ do
r <- peek p
touchForeignPtr aStart
return r
in x `K.cons` go (p `plusPtr` sizeOf (undefined :: a))
-- | Stream the elements of an array from last to first. The stream state is
-- a pointer that starts one element before 'aEnd' and moves back by one
-- element size per yielded element; the stream stops once the pointer is
-- below the start of the buffer.
{-# INLINE_NORMAL toStreamDRev #-}
toStreamDRev :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a
toStreamDRev Array{..} =
let p = aEnd `plusPtr` negate (sizeOf (undefined :: a))
in D.Stream step p
where
{-# INLINE_LATE step #-}
step _ p | p < unsafeForeignPtrToPtr aStart = return D.Stop
step _ p = do
-- Pure read; 'touchForeignPtr' keeps the buffer alive across the 'peek'.
let !x = unsafeInlineIO $ do
r <- peek p
touchForeignPtr aStart
return r
return $ D.Yield x (p `plusPtr` negate (sizeOf (undefined :: a)))
-- | Build a 'K.IsStream' stream of the array elements, last to first, by
-- walking a pointer from one element before 'aEnd' down to the start of the
-- buffer.
{-# INLINE toStreamKRev #-}
toStreamKRev :: forall t m a. (K.IsStream t, Storable a) => Array a -> t m a
toStreamKRev Array {..} =
let p = aEnd `plusPtr` negate (sizeOf (undefined :: a))
in go p
where
go p | p < unsafeForeignPtrToPtr aStart = K.nil
| otherwise =
-- Pure read; 'touchForeignPtr' keeps the buffer alive across the 'peek'.
let !x = unsafeInlineIO $ do
r <- peek p
touchForeignPtr aStart
return r
in x `K.cons` go (p `plusPtr` negate (sizeOf (undefined :: a)))
-- | Left fold over the elements of an array, implemented by running
-- 'D.foldl'' over 'toStreamD' in the 'Identity' monad.
{-# INLINE_NORMAL foldl' #-}
foldl' :: forall a b. Storable a => (b -> a -> b) -> b -> Array a -> b
foldl' f z arr = runIdentity $ D.foldl' f z $ toStreamD arr
-- | Right fold over the elements of an array, implemented by running
-- 'D.foldr' over 'toStreamD' in the 'Identity' monad.
{-# INLINE_NORMAL foldr #-}
foldr :: Storable a => (a -> b -> b) -> b -> Array a -> b
foldr f z arr = runIdentity $ D.foldr f z $ toStreamD arr
-- | A fold that writes at most @n@ input elements into an array obtained
-- from the given allocator (called with @max n 0@). Once the array is full,
-- further input elements are ignored and the array is passed through
-- unchanged.
{-# INLINE_NORMAL writeNAllocWith #-}
writeNAllocWith :: forall m a. (MonadIO m, Storable a)
    => (Int -> IO (Array a)) -> Int -> Fold m a (Array a)
writeNAllocWith alloc n = Fold step initial extract

    where

    initial = liftIO $ alloc (max n 0)

    step arr@(Array start end bound) x
        | end == bound = return arr
        | otherwise = do
            liftIO $ poke end x
            return $ Array start (end `plusPtr` sizeOf (undefined :: a)) bound

    extract = return
-- | Fold at most the given number of input elements into an array allocated
-- with 'newArray'. See 'writeNAllocWith'.
{-# INLINE_NORMAL writeN #-}
writeN :: forall m a. (MonadIO m, Storable a) => Int -> Fold m a (Array a)
writeN = writeNAllocWith newArray
-- | Like 'writeN' but the array is allocated with 'newArrayAligned' using
-- the alignment given as the first argument; the second argument is the
-- maximum number of elements.
{-# INLINE_NORMAL writeNAligned #-}
writeNAligned :: forall m a. (MonadIO m, Storable a)
=> Int -> Int -> Fold m a (Array a)
writeNAligned alignSize = writeNAllocWith (newArrayAligned alignSize)
-- | Like 'writeNAligned' but the array is allocated with
-- 'newArrayAlignedUnmanaged'.
{-# INLINE_NORMAL writeNAlignedUnmanaged #-}
writeNAlignedUnmanaged :: forall m a. (MonadIO m, Storable a)
=> Int -> Int -> Fold m a (Array a)
writeNAlignedUnmanaged alignSize =
writeNAllocWith (newArrayAlignedUnmanaged alignSize)
-- | Accumulator used by 'writeNUnsafe': the start of the buffer and the
-- current write position, with no record of the buffer's bound.
data ArrayUnsafe a = ArrayUnsafe
{-# UNPACK #-} !(ForeignPtr a)
{-# UNPACK #-} !(Ptr a)
-- | Like 'writeN' but without any bounds check: every input element is
-- written to the buffer, so feeding more than @n@ elements writes past the
-- allocation. The resulting array has 'aBound' equal to 'aEnd'.
{-# INLINE_NORMAL writeNUnsafe #-}
writeNUnsafe :: forall m a. (MonadIO m, Storable a)
    => Int -> Fold m a (Array a)
writeNUnsafe n = Fold step initial extract

    where

    initial = do
        arr <- liftIO $ newArray (max n 0)
        return $ ArrayUnsafe (aStart arr) (aEnd arr)

    step (ArrayUnsafe start cur) x = do
        liftIO $ poke cur x
        let next = cur `plusPtr` sizeOf (undefined :: a)
        return $ ArrayUnsafe start next

    extract (ArrayUnsafe start cur) = return $ Array start cur cur
-- | Fold all input elements into a single array.
--
-- The array starts with capacity for @elemCount@ elements, allocated with
-- alignment @alignSize@. Whenever it is full its size in bytes is doubled
-- (via 'reallocAligned') before the next element is written. On extraction
-- the array is passed through 'shrinkToFit'.
--
-- Calls 'error' if @elemCount@ is negative.
{-# INLINE_NORMAL toArrayMinChunk #-}
toArrayMinChunk :: forall m a. (MonadIO m, Storable a)
    => Int -> Int -> Fold m a (Array a)
toArrayMinChunk alignSize elemCount = Fold step initial extract

    where

    elemSize = sizeOf (undefined :: a)

    insertElem (Array start end bound) x = do
        liftIO $ poke end x
        return $ Array start (end `plusPtr` elemSize) bound

    initial = do
        when (elemCount < 0) $ error "toArrayMinChunk: elemCount is negative"
        liftIO $ newArrayAligned alignSize elemCount

    step arr@(Array start end bound) x | end == bound = do
        let p = unsafeForeignPtrToPtr start
            oldSize = end `minusPtr` p
            -- Double the buffer, but never grow to less than one whole
            -- element. Growing an empty buffer to a single byte (as a plain
            -- @max (oldSize * 2) 1@ would) makes the following poke overrun
            -- the allocation for any element wider than one byte, after
            -- which @end == bound@ can never hold again.
            newSize = max (oldSize * 2) (max 1 elemSize)
        arr1 <- liftIO $ reallocAligned alignSize newSize arr
        insertElem arr1 x
    step arr x = insertElem arr x

    extract = liftIO . shrinkToFit
-- | Fold all input elements into a single array using 'toArrayMinChunk',
-- with the element type's 'alignment' and an initial capacity derived from
-- @mkChunkSize 1024@ bytes via 'bytesToElemCount'.
{-# INLINE write #-}
write :: forall m a. (MonadIO m, Storable a) => Fold m a (Array a)
write = toArrayMinChunk (alignment (undefined :: a))
(bytesToElemCount (undefined :: a)
(mkChunkSize 1024))
-- | Like 'write' but with a caller-specified alignment for the array
-- buffer.
{-# INLINE writeAligned #-}
writeAligned :: forall m a. (MonadIO m, Storable a)
=> Int -> Fold m a (Array a)
writeAligned alignSize =
toArrayMinChunk alignSize
(bytesToElemCount (undefined :: a)
(mkChunkSize 1024))
-- | Create an array from the first @limit@ elements of a stream. The array
-- is allocated up front with capacity for @limit@ elements; if the stream is
-- shorter, the remaining capacity stays unused.
{-# INLINE_NORMAL fromStreamDN #-}
fromStreamDN :: forall m a. (MonadIO m, Storable a)
    => Int -> D.Stream m a -> m (Array a)
fromStreamDN limit str = do
    arr <- liftIO $ newArray limit
    end <- D.foldlM' pokeNext (aEnd arr) (D.take limit str)
    return $ arr {aEnd = end}

    where

    -- Write one element at the cursor and return the advanced cursor.
    pokeNext cursor x =
        liftIO (poke cursor x)
            >> return (cursor `plusPtr` sizeOf (undefined :: a))
-- | State of the 'fromStreamDArraysOf' state machine.
data GroupState s start end bound
-- | Allocate a fresh array before pulling from the source state @s@.
= GroupStart s
-- | Filling the array described by @start@, @end@ and @bound@.
| GroupBuffer s start end bound
-- | Emit the array described by @start@, @end@ and @bound@, then continue
-- with the given state.
| GroupYield start end bound (GroupState s start end bound)
-- | The stream is finished.
| GroupFinish
-- | Group the elements of a stream into arrays with capacity for @n@
-- elements each. An array is emitted as soon as it is full, and whatever has
-- been buffered is emitted when the source stops.
--
-- Calls 'error' (when the stream is first stepped) if @n <= 0@.
--
-- NOTE(review): when the source stops right after a fresh array has been
-- allocated (e.g. empty input, or input length a multiple of @n@), the
-- buffered array is empty and is still emitted — confirm callers expect a
-- trailing empty array.
{-# INLINE_NORMAL fromStreamDArraysOf #-}
fromStreamDArraysOf :: forall m a. (MonadIO m, Storable a)
=> Int -> D.Stream m a -> D.Stream m (Array a)
fromStreamDArraysOf n (D.Stream step state) =
D.Stream step' (GroupStart state)
where
{-# INLINE_LATE step' #-}
-- Validate @n@ and allocate the next output array.
step' _ (GroupStart st) = do
when (n <= 0) $
error $ "Streamly.Internal.Memory.Array.Types.fromStreamDArraysOf: the size of "
++ "arrays [" ++ show n ++ "] must be a natural number"
Array start end bound <- liftIO $ newArray n
return $ D.Skip (GroupBuffer st start end bound)
-- Pull one element from the source and append it to the current array.
step' gst (GroupBuffer st start end bound) = do
r <- step (adaptState gst) st
case r of
D.Yield x s -> do
liftIO $ poke end x
let end' = end `plusPtr` sizeOf (undefined :: a)
return $
-- Array is full: emit it and start a new one on the next step.
if end' >= bound
then D.Skip (GroupYield start end' bound (GroupStart s))
else D.Skip (GroupBuffer s start end' bound)
D.Skip s -> return $ D.Skip (GroupBuffer s start end bound)
-- Source exhausted: emit whatever has been buffered, then finish.
D.Stop -> return $ D.Skip (GroupYield start end bound GroupFinish)
step' _ (GroupYield start end bound next) =
return $ D.Yield (Array start end bound) next
step' _ GroupFinish = return D.Stop
-- | State used when flattening a stream of arrays into a stream of elements
-- ('flattenArrays', 'flattenArraysRev', 'unlines').
data FlattenState s a =
-- | Pull the next array from the source state @s@.
OuterLoop s
-- | Iterating inside one array: source state, the array's 'ForeignPtr'
-- (kept so the buffer can be kept alive), the current read position and
-- the array's end pointer.
| InnerLoop s !(ForeignPtr a) !(Ptr a) !(Ptr a)
-- | Flatten a stream of arrays into a stream of their elements, each array
-- read from first to last element.
{-# INLINE_NORMAL flattenArrays #-}
flattenArrays :: forall m a. (MonadIO m, Storable a)
=> D.Stream m (Array a) -> D.Stream m a
flattenArrays (D.Stream step state) = D.Stream step' (OuterLoop state)
where
{-# INLINE_LATE step' #-}
-- Fetch the next array and position the cursor at its first element.
step' gst (OuterLoop st) = do
r <- step (adaptState gst) st
return $ case r of
D.Yield Array{..} s ->
let p = unsafeForeignPtrToPtr aStart
in D.Skip (InnerLoop s aStart p aEnd)
D.Skip s -> D.Skip (OuterLoop s)
D.Stop -> D.Stop
-- Current array exhausted: go back to the source.
step' _ (InnerLoop st _ p end) | p == end =
return $ D.Skip $ OuterLoop st
step' _ (InnerLoop st startf p end) = do
x <- liftIO $ do
r <- peek p
-- Keep the buffer alive until the read has happened.
touchForeignPtr startf
return r
return $ D.Yield x (InnerLoop st startf
(p `plusPtr` sizeOf (undefined :: a)) end)
-- | Flatten a stream of arrays into a stream of their elements, each array
-- read from last to first element. The order of the arrays themselves is
-- not changed.
{-# INLINE_NORMAL flattenArraysRev #-}
flattenArraysRev :: forall m a. (MonadIO m, Storable a)
=> D.Stream m (Array a) -> D.Stream m a
flattenArraysRev (D.Stream step state) = D.Stream step' (OuterLoop state)
where
{-# INLINE_LATE step' #-}
-- Fetch the next array and position the cursor at its last element.
step' gst (OuterLoop st) = do
r <- step (adaptState gst) st
return $ case r of
D.Yield Array{..} s ->
let p = aEnd `plusPtr` negate (sizeOf (undefined :: a))
in D.Skip (InnerLoop s aStart p aEnd)
D.Skip s -> D.Skip (OuterLoop s)
D.Stop -> D.Stop
-- Cursor moved below the start of the buffer: array exhausted.
step' _ (InnerLoop st start p _) | p < unsafeForeignPtrToPtr start =
return $ D.Skip $ OuterLoop st
step' _ (InnerLoop st startf p end) = do
x <- liftIO $ do
r <- peek p
-- Keep the buffer alive until the read has happened.
touchForeignPtr startf
return r
return $ D.Yield x (InnerLoop st startf
(p `plusPtr` negate (sizeOf (undefined :: a))) end)
-- | Create an array from a stream of unknown length. The stream is first
-- buffered as chunks of 'defaultChunkSize' elements, the total number of
-- elements is computed, and the chunks are then copied into one array of
-- exactly that length.
{-# INLINE fromStreamD #-}
fromStreamD :: (MonadIO m, Storable a) => D.Stream m a -> m (Array a)
fromStreamD m = do
    chunks <- D.foldr K.cons K.nil (fromStreamDArraysOf defaultChunkSize m)
    total <- K.foldl' (+) 0 (K.map length chunks)
    fromStreamDN total (flattenArrays (D.fromStreamK chunks))
-- | Right fold over the array elements with an explicit "cons" (@c@) and
-- "nil" (@n@); used by 'toList' as the argument to 'build'.
{-# INLINE_LATE toListFB #-}
toListFB :: forall a b. Storable a => (a -> b -> b) -> b -> Array a -> b
toListFB c n Array{..} = go (unsafeForeignPtrToPtr aStart)
where
go p | p == aEnd = n
go p =
-- Pure read; 'touchForeignPtr' keeps the buffer alive across the 'peek'.
let !x = unsafeInlineIO $ do
r <- peek p
touchForeignPtr aStart
return r
in c x (go (p `plusPtr` sizeOf (undefined :: a)))
-- | Convert an array to a list of its elements, in order. Written with
-- 'build' over 'toListFB'.
{-# INLINE toList #-}
toList :: Storable a => Array a -> [a]
toList s = build (\c n -> toListFB c n s)
-- | An array is shown exactly like the list of its elements; the precedence
-- argument is ignored.
instance (Show a, Storable a) => Show (Array a) where
{-# INLINE showsPrec #-}
showsPrec _ = shows . toList
-- | Create an array from at most the first @n@ elements of a list. The list
-- is streamed into 'fromStreamDN', which allocates capacity for @n@
-- elements; the effect is run with 'unsafePerformIO'.
{-# INLINABLE fromListN #-}
fromListN :: Storable a => Int -> [a] -> Array a
fromListN n xs = unsafePerformIO (fromStreamDN n (D.fromList xs))
-- | Create an array from all the elements of a list. The list is streamed
-- into 'fromStreamD'; the effect is run with 'unsafePerformIO'.
{-# INLINABLE fromList #-}
fromList :: Storable a => [a] -> Array a
fromList xs = unsafePerformIO (fromStreamD (D.fromList xs))
-- | An array is read by reading a list of elements and converting it with
-- 'fromList'.
instance (Storable a, Read a, Show a) => Read (Array a) where
{-# INLINE readPrec #-}
readPrec = fromList <$> readPrec
readListPrec = readListPrecDefault
-- | String literals can be used as arrays of 'Char'; the string is
-- converted with this module's 'fromList'.
instance (a ~ Char) => IsString (Array a) where
{-# INLINE fromString #-}
fromString = fromList
-- | 'IsList' instance whose methods are defined as this module's
-- 'fromList', 'fromListN' and 'toList' (the right-hand sides name the
-- top-level functions).
instance Storable a => IsList (Array a) where
type (Item (Array a)) = a
{-# INLINE fromList #-}
fromList = fromList
{-# INLINE fromListN #-}
fromListN = fromListN
{-# INLINE toList #-}
toList = toList
-- | Byte-wise equality of two arrays: 'True' when both hold the same number
-- of bytes and 'memcmp' reports those bytes as equal. Arrays of different
-- byte length are unequal without looking at their contents.
{-# INLINE arrcmp #-}
arrcmp :: Array a -> Array a -> Bool
arrcmp arr1 arr2 =
let !res = unsafeInlineIO $ do
let ptr1 = unsafeForeignPtrToPtr $ aStart arr1
let ptr2 = unsafeForeignPtrToPtr $ aStart arr2
let len1 = aEnd arr1 `minusPtr` ptr1
let len2 = aEnd arr2 `minusPtr` ptr2
if len1 == len2
then do
r <- memcmp (castPtr ptr1) (castPtr ptr2) len1
-- Keep both buffers alive until the comparison has completed.
touchForeignPtr $ aStart arr1
touchForeignPtr $ aStart arr2
return r
else return False
in res
-- | Equality is the byte-wise comparison done by 'arrcmp'; the element
-- type's own '==' is not consulted.
instance (Storable a, Eq a) => Eq (Array a) where
{-# INLINE (==) #-}
(==) = arrcmp
-- | 'rnf' folds over the array with 'foldl'', applying the element type's
-- 'rnf' to every element.
instance (Storable a, NFData a) => NFData (Array a) where
{-# INLINE rnf #-}
rnf = foldl' (\_ x -> rnf x) ()
-- | Arrays are ordered by comparing their element streams with 'D.cmpBy'
-- using the element type's 'compare'. All other methods are defined in
-- terms of this 'compare'.
instance (Storable a, Ord a) => Ord (Array a) where
{-# INLINE compare #-}
-- The stream comparison is run with 'unsafePerformIO'.
compare arr1 arr2 = unsafePerformIO $
D.cmpBy compare (toStreamD arr1) (toStreamD arr2)
{-# INLINE (<) #-}
x < y = case compare x y of { LT -> True; _ -> False }
{-# INLINE (<=) #-}
x <= y = case compare x y of { GT -> False; _ -> True }
{-# INLINE (>) #-}
x > y = case compare x y of { GT -> True; _ -> False }
{-# INLINE (>=) #-}
x >= y = case compare x y of { LT -> False; _ -> True }
{-# INLINE max #-}
max x y = if x <= y then y else x
{-# INLINE min #-}
min x y = if x <= y then x else y
#ifdef DEVBUILD
-- | DEVBUILD only. Like 'toStreamD' but the element size in bytes is passed
-- explicitly as @size@ instead of being taken from a 'Storable' constraint
-- in the signature, and the reads are done in 'MonadIO'.
{-# INLINE_NORMAL toStreamD_ #-}
toStreamD_ :: forall m a. MonadIO m => Int -> Array a -> D.Stream m a
toStreamD_ size Array{..} =
let p = unsafeForeignPtrToPtr aStart
in D.Stream step p
where
{-# INLINE_LATE step #-}
step _ p | p == aEnd = return D.Stop
step _ p = do
x <- liftIO $ do
r <- peek p
-- Keep the buffer alive until the read has happened.
touchForeignPtr aStart
return r
return $ D.Yield x (p `plusPtr` size)
-- | DEVBUILD only. Right fold without a 'Storable' constraint in the
-- signature; the constraint is obtained by matching on the 'Array'
-- constructor, which carries it in DEVBUILD builds. The fold over
-- 'toStreamD_' is run with 'unsafePerformIO'.
{-# INLINE_NORMAL _foldr #-}
_foldr :: forall a b. (a -> b -> b) -> b -> Array a -> b
_foldr f z arr@Array {..} =
let !n = sizeOf (undefined :: a)
in unsafePerformIO $ D.foldr f z $ toStreamD_ n arr
-- | DEVBUILD only. 'Foldable' instance defined through '_foldr'.
instance Foldable Array where
foldr = _foldr
#endif
-- | Concatenate two arrays into a newly allocated array holding the
-- elements of the first followed by the elements of the second. Neither
-- input is modified.
--
-- The new buffer is allocated with exactly the combined byte length of the
-- inputs, so the result has no spare capacity ('aBound' equals 'aEnd').
{-# INLINE spliceTwo #-}
spliceTwo :: forall m a. (MonadIO m, Storable a)
    => Array a -> Array a -> m (Array a)
spliceTwo arr1 arr2 = do
    let src1 = unsafeForeignPtrToPtr (aStart arr1)
        src2 = unsafeForeignPtrToPtr (aStart arr2)
        len1 = aEnd arr1 `minusPtr` src1
        len2 = aEnd arr2 `minusPtr` src2
        total = len1 + len2
    liftIO $ do
        -- Allocate by byte size. 'newArray' takes an element count, so
        -- passing it these byte lengths would over-allocate by a factor of
        -- the element size.
        fptr <- Malloc.mallocForeignPtrAlignedBytes
                    total (alignment (undefined :: a))
        let dst = unsafeForeignPtrToPtr fptr
            end = dst `plusPtr` total
        memcpy (castPtr dst) (castPtr src1) len1
        touchForeignPtr (aStart arr1)
        memcpy (castPtr (dst `plusPtr` len1)) (castPtr src2) len2
        touchForeignPtr (aStart arr2)
        -- Keep the destination alive until both copies are done.
        touchForeignPtr fptr
        return $ Array
            { aStart = fptr
            , aEnd = end
            , aBound = end
            }
-- | '<>' concatenates two arrays into a new array using 'spliceTwo', run
-- with 'unsafePerformIO'.
instance Storable a => Semigroup (Array a) where
arr1 <> arr2 = unsafePerformIO $ spliceTwo arr1 arr2
-- | A 'ForeignPtr' with a null address. Its second constructor field is an
-- 'error' thunk, so it must never be forced.
nullForeignPtr :: ForeignPtr a
nullForeignPtr = ForeignPtr nullAddr# (error "nullForeignPtr")
-- | The empty array: start, end and bound are all the null address.
nil ::
#ifdef DEVBUILD
Storable a =>
#endif
Array a
nil = Array nullForeignPtr (Ptr nullAddr#) (Ptr nullAddr#)
-- | 'mempty' is the empty array 'nil'; 'mappend' is the 'Semigroup' '<>'.
instance Storable a => Monoid (Array a) where
mempty = nil
mappend = (<>)
-- | Number of bytes that 'mkChunkSize' subtracts from a requested size:
-- twice the size of an 'Int'.
--
-- NOTE(review): presumably this accounts for per-allocation bookkeeping of
-- the memory allocator — confirm.
allocOverhead :: Int
allocOverhead = 2 * sizeOf (undefined :: Int)
-- | Chunk size in bytes for a requested total of @n@ bytes: @n@ minus
-- 'allocOverhead', but never less than zero.
mkChunkSize :: Int -> Int
mkChunkSize n = max 0 (n - allocOverhead)
-- | Like 'mkChunkSize' but the requested total is given in units of 1024
-- bytes.
mkChunkSizeKB :: Int -> Int
mkChunkSizeKB n = mkChunkSize (n * bytesPerKB)

    where

    bytesPerKB = 1024
-- | Default chunk size: @mkChunkSizeKB 32@, i.e. 32 * 1024 minus
-- 'allocOverhead'.
defaultChunkSize :: Int
defaultChunkSize = mkChunkSizeKB 32
-- | Flatten a stream of arrays into a stream of elements, emitting the
-- separator @sep@ after the elements of every array (an empty array
-- therefore contributes just the separator).
{-# INLINE_NORMAL unlines #-}
unlines :: forall m a. (MonadIO m, Storable a)
=> a -> D.Stream m (Array a) -> D.Stream m a
unlines sep (D.Stream step state) = D.Stream step' (OuterLoop state)
where
{-# INLINE_LATE step' #-}
-- Fetch the next array and position the cursor at its first element.
step' gst (OuterLoop st) = do
r <- step (adaptState gst) st
return $ case r of
D.Yield Array{..} s ->
let p = unsafeForeignPtrToPtr aStart
in D.Skip (InnerLoop s aStart p aEnd)
D.Skip s -> D.Skip (OuterLoop s)
D.Stop -> D.Stop
-- End of the current array: emit the separator, then go back to the source.
step' _ (InnerLoop st _ p end) | p == end =
return $ D.Yield sep $ OuterLoop st
step' _ (InnerLoop st startf p end) = do
x <- liftIO $ do
r <- peek p
-- Keep the buffer alive until the read has happened.
touchForeignPtr startf
return r
return $ D.Yield x (InnerLoop st startf
(p `plusPtr` sizeOf (undefined :: a)) end)
-- | Copy the contents of @src@ into the unused capacity of @dst@, in place.
-- The result shares the buffer of @dst@ with 'aEnd' advanced by the byte
-- length of @src@. Calls 'error' when @dst@ does not have enough spare
-- capacity.
{-# INLINE spliceWith #-}
spliceWith :: (MonadIO m) => Array a -> Array a -> m (Array a)
spliceWith dst@(Array _ end bound) src = liftIO $
    if end `plusPtr` srcLen > bound
    then error "Bug: spliceIntoUnsafe: Not enough space in the target array"
    else
        -- Both buffers are held alive for the duration of the copy.
        withForeignPtr (aStart dst) $ \_ ->
            withForeignPtr (aStart src) $ \psrc -> do
                memcpy (castPtr end) (castPtr psrc) srcLen
                return $ dst { aEnd = end `plusPtr` srcLen }

    where

    srcLen = byteLength src
-- | Append the contents of @src@ to @dst@, reallocating @dst@ first when
-- appending would reach or exceed its bound. The new capacity is the larger
-- of twice the used size of @dst@ and the combined used size of both
-- arrays.
{-# INLINE spliceWithDoubling #-}
spliceWithDoubling :: (MonadIO m, Storable a)
    => Array a -> Array a -> m (Array a)
spliceWithDoubling dst@(Array start end bound) src = do
    assert (end <= bound) (return ())
    roomy <-
        if end `plusPtr` srcLen >= bound
        then liftIO $ realloc grownSize dst
        else return dst
    spliceWith roomy src

    where

    srcLen = aEnd src `minusPtr` unsafeForeignPtrToPtr (aStart src)
    usedBytes = end `minusPtr` unsafeForeignPtrToPtr start
    grownSize = max (usedBytes * 2) (usedBytes + srcLen)
-- | State of the 'packArraysChunksOf' state machine.
data SpliceState s arr
-- | No array buffered; pull from the source state @s@.
= SpliceInitial s
-- | An array is buffered and more input may be appended to it.
| SpliceBuffering s arr
-- | Emit the given array, then continue with the given state.
| SpliceYielding arr (SpliceState s arr)
-- | The stream is finished.
| SpliceFinish
-- | Coalesce adjacent arrays of a stream into larger arrays, using @n@
-- bytes as the target size.
--
-- An input array is appended to the buffered array as long as their
-- combined byte length does not exceed @n@; otherwise the buffered array is
-- emitted and the input array becomes the new buffer. An input array that
-- arrives when nothing is buffered and is already at least @n@ bytes long is
-- emitted as is.
--
-- Calls 'error' (when the stream is first stepped) if @n <= 0@.
{-# INLINE_NORMAL packArraysChunksOf #-}
packArraysChunksOf :: (MonadIO m, Storable a)
=> Int -> D.Stream m (Array a) -> D.Stream m (Array a)
packArraysChunksOf n (D.Stream step state) =
D.Stream step' (SpliceInitial state)
where
{-# INLINE_LATE step' #-}
step' gst (SpliceInitial st) = do
when (n <= 0) $
error $ "Streamly.Internal.Memory.Array.Types.packArraysChunksOf: the size of "
++ "arrays [" ++ show n ++ "] must be a natural number"
r <- step gst st
case r of
D.Yield arr s -> return $
let len = byteLength arr
-- Large enough on its own: pass it through without buffering.
in if len >= n
then D.Skip (SpliceYielding arr (SpliceInitial s))
else D.Skip (SpliceBuffering s arr)
D.Skip s -> return $ D.Skip (SpliceInitial s)
D.Stop -> return D.Stop
step' gst (SpliceBuffering st buf) = do
r <- step gst st
case r of
D.Yield arr s -> do
let len = byteLength buf + byteLength arr
-- Would overflow the target size: emit the buffer, start over with arr.
if len > n
then return $
D.Skip (SpliceYielding buf (SpliceBuffering s arr))
else do
-- Make sure the buffer has capacity for n bytes before appending.
buf' <- if byteCapacity buf < n
then liftIO $ realloc n buf
else return buf
buf'' <- spliceWith buf' arr
return $ D.Skip (SpliceBuffering s buf'')
D.Skip s -> return $ D.Skip (SpliceBuffering s buf)
-- Source exhausted: flush the buffer.
D.Stop -> return $ D.Skip (SpliceYielding buf SpliceFinish)
step' _ SpliceFinish = return D.Stop
step' _ (SpliceYielding arr next) = return $ D.Yield arr next
-- | Fold transformer that coalesces the incoming arrays into arrays of at
-- least @n@ bytes before handing them to the downstream fold.
--
-- Incoming arrays are accumulated in a buffer. As soon as the buffer (or a
-- single incoming array, when nothing is buffered) reaches @n@ bytes, it is
-- fed to the downstream fold, the downstream fold is extracted, and a fresh
-- downstream accumulator is started. Any remaining buffered array is fed
-- downstream on final extraction.
--
-- Calls 'error' (when the fold is initialized) if @n <= 0@.
{-# INLINE_NORMAL lpackArraysChunksOf #-}
lpackArraysChunksOf :: (MonadIO m, Storable a)
    => Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf n (Fold step1 initial1 extract1) =
    Fold step initial extract

    where

    initial = do
        when (n <= 0) $
            error $ "Streamly.Internal.Memory.Array.Types.lpackArraysChunksOf: the size of "
                 ++ "arrays [" ++ show n ++ "] must be a natural number"
        r1 <- initial1
        return (Tuple' Nothing r1)

    extract (Tuple' Nothing r1) = extract1 r1
    extract (Tuple' (Just buf) r1) = do
        r <- step1 r1 buf
        extract1 r

    -- Nothing buffered yet.
    step (Tuple' Nothing r1) arr =
        let len = byteLength arr
         in if len >= n
            then do
                r <- step1 r1 arr
                extract1 r
                r1' <- initial1
                return (Tuple' Nothing r1')
            else return (Tuple' (Just arr) r1)
    -- Append to the buffered array, growing it when necessary.
    step (Tuple' (Just buf) r1) arr = do
        let len = byteLength buf + byteLength arr
        buf' <- if byteCapacity buf < len
                then liftIO $ realloc (max n len) buf
                else return buf
        buf'' <- spliceWith buf' arr
        if len >= n
        then do
            r <- step1 r1 buf''
            extract1 r
            r1' <- initial1
            return (Tuple' Nothing r1')
        else return (Tuple' (Just buf'') r1)
#if !defined(mingw32_HOST_OS)
-- | State of the 'groupIOVecsOf' state machine.
data GatherState s arr
-- | No group in progress; pull from the source state @s@.
= GatherInitial s
-- | A group is being collected; the 'Int' is the total byte length of the
-- arrays referenced by the group so far.
| GatherBuffering s arr Int
-- | Emit the given group, then continue with the given state.
| GatherYielding arr (GatherState s arr)
-- | The stream is finished.
| GatherFinish
-- | Group a stream of arrays into arrays of 'IOVec' entries, one entry
-- (buffer address and byte length) per input array.
--
-- A group is closed, and a new one started with the incoming array, when
-- adding the array would make the group's total byte length exceed @n@ or
-- when the group already holds @maxIOVLen@ entries. An array arriving when
-- no group is in progress and that is at least @n@ bytes long forms a group
-- by itself.
--
-- Calls 'error' (when the stream is first stepped) if @n <= 0@ or
-- @maxIOVLen <= 0@.
--
-- NOTE(review): the 'IOVec' entries hold raw addresses of the input arrays'
-- buffers; nothing here keeps those buffers alive — confirm that consumers
-- do.
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
    => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOf n maxIOVLen (D.Stream step state) =
    D.Stream step' (GatherInitial state)

    where

    {-# INLINE_LATE step' #-}
    step' gst (GatherInitial st) = do
        when (n <= 0) $
            error $ "Streamly.Internal.Memory.Array.Types.groupIOVecsOf: the size of "
                 ++ "groups [" ++ show n ++ "] must be a natural number"
        -- Report the offending value itself (maxIOVLen, not n).
        when (maxIOVLen <= 0) $
            error $ "Streamly.Internal.Memory.Array.Types.groupIOVecsOf: the number of "
                 ++ "IOVec entries [" ++ show maxIOVLen ++ "] must be a natural number"
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = unsafeForeignPtrToPtr (aStart arr)
                    len = byteLength arr
                iov <- liftIO $ newArray maxIOVLen
                iov' <- liftIO $ unsafeSnoc iov (IOVec (castPtr p)
                                                (fromIntegral len))
                if len >= n
                then return $ D.Skip (GatherYielding iov' (GatherInitial s))
                else return $ D.Skip (GatherBuffering s iov' len)
            D.Skip s -> return $ D.Skip (GatherInitial s)
            D.Stop -> return D.Stop

    step' gst (GatherBuffering st iov len) = do
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = unsafeForeignPtrToPtr (aStart arr)
                    alen = byteLength arr
                    len' = len + alen
                if len' > n || length iov >= maxIOVLen
                then do
                    -- Close the current group and start a new one that
                    -- begins with the incoming array.
                    iov' <- liftIO $ newArray maxIOVLen
                    iov'' <- liftIO $ unsafeSnoc iov' (IOVec (castPtr p)
                                                      (fromIntegral alen))
                    return $ D.Skip (GatherYielding iov
                                        (GatherBuffering s iov'' alen))
                else do
                    iov' <- liftIO $ unsafeSnoc iov (IOVec (castPtr p)
                                                    (fromIntegral alen))
                    return $ D.Skip (GatherBuffering s iov' len')
            D.Skip s -> return $ D.Skip (GatherBuffering s iov len)
            D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish)

    step' _ GatherFinish = return D.Stop
    step' _ (GatherYielding iov next) = return $ D.Yield iov next
#endif
-- | Split an array into the elements before index @i@ and the elements from
-- index @i@ onwards. Nothing is copied: both results refer to the original
-- buffer, the first one with no spare capacity and the second one keeping
-- the original bound.
--
-- Calls 'error' when @i@ is negative or greater than the index of the last
-- element.
splitAt :: forall a. Storable a => Int -> Array a -> (Array a, Array a)
splitAt i arr@Array{..}
    | i < 0 = error "sliceAt: negative array index"
    | i > lastIndex =
        error $ "sliceAt: specified array index " ++ show i
            ++ " is beyond the maximum index " ++ show lastIndex
    | otherwise =
        ( Array
            { aStart = aStart
            , aEnd = cut
            , aBound = cut
            }
        , Array
            { aStart = aStart `plusForeignPtr` byteOff
            , aEnd = aEnd
            , aBound = aBound
            }
        )

    where

    lastIndex = length arr - 1
    byteOff = i * sizeOf (undefined :: a)
    cut = unsafeForeignPtrToPtr aStart `plusPtr` byteOff
-- | Break a byte array at the first occurrence of the byte @sep@, located
-- with C @memchr@.
--
-- When the byte is not present the result is the original array and
-- 'Nothing'. Otherwise the result is the part before the separator and
-- 'Just' the part after it; the separator itself is dropped. Nothing is
-- copied: both parts refer to the original buffer.
{-# INLINE breakOn #-}
breakOn :: MonadIO m
    => Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
breakOn sep arr@Array{..} = liftIO $ do
    let base = unsafeForeignPtrToPtr aStart
        size = aEnd `minusPtr` base
    hit <- c_memchr base sep (fromIntegral size)
    if hit == nullPtr
    then return (arr, Nothing)
    else do
        let before = Array
                { aStart = aStart
                , aEnd = hit
                , aBound = hit
                }
            -- Skip over the separator byte itself.
            after = Array
                { aStart = aStart `plusForeignPtr` (hit `minusPtr` base + 1)
                , aEnd = aEnd
                , aBound = aBound
                }
        return (before, Just after)
-- | State of the 'splitOn' state machine.
data SplitState s arr
-- | Nothing buffered; pull from the source state @s@.
= Initial s
-- | A partial segment (no separator seen yet) is buffered.
| Buffering s arr
-- | The given array still has to be searched for separators.
| Splitting s arr
-- | Emit the given array, then continue with the given state.
| Yielding arr (SplitState s arr)
-- | The stream is finished.
| Finishing
-- | Split a stream of byte arrays on a separator byte. Each output array is
-- one segment between separators; the separators are dropped, and segments
-- that span several input arrays are joined with 'spliceTwo'.
--
-- A final segment that is not terminated by a separator is emitted only if
-- it is non-empty.
{-# INLINE_NORMAL splitOn #-}
splitOn
:: MonadIO m
=> Word8
-> D.Stream m (Array Word8)
-> D.Stream m (Array Word8)
splitOn byte (D.Stream step state) = D.Stream step' (Initial state)
where
{-# INLINE_LATE step' #-}
step' gst (Initial st) = do
r <- step gst st
case r of
D.Yield arr s -> do
(arr1, marr2) <- breakOn byte arr
return $ case marr2 of
-- No separator in this array: keep it as the start of a segment.
Nothing -> D.Skip (Buffering s arr1)
-- Separator found: emit the segment, then search the remainder.
Just arr2 -> D.Skip (Yielding arr1 (Splitting s arr2))
D.Skip s -> return $ D.Skip (Initial s)
D.Stop -> return D.Stop
step' gst (Buffering st buf) = do
r <- step gst st
case r of
D.Yield arr s -> do
(arr1, marr2) <- breakOn byte arr
-- Extend the pending segment with the part before the separator (or with
-- the whole array when there is none).
buf' <- spliceTwo buf arr1
return $ case marr2 of
Nothing -> D.Skip (Buffering s buf')
Just x -> D.Skip (Yielding buf' (Splitting s x))
D.Skip s -> return $ D.Skip (Buffering s buf)
-- Source exhausted: flush the pending segment unless it is empty.
D.Stop -> return $
if byteLength buf == 0
then D.Stop
else D.Skip (Yielding buf Finishing)
-- Search the remainder of an already-received array for more separators.
step' _ (Splitting st buf) = do
(arr1, marr2) <- breakOn byte buf
return $ case marr2 of
Nothing -> D.Skip $ Buffering st arr1
Just arr2 -> D.Skip $ Yielding arr1 (Splitting st arr2)
step' _ (Yielding arr next) = return $ D.Yield arr next
step' _ Finishing = return D.Stop