{-# LANGUAGE CPP #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
#include "inline.hs"
module Streamly.Internal.Memory.ArrayStream
(
arraysOf
, concat
, concatRev
, interpose
, interposeSuffix
, intercalateSuffix
, splitOn
, splitOnSuffix
, compact
, toArray
)
where
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Ptr (minusPtr, plusPtr, castPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, null, last, map, (!!), read, concat)
import Streamly.Internal.Memory.Array.Types (Array(..), length)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Prelude as P
{-# INLINE concat #-}
concat :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
concat :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Storable a) =>
t m (Array a) -> t m a
concat t m (Array a)
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
D.concatMapU forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array a)
m)
{-# INLINE concatRev #-}
concatRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
concatRev :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Storable a) =>
t m (Array a) -> t m a
concatRev t m (Array a)
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Stream m (Array a) -> Stream m a
A.flattenArraysRev (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array a)
m)
{-# INLINE interpose #-}
interpose :: (MonadIO m, IsStream t, Storable a) => a -> t m (Array a) -> t m a
interpose :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadIO m, IsStream t, Storable a) =>
a -> t m (Array a) -> t m a
interpose a
x = forall (t :: (* -> *) -> * -> *) (m :: * -> *) c b.
(IsStream t, Monad m) =>
c -> Unfold m b c -> t m b -> t m c
S.interpose a
x forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (MonadIO m, IsStream t, Storable a)
=> Array a -> t m (Array a) -> t m a
intercalateSuffix :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadIO m, IsStream t, Storable a) =>
Array a -> t m (Array a) -> t m a
intercalateSuffix Array a
arr = forall (t :: (* -> *) -> * -> *) (m :: * -> *) b c.
(IsStream t, Monad m) =>
b -> Unfold m b c -> t m b -> t m c
S.intercalateSuffix Array a
arr forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read
{-# INLINE interposeSuffix #-}
interposeSuffix :: (MonadIO m, IsStream t, Storable a)
=> a -> t m (Array a) -> t m a
interposeSuffix :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadIO m, IsStream t, Storable a) =>
a -> t m (Array a) -> t m a
interposeSuffix a
x = forall (t :: (* -> *) -> * -> *) (m :: * -> *) c b.
(IsStream t, Monad m) =>
c -> Unfold m b c -> t m b -> t m c
S.interposeSuffix a
x forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read
{-# INLINE splitOn #-}
splitOn
:: (IsStream t, MonadIO m)
=> Word8
-> t m (Array Word8)
-> t m (Array Word8)
splitOn :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Word8 -> t m (Array Word8) -> t m (Array Word8)
splitOn Word8
byte t m (Array Word8)
s =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (f :: * -> *) a.
Monad m =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBy (forall (m :: * -> *).
MonadIO m =>
Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
A.breakOn Word8
byte) forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
A.spliceTwo forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array Word8)
s
{-# INLINE splitOnSuffix #-}
splitOnSuffix
:: (IsStream t, MonadIO m)
=> Word8
-> t m (Array Word8)
-> t m (Array Word8)
splitOnSuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Word8 -> t m (Array Word8) -> t m (Array Word8)
splitOnSuffix Word8
byte t m (Array Word8)
s =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (f :: * -> *) a.
(Monad m, Eq (f a), Monoid (f a)) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBySuffix (forall (m :: * -> *).
MonadIO m =>
Word8 -> Array Word8 -> m (Array Word8, Maybe (Array Word8))
A.breakOn Word8
byte) forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
A.spliceTwo forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m (Array Word8)
s
{-# INLINE compact #-}
compact :: (MonadIO m, Storable a)
=> Int -> SerialT m (Array a) -> SerialT m (Array a)
compact :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> SerialT m (Array a)
compact Int
n SerialT m (Array a)
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m (Array a) -> Stream m (Array a)
A.packArraysChunksOf Int
n (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD SerialT m (Array a)
xs)
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
=> Int -> t m a -> t m (Array a)
arraysOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Storable a) =>
Int -> t m a -> t m (Array a)
arraysOf Int
n t m a
str =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
A.fromStreamDArraysOf Int
n (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
str)
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Storable a)
=> Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe Int
len SerialT m (Array a)
buffered = do
Array a
arr <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Storable a => Int -> IO (Array a)
A.newArray Int
len
Ptr a
end <- forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> b -> SerialT m a -> m b
S.foldlM' forall {m :: * -> *} {a} {a} {b}.
MonadIO m =>
Ptr a -> Array a -> m (Ptr b)
writeArr (forall a. Array a -> Ptr a
aEnd Array a
arr) SerialT m (Array a)
buffered
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Array a
arr {aEnd :: Ptr a
aEnd = Ptr a
end}
where
writeArr :: Ptr a -> Array a -> m (Ptr b)
writeArr Ptr a
dst Array{Ptr a
ForeignPtr a
aBound :: forall a. Array a -> Ptr a
aStart :: forall a. Array a -> ForeignPtr a
aBound :: Ptr a
aEnd :: Ptr a
aStart :: ForeignPtr a
aEnd :: forall a. Array a -> Ptr a
..} =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr a
aStart forall a b. (a -> b) -> a -> b
$ \Ptr a
src -> do
let count :: Int
count = Ptr a
aEnd forall a b. Ptr a -> Ptr b -> Int
`minusPtr` Ptr a
src
Ptr Word8 -> Ptr Word8 -> Int -> IO ()
A.memcpy (forall a b. Ptr a -> Ptr b
castPtr Ptr a
dst) (forall a b. Ptr a -> Ptr b
castPtr Ptr a
src) Int
count
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Ptr a
dst forall a b. Ptr a -> Int -> Ptr b
`plusPtr` Int
count
{-# INLINE _spliceArraysBuffered #-}
_spliceArraysBuffered :: (MonadIO m, Storable a)
=> SerialT m (Array a) -> m (Array a)
_spliceArraysBuffered :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m (Array a)
_spliceArraysBuffered SerialT m (Array a)
s = do
SerialT m (Array a)
buffered <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, IsStream t) =>
(a -> b -> b) -> b -> t m a -> m b
P.foldr forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
S.cons forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil SerialT m (Array a)
s
Int
len <- forall (m :: * -> *) a. (Monad m, Num a) => SerialT m a -> m a
S.sum (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
S.map forall a. Storable a => Array a -> Int
length SerialT m (Array a)
buffered)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe Int
len SerialT m (Array a)
s
{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Storable a)
=> SerialT m (Array a) -> m (Array a)
spliceArraysRealloced :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m (Array a)
spliceArraysRealloced SerialT m (Array a)
s = do
Array a
idst <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Storable a => Int -> IO (Array a)
A.newArray (forall a. Storable a => a -> Int -> Int
A.bytesToElemCount (forall a. HasCallStack => a
undefined :: a)
(Int -> Int
A.mkChunkSizeKB Int
4))
Array a
arr <- forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> b -> SerialT m a -> m b
S.foldlM' forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Array a -> Array a -> m (Array a)
A.spliceWithDoubling Array a
idst SerialT m (Array a)
s
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Storable a => Array a -> IO (Array a)
A.shrinkToFit Array a
arr
{-# INLINE toArray #-}
toArray :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a)
toArray :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m (Array a)
toArray = forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m (Array a)
spliceArraysRealloced