{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
#include "inline.hs"
module Streamly.Internal.FileSystem.Handle
(
read
, readWithBufferOf
, toBytes
, toBytesWithBufferOf
, getBytes
, readChunks
, readChunksWithBufferOf
, toChunksWithBufferOf
, toChunks
, getChunks
, write
, write2
, writeWithBufferOf
, fromBytes
, fromBytesWithBufferOf
, writeArray
, writeChunks
, writeChunksWithBufferOf
, fromChunksWithBufferOf
, fromChunks
, putChunks
, putStrings
, putBytes
, putLines
)
where
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (minusPtr, plusPtr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import System.IO (Handle, hGetBufSome, hPutBuf, stdin, stdout)
import Prelude hiding (read)
import Streamly (MonadAsync)
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Fold.Types (Fold2(..))
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Memory.Array.Types
(Array(..), writeNUnsafe, defaultChunkSize, shrinkToFit,
lpackArraysChunksOf)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream, mkStream)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Memory.Array as A
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
{-# INLINABLE readArrayUpto #-}
readArrayUpto :: Int -> Handle -> IO (Array Word8)
readArrayUpto :: Int -> Handle -> IO (Array Word8)
readArrayUpto Int
size Handle
h = do
ForeignPtr Word8
ptr <- forall a. Int -> IO (ForeignPtr a)
mallocPlainForeignPtrBytes Int
size
forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr Word8
ptr forall a b. (a -> b) -> a -> b
$ \Ptr Word8
p -> do
Int
n <- forall a. Handle -> Ptr a -> Int -> IO Int
hGetBufSome Handle
h Ptr Word8
p Int
size
let v :: Array Word8
v = Array
{ aStart :: ForeignPtr Word8
aStart = ForeignPtr Word8
ptr
, aEnd :: Ptr Word8
aEnd = Ptr Word8
p forall a b. Ptr a -> Int -> Ptr b
`plusPtr` Int
n
, aBound :: Ptr Word8
aBound = Ptr Word8
p forall a b. Ptr a -> Int -> Ptr b
`plusPtr` Int
size
}
forall a. Storable a => Array a -> IO (Array a)
shrinkToFit Array Word8
v
{-# INLINABLE _toChunksWithBufferOf #-}
_toChunksWithBufferOf :: (IsStream t, MonadIO m)
=> Int -> Handle -> t m (Array Word8)
_toChunksWithBufferOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Int -> Handle -> t m (Array Word8)
_toChunksWithBufferOf Int
size Handle
h = t m (Array Word8)
go
where
go :: t m (Array Word8)
go = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m (Array Word8)
_ Array Word8 -> t m (Array Word8) -> m r
yld Array Word8 -> m r
_ m r
stp -> do
Array Word8
arr <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> Handle -> IO (Array Word8)
readArrayUpto Int
size Handle
h
if forall a. Storable a => Array a -> Int
A.length Array Word8
arr forall a. Eq a => a -> a -> Bool
== Int
0
then m r
stp
else Array Word8 -> t m (Array Word8) -> m r
yld Array Word8
arr t m (Array Word8)
go
{-# INLINE_NORMAL toChunksWithBufferOf #-}
toChunksWithBufferOf :: (IsStream t, MonadIO m) => Int -> Handle -> t m (Array Word8)
toChunksWithBufferOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Int -> Handle -> t m (Array Word8)
toChunksWithBufferOf Int
size Handle
h = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {m :: * -> *} {p} {p}.
MonadIO m =>
p -> p -> m (Step () (Array Word8))
step ())
where
{-# INLINE_LATE step #-}
step :: p -> p -> m (Step () (Array Word8))
step p
_ p
_ = do
Array Word8
arr <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> Handle -> IO (Array Word8)
readArrayUpto Int
size Handle
h
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
case forall a. Storable a => Array a -> Int
A.length Array Word8
arr of
Int
0 -> forall s a. Step s a
D.Stop
Int
_ -> forall s a. a -> s -> Step s a
D.Yield Array Word8
arr ()
{-# INLINE_NORMAL readChunksWithBufferOf #-}
readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Handle) (Array Word8)
readChunksWithBufferOf :: forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Handle) (Array Word8)
readChunksWithBufferOf = forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold forall {m :: * -> *}.
MonadIO m =>
(Int, Handle) -> m (Step (Int, Handle) (Array Word8))
step forall (m :: * -> *) a. Monad m => a -> m a
return
where
{-# INLINE_LATE step #-}
step :: (Int, Handle) -> m (Step (Int, Handle) (Array Word8))
step (Int
size, Handle
h) = do
Array Word8
arr <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> Handle -> IO (Array Word8)
readArrayUpto Int
size Handle
h
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
case forall a. Storable a => Array a -> Int
A.length Array Word8
arr of
Int
0 -> forall s a. Step s a
D.Stop
Int
_ -> forall s a. a -> s -> Step s a
D.Yield Array Word8
arr (Int
size, Handle
h)
{-# INLINE toChunks #-}
toChunks :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8)
toChunks :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Handle -> t m (Array Word8)
toChunks = forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Int -> Handle -> t m (Array Word8)
toChunksWithBufferOf Int
defaultChunkSize
{-# INLINE getChunks #-}
getChunks :: (IsStream t, MonadIO m) => t m (Array Word8)
getChunks :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
t m (Array Word8)
getChunks = forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Handle -> t m (Array Word8)
toChunks Handle
stdin
{-# INLINE getBytes #-}
getBytes :: (IsStream t, MonadIO m) => t m Word8
getBytes :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
t m Word8
getBytes = forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Handle -> t m Word8
toBytes Handle
stdin
{-# INLINE readChunks #-}
readChunks :: MonadIO m => Unfold m Handle (Array Word8)
readChunks :: forall (m :: * -> *). MonadIO m => Unfold m Handle (Array Word8)
readChunks = forall (m :: * -> *) a b c. Unfold m (a, b) c -> a -> Unfold m b c
UF.supplyFirst forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Handle) (Array Word8)
readChunksWithBufferOf Int
defaultChunkSize
{-# INLINE readWithBufferOf #-}
readWithBufferOf :: MonadIO m => Unfold m (Int, Handle) Word8
readWithBufferOf :: forall (m :: * -> *). MonadIO m => Unfold m (Int, Handle) Word8
readWithBufferOf = forall (m :: * -> *) a b c.
Monad m =>
Unfold m a b -> Unfold m b c -> Unfold m a c
UF.concat forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Handle) (Array Word8)
readChunksWithBufferOf forall (m :: * -> *) a.
(Monad m, Storable a) =>
Unfold m (Array a) a
A.read
{-# INLINE toBytesWithBufferOf #-}
toBytesWithBufferOf :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8
toBytesWithBufferOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Int -> Handle -> t m Word8
toBytesWithBufferOf Int
chunkSize Handle
h = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Storable a) =>
t m (Array a) -> t m a
AS.concat forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Int -> Handle -> t m (Array Word8)
toChunksWithBufferOf Int
chunkSize Handle
h
{-# INLINE read #-}
read :: MonadIO m => Unfold m Handle Word8
read :: forall (m :: * -> *). MonadIO m => Unfold m Handle Word8
read = forall (m :: * -> *) a b c. Unfold m (a, b) c -> a -> Unfold m b c
UF.supplyFirst forall (m :: * -> *). MonadIO m => Unfold m (Int, Handle) Word8
readWithBufferOf Int
defaultChunkSize
{-# INLINE toBytes #-}
toBytes :: (IsStream t, MonadIO m) => Handle -> t m Word8
toBytes :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Handle -> t m Word8
toBytes = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Storable a) =>
t m (Array a) -> t m a
AS.concat forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadIO m) =>
Handle -> t m (Array Word8)
toChunks
{-# INLINABLE writeArray #-}
writeArray :: Storable a => Handle -> Array a -> IO ()
writeArray :: forall a. Storable a => Handle -> Array a -> IO ()
writeArray Handle
_ Array a
arr | forall a. Storable a => Array a -> Int
A.length Array a
arr forall a. Eq a => a -> a -> Bool
== Int
0 = forall (m :: * -> *) a. Monad m => a -> m a
return ()
writeArray Handle
h Array{Ptr a
ForeignPtr a
aBound :: Ptr a
aEnd :: Ptr a
aStart :: ForeignPtr a
aBound :: forall a. Array a -> Ptr a
aEnd :: forall a. Array a -> Ptr a
aStart :: forall a. Array a -> ForeignPtr a
..} = forall a b. ForeignPtr a -> (Ptr a -> IO b) -> IO b
withForeignPtr ForeignPtr a
aStart forall a b. (a -> b) -> a -> b
$ \Ptr a
p -> forall a. Handle -> Ptr a -> Int -> IO ()
hPutBuf Handle
h Ptr a
p Int
aLen
where
aLen :: Int
aLen =
let p :: Ptr a
p = forall a. ForeignPtr a -> Ptr a
unsafeForeignPtrToPtr ForeignPtr a
aStart
in Ptr a
aEnd forall a b. Ptr a -> Ptr b -> Int
`minusPtr` Ptr a
p
{-# INLINE fromChunks #-}
fromChunks :: (MonadIO m, Storable a)
=> Handle -> SerialT m (Array a) -> m ()
fromChunks :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Handle -> SerialT m (Array a) -> m ()
fromChunks Handle
h = forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> SerialT m a -> m ()
S.mapM_ (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Storable a => Handle -> Array a -> IO ()
writeArray Handle
h)
{-# INLINE putChunks #-}
putChunks :: (MonadIO m, Storable a) => SerialT m (Array a) -> m ()
putChunks :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m ()
putChunks = forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Handle -> SerialT m (Array a) -> m ()
fromChunks Handle
stdout
{-# INLINE putStrings #-}
putStrings :: MonadAsync m
=> (SerialT m Char -> SerialT m Word8) -> SerialT m String -> m ()
putStrings :: forall (m :: * -> *).
MonadAsync m =>
(SerialT m Char -> SerialT m Word8) -> SerialT m String -> m ()
putStrings SerialT m Char -> SerialT m Word8
encode = forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m ()
putChunks forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM (forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m a -> m (Array a)
IA.fromStream forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m Char -> SerialT m Word8
encode forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
S.fromList)
{-# INLINE putLines #-}
putLines :: MonadAsync m
=> (SerialT m Char -> SerialT m Word8) -> SerialT m String -> m ()
putLines :: forall (m :: * -> *).
MonadAsync m =>
(SerialT m Char -> SerialT m Word8) -> SerialT m String -> m ()
putLines SerialT m Char -> SerialT m Word8
encode = forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m (Array a) -> m ()
putChunks forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM
(\String
xs -> forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
SerialT m a -> m (Array a)
IA.fromStream forall a b. (a -> b) -> a -> b
$ SerialT m Char -> SerialT m Word8
encode (forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
S.fromList (String
xs forall a. [a] -> [a] -> [a]
++ String
"\n")))
{-# INLINE putBytes #-}
putBytes :: MonadIO m => SerialT m Word8 -> m ()
putBytes :: forall (m :: * -> *). MonadIO m => SerialT m Word8 -> m ()
putBytes = forall (m :: * -> *).
MonadIO m =>
Handle -> SerialT m Word8 -> m ()
fromBytes Handle
stdout
{-# INLINE fromChunksWithBufferOf #-}
fromChunksWithBufferOf :: (MonadIO m, Storable a)
=> Int -> Handle -> SerialT m (Array a) -> m ()
fromChunksWithBufferOf :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Handle -> SerialT m (Array a) -> m ()
fromChunksWithBufferOf Int
n Handle
h SerialT m (Array a)
xs = forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Handle -> SerialT m (Array a) -> m ()
fromChunks Handle
h forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> SerialT m (Array a) -> SerialT m (Array a)
AS.compact Int
n SerialT m (Array a)
xs
{-# INLINE fromBytesWithBufferOf #-}
fromBytesWithBufferOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m ()
fromBytesWithBufferOf :: forall (m :: * -> *).
MonadIO m =>
Int -> Handle -> SerialT m Word8 -> m ()
fromBytesWithBufferOf Int
n Handle
h SerialT m Word8
m = forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Handle -> SerialT m (Array a) -> m ()
fromChunks Handle
h forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Storable a) =>
Int -> t m a -> t m (Array a)
S.arraysOf Int
n SerialT m Word8
m
{-# INLINE fromBytes #-}
fromBytes :: MonadIO m => Handle -> SerialT m Word8 -> m ()
fromBytes :: forall (m :: * -> *).
MonadIO m =>
Handle -> SerialT m Word8 -> m ()
fromBytes = forall (m :: * -> *).
MonadIO m =>
Int -> Handle -> SerialT m Word8 -> m ()
fromBytesWithBufferOf Int
defaultChunkSize
{-# INLINE writeChunks #-}
writeChunks :: (MonadIO m, Storable a) => Handle -> Fold m (Array a) ()
writeChunks :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Handle -> Fold m (Array a) ()
writeChunks Handle
h = forall (m :: * -> *) a b. Monad m => (a -> m b) -> Fold m a ()
FL.drainBy (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Storable a => Handle -> Array a -> IO ()
writeArray Handle
h)
{-# INLINE writeChunks2 #-}
writeChunks2 :: (MonadIO m, Storable a) => Fold2 m Handle (Array a) ()
writeChunks2 :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Fold2 m Handle (Array a) ()
writeChunks2 = forall (m :: * -> *) c a b s.
(s -> a -> m s) -> (c -> m s) -> (s -> m b) -> Fold2 m c a b
Fold2 (\Handle
h Array a
arr -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. Storable a => Handle -> Array a -> IO ()
writeArray Handle
h Array a
arr forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return Handle
h) forall (m :: * -> *) a. Monad m => a -> m a
return (\Handle
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ())
{-# INLINE writeChunksWithBufferOf #-}
writeChunksWithBufferOf :: (MonadIO m, Storable a)
=> Int -> Handle -> Fold m (Array a) ()
writeChunksWithBufferOf :: forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Handle -> Fold m (Array a) ()
writeChunksWithBufferOf Int
n Handle
h = forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf Int
n (forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Handle -> Fold m (Array a) ()
writeChunks Handle
h)
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf :: MonadIO m => Int -> Handle -> Fold m Word8 ()
writeWithBufferOf :: forall (m :: * -> *). MonadIO m => Int -> Handle -> Fold m Word8 ()
writeWithBufferOf Int
n Handle
h = forall (m :: * -> *) a b c.
Monad m =>
Int -> Fold m a b -> Fold m b c -> Fold m a c
FL.lchunksOf Int
n (forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m a (Array a)
writeNUnsafe Int
n) (forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Handle -> Fold m (Array a) ()
writeChunks Handle
h)
{-# INLINE writeWithBufferOf2 #-}
writeWithBufferOf2 :: MonadIO m => Int -> Fold2 m Handle Word8 ()
writeWithBufferOf2 :: forall (m :: * -> *). MonadIO m => Int -> Fold2 m Handle Word8 ()
writeWithBufferOf2 Int
n = forall (m :: * -> *) a b x c.
Monad m =>
Int -> Fold m a b -> Fold2 m x b c -> Fold2 m x a c
FL.lchunksOf2 Int
n (forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Fold m a (Array a)
writeNUnsafe Int
n) forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Fold2 m Handle (Array a) ()
writeChunks2
{-# INLINE write #-}
write :: MonadIO m => Handle -> Fold m Word8 ()
write :: forall (m :: * -> *). MonadIO m => Handle -> Fold m Word8 ()
write = forall (m :: * -> *). MonadIO m => Int -> Handle -> Fold m Word8 ()
writeWithBufferOf Int
defaultChunkSize
{-# INLINE write2 #-}
write2 :: MonadIO m => Fold2 m Handle Word8 ()
write2 :: forall (m :: * -> *). MonadIO m => Fold2 m Handle Word8 ()
write2 = forall (m :: * -> *). MonadIO m => Int -> Fold2 m Handle Word8 ()
writeWithBufferOf2 Int
defaultChunkSize