#include "inline.hs"

-- |
-- Module      : Streamly.Internal.FileSystem.FD
-- Copyright   : (c) 2019 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- This module is an experimental replacement for
-- "Streamly.FileSystem.Handle". That module provides IO facilities based
-- on the GHC Handle type. The APIs in this module avoid the GHC handle layer
-- and provide more explicit control over buffering.
--
-- Read and write data as streams and arrays to and from files.
--
-- This module provides read and write APIs based on handles. Before reading or
-- writing, a file must be opened first using 'openFile'. The 'Handle' returned
-- by 'openFile' is then used to access the file. A 'Handle' is backed by an
-- operating system file descriptor. When the 'Handle' is garbage collected the
-- underlying file descriptor is automatically closed. A handle can be
-- explicitly closed using 'closeFile'.
--
-- NOTE(review): the last two sentences describe the intended design. The
-- 'Handle' type in this module still carries an "XXX attach a finalizer" note
-- and no @closeFile@ appears in the export list below - confirm before relying
-- on either behavior.
--
-- Reading and writing APIs are divided into two categories, sequential
-- streaming APIs and random or seekable access APIs.  File IO APIs are quite
-- similar to "Streamly.Data.Array.Foreign" read write APIs. In that regard,
-- arrays can be considered as in-memory files or files can be considered as
-- on-disk arrays.
--
-- > import qualified Streamly.Internal.FileSystem.FD as FD
--

module Streamly.Internal.FileSystem.FD
    (
    -- * File Handles
      Handle
    , stdin
    , stdout
    , stderr
    , openFile

    -- TODO file path based APIs
    -- , readFile
    -- , writeFile

    -- * Streaming IO
    -- | Streaming APIs read or write data to or from a file or device
    -- sequentially, they never perform a seek to a random location.  When
    -- reading, the stream is lazy and generated on-demand as the consumer
    -- consumes it.  Read IO requests to the IO device are performed in chunks
    -- of 32KiB, this is referred to as @defaultChunkSize@ in the
    -- documentation. One IO request may or may not read the full chunk. If the
    -- whole stream is not consumed, it is possible that we may read slightly
    -- more from the IO device than what the consumer needed.  Unless specified
    -- otherwise in the API, writes are collected into chunks of
    -- @defaultChunkSize@ before they are written to the IO device.

    -- Streaming APIs work for all kinds of devices, seekable or non-seekable;
    -- including disks, files, memory devices, terminals, pipes, sockets and
    -- fifos. While random access APIs work only for files or devices that have
    -- random access or seek capability for example disks, memory devices.
    -- Devices like terminals, pipes, sockets and fifos do not have random
    -- access capability.

    -- ** Read File to Stream
    , read
    -- , readUtf8
    -- , readLines
    -- , readFrames
    , readInChunksOf

    -- -- * Array Read
    -- , readArrayUpto
    -- , readArrayOf

    , readArrays
    , readArraysOfUpto
    -- , readArraysOf

    -- ** Write File from Stream
    , write
    -- , writeUtf8
    -- , writeUtf8Lines
    -- , writeFrames
    , writeInChunksOf

    -- -- * Array Write
    -- , writeArray
    , writeArrays
    , writeArraysPackedUpto

    -- XXX these are incomplete
    -- , writev
    -- , writevArraysPackedUpto

    -- -- * Random Access (Seek)
    -- -- | Unlike the streaming APIs listed above, these APIs apply to devices or
    -- files that have random access or seek capability.  Such devices
    -- include disks, files, memory devices and exclude terminals, pipes,
    -- sockets and fifos.
    --
    -- , readIndex
    -- , readSlice
    -- , readSliceRev
    -- , readAt -- read from a given position to the end of file
    -- , readSliceArrayUpto
    -- , readSliceArrayOf

    -- , writeIndex
    -- , writeSlice
    -- , writeSliceRev
    -- , writeAt -- start writing at the given position
    -- , writeSliceArray
    )
where

import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
-- import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (plusPtr, castPtr)
import Foreign.Storable (Storable(..))
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
-- import System.IO (Handle, hGetBufSome, hPutBuf)
import System.IO (IOMode)
import Prelude hiding (read)

import qualified GHC.IO.FD as FD
import qualified GHC.IO.Device as RawIO

import Streamly.Internal.Data.Array.Foreign.Type
    (Array(..), byteLength, unsafeFreeze)
import Streamly.Internal.Data.Array.Foreign.Mut.Type
    (fromForeignPtrUnsafe, unsafeWithArrayContents)
import Streamly.Internal.System.IO (defaultChunkSize)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream, mkStream, fromStreamD)

#if !defined(mingw32_HOST_OS)
import Streamly.Internal.Data.Stream.IsStream.Type (toStreamD)
import Streamly.Internal.System.IOVec (groupIOVecsOf)
import qualified Streamly.Internal.FileSystem.FDIO as RawIO hiding (write)
import qualified Streamly.Internal.System.IOVec.Type as RawIO
#endif
-- import Streamly.Data.Fold (Fold)
-- import Streamly.String (encodeUtf8, decodeUtf8, foldLines)

import qualified Streamly.Data.Array.Foreign as A
import qualified Streamly.Internal.Data.Array.Stream.Foreign as AS
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D

-------------------------------------------------------------------------------
-- References
-------------------------------------------------------------------------------
--
-- The following references may be useful to build an
-- understanding about the file API design:
--
-- http://www.linux-mag.com/id/308/ for blocking/non-blocking IO on linux.
-- https://lwn.net/Articles/612483/ Non-blocking buffered file read operations
-- https://en.wikipedia.org/wiki/C_file_input/output for C APIs.
-- https://docs.oracle.com/javase/tutorial/essential/io/file.html for Java API.
-- https://www.w3.org/TR/FileAPI/ for http file API.

-------------------------------------------------------------------------------
-- Handles
-------------------------------------------------------------------------------

-- XXX attach a finalizer
-- | A 'Handle' is returned by 'openFile' and is subsequently used to perform
-- read and write operations on a file. It is a thin wrapper around the
-- 'FD.FD' file descriptor type from "GHC.IO.FD".
--
newtype Handle = Handle FD.FD

-- | File handle for standard input (wraps 'FD.stdin').
stdin :: Handle
stdin = Handle FD.stdin

-- | File handle for standard output (wraps 'FD.stdout').
stdout :: Handle
stdout = Handle FD.stdout

-- | File handle for standard error (wraps 'FD.stderr').
stderr :: Handle
stderr = Handle FD.stderr

-- XXX we can support all the flags that the "open" system call supports.
-- Instead of using RTS locking mechanism can we use system provided locking
-- instead?
--
-- | Open a file that is not a directory and return a file handle.
-- 'openFile' enforces a multiple-reader single-writer locking on files.  That
-- is, there may either be many handles on the same file which manage input, or
-- just one handle on the file which manages output.  If any open handle is
-- managing a file for output, no new handle can be allocated for that file. If
-- any open handle is managing a file for input, new handles can only be
-- allocated if they do not manage output. Whether two files are the same is
-- implementation-dependent, but they should normally be the same if they have
-- the same absolute path name and neither has been renamed, for example.
--
-- The device type reported alongside the descriptor by 'FD.openFile' is
-- discarded. NOTE(review): the 'True' argument is presumably the
-- "open in non-blocking mode" flag of "GHC.IO.FD" - confirm against base.
openFile :: FilePath -> IOMode -> IO Handle
openFile path mode = Handle . fst <$> FD.openFile path mode True

-------------------------------------------------------------------------------
-- Array IO (Input)
-------------------------------------------------------------------------------

-- | Read an 'Array' of bytes from a file handle. If no data is available on
-- the handle it blocks until some data becomes available. If data is available
-- then it immediately returns that data without blocking. It reads a maximum
-- of up to the size requested.
--
-- A fresh buffer of @size@ bytes is allocated on every call. When the
-- underlying read reports zero bytes the returned array is empty; the stream
-- readers in this module use that as their end-of-stream signal.
{-# INLINABLE readArrayUpto #-}
readArrayUpto :: Int -> Handle -> IO (Array Word8)
readArrayUpto size (Handle fd) = do
    ptr <- mallocPlainForeignPtrBytes size
    -- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
    withForeignPtr ptr $ \p -> do
        -- n <- hGetBufSome h p size
        -- From base 4.15 onwards 'RawIO.read' takes an extra offset argument.
#if MIN_VERSION_base(4,15,0)
        n <- RawIO.read fd p 0 size
#else
        n <- RawIO.read fd p size
#endif
        -- XXX shrink only if the diff is significant
        -- Use unsafeFreezeWithShrink
        -- The array ends after the @n@ bytes actually read; its bound is the
        -- end of the allocated buffer.
        return
            $ unsafeFreeze
            $ fromForeignPtrUnsafe ptr (p `plusPtr` n) (p `plusPtr` size)

-------------------------------------------------------------------------------
-- Array IO (output)
-------------------------------------------------------------------------------

-- | Write an 'Array' to a file handle. An empty array results in no write
-- call at all; otherwise the array's 'byteLength' bytes are handed to
-- 'RawIO.write' in a single call.
--
-- @since 0.7.0
{-# INLINABLE writeArray #-}
writeArray :: Storable a => Handle -> Array a -> IO ()
writeArray _ arr | A.length arr == 0 = return ()
writeArray (Handle fd) arr =
    unsafeWithArrayContents (arrContents arr) (arrStart arr) $ \p ->
    -- RawIO.writeAll fd (castPtr p) aLen
    -- From base 4.15 onwards 'RawIO.write' takes an extra offset argument.
#if MIN_VERSION_base(4,15,0)
    RawIO.write fd (castPtr p) 0 aLen
#else
    RawIO.write fd (castPtr p) aLen
#endif
    {-
    -- Experiment to compare "writev" based IO with "write" based IO.
    iov <- A.newArray 1
    let iov' = iov {aEnd = aBound iov}
    A.writeIndex iov' 0 (RawIO.IOVec (castPtr p) (fromIntegral aLen))
    RawIO.writevAll fd (unsafeForeignPtrToPtr (aStart iov')) 1
    -}

    where

    -- Size of the payload in bytes (not in elements).
    aLen = byteLength arr

#if !defined(mingw32_HOST_OS)
-- | Write an array of 'IOVec' to a file handle. An empty array results in no
-- write call; otherwise all entries are passed to 'RawIO.writevAll' together
-- with the entry count.
--
-- @since 0.7.0
{-# INLINABLE writeIOVec #-}
writeIOVec :: Handle -> Array RawIO.IOVec -> IO ()
writeIOVec _ iov | A.length iov == 0 = return ()
writeIOVec (Handle fd) iov =
    unsafeWithArrayContents (arrContents iov) (arrStart iov) $ \p ->
        RawIO.writevAll fd p (A.length iov)
#endif

-------------------------------------------------------------------------------
-- Stream of Arrays IO
-------------------------------------------------------------------------------

-- | 'mkStream' based formulation of 'readArraysOfUpto'. It is not part of the
-- export list; the underscore prefix keeps it from being reported as unused.
--
-- @_readArraysOfUpto size h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is specified by @size@. The actual size
-- read may be less than or equal to @size@.
{-# INLINABLE _readArraysOfUpto #-}
_readArraysOfUpto :: (IsStream t, MonadIO m)
    => Int -> Handle -> t m (Array Word8)
_readArraysOfUpto size h = go

    where

    -- XXX use cons/nil instead
    go = mkStream $ \_ yld _ stp -> do
        arr <- liftIO $ readArrayUpto size h
        if A.length arr == 0
        then stp
        else yld arr go

-- | @readArraysOfUpto size h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is specified by @size@. The actual size
-- read may be less than or equal to @size@. The stream stops at the first
-- empty array returned by 'readArrayUpto'.
{-# INLINE_NORMAL readArraysOfUpto #-}
readArraysOfUpto :: (IsStream t, MonadIO m)
    => Int -> Handle -> t m (Array Word8)
readArraysOfUpto size h = fromStreamD (D.Stream step ())

    where

    -- The stream carries no state of its own (unit); everything lives in the
    -- file descriptor.
    {-# INLINE_LATE step #-}
    step _ _ = do
        arr <- liftIO $ readArrayUpto size h
        return $
            case A.length arr of
                0 -> D.Stop
                _ -> D.Yield arr ()

-- XXX read 'Array a' instead of Word8
--
-- | @readArrays h@ reads a stream of arrays from file handle @h@.
-- The maximum size of a single array is limited to @defaultChunkSize@.
-- 'readArrays' ignores the prevailing 'TextEncoding' and 'NewlineMode'
-- on the 'Handle'.
--
-- > readArrays = readArraysOfUpto defaultChunkSize
--
-- @since 0.7.0
{-# INLINE readArrays #-}
readArrays :: (IsStream t, MonadIO m) => Handle -> t m (Array Word8)
readArrays = readArraysOfUpto defaultChunkSize

-------------------------------------------------------------------------------
-- Read File to Stream
-------------------------------------------------------------------------------

-- TODO for concurrent streams implement readahead IO. We can send multiple
-- read requests at the same time. For serial case we can use async IO. We can
-- also control the read throughput in mbps or IOPS.

-- | @readInChunksOf chunkSize handle@ reads a byte stream from a file handle,
-- reads are performed in chunks of up to @chunkSize@.  The stream ends as soon
-- as EOF is encountered.
--
-- > readInChunksOf chunkSize = AS.concat . readArraysOfUpto chunkSize
--
{-# INLINE readInChunksOf #-}
readInChunksOf :: (IsStream t, MonadIO m) => Int -> Handle -> t m Word8
readInChunksOf chunkSize h = AS.concat $ readArraysOfUpto chunkSize h

-- TODO
-- read :: (IsStream t, MonadIO m, Storable a) => Handle -> t m a
--
-- > read = 'readInChunksOf' defaultChunkSize

-- | Generate a stream of bytes ('Word8') from a file 'Handle'. The
-- stream ends when EOF is encountered.
--
-- @since 0.7.0
{-# INLINE read #-}
read :: (IsStream t, MonadIO m) => Handle -> t m Word8
read = AS.concat . readArrays

-------------------------------------------------------------------------------
-- Writing
-------------------------------------------------------------------------------

-- | Write a stream of arrays to a handle. Each array is written with a
-- separate 'writeArray' call, in stream order.
--
-- @since 0.7.0
{-# INLINE writeArrays #-}
writeArrays :: (MonadIO m, Storable a) => Handle -> SerialT m (Array a) -> m ()
writeArrays h = S.mapM_ (liftIO . writeArray h)

-- | Write a stream of arrays to a handle after coalescing them in chunks of
-- specified size. The chunk size is only a maximum and the actual writes could
-- be smaller than that as we do not split the arrays to fit them to the
-- specified size.
--
-- > writeArraysPackedUpto n h = writeArrays h . AS.compact n
--
-- @since 0.7.0
{-# INLINE writeArraysPackedUpto #-}
writeArraysPackedUpto :: (MonadIO m, Storable a)
    => Int -> Handle -> SerialT m (Array a) -> m ()
writeArraysPackedUpto n h xs = writeArrays h $ AS.compact n xs

#if !defined(mingw32_HOST_OS)
-- XXX this is incomplete
-- | Write a stream of 'IOVec' arrays to a handle. Each array is written with
-- a separate 'writeIOVec' call, in stream order.
--
-- @since 0.7.0
{-# INLINE writev #-}
writev :: MonadIO m => Handle -> SerialT m (Array RawIO.IOVec) -> m ()
writev h = S.mapM_ (liftIO . writeIOVec h)

-- XXX this is incomplete
-- | Write a stream of arrays to a handle after grouping them in 'IOVec' arrays
-- of up to a maximum total size. Writes are performed using gather IO via
-- @writev@ system call. The maximum number of entries in each 'IOVec' group
-- is limited to 512.
--
-- @since 0.7.0
{-# INLINE _writevArraysPackedUpto #-}
_writevArraysPackedUpto :: MonadIO m
    => Int -> Handle -> SerialT m (Array a) -> m ()
_writevArraysPackedUpto n h xs =
    writev h $ fromStreamD $ groupIOVecsOf n maxIOVecEntries (toStreamD xs)

    where

    -- Upper bound on the number of entries placed in one 'IOVec' group.
    maxIOVecEntries :: Int
    maxIOVecEntries = 512
#endif

-- GHC buffer size dEFAULT_FD_BUFFER_SIZE=8192 bytes.
--
-- XXX test this
-- Note that if you use a chunk size less than 8K (GHC's default buffer
-- size) then you are advised to use 'NoBuffering' mode on the 'Handle' in case
-- you do not want buffering to occur at GHC level as well. Same thing applies
-- to writes as well.

-- | Like 'write' but provides control over the write buffer. Output will
-- be written to the IO device as soon as we collect the specified number of
-- input elements.
--
-- > writeInChunksOf n h = writeArrays h . AS.arraysOf n
--
-- @since 0.7.0
{-# INLINE writeInChunksOf #-}
writeInChunksOf :: MonadIO m => Int -> Handle -> SerialT m Word8 -> m ()
writeInChunksOf n h xs = writeArrays h $ AS.arraysOf n xs

-- > write = 'writeInChunksOf' A.defaultChunkSize
--
-- | Write a byte stream to a file handle. Combines the bytes in chunks of size
-- up to 'A.defaultChunkSize' before writing.
-- Note that the write behavior depends on the 'IOMode' and the current seek
-- position of the handle.
--
-- @since 0.7.0
{-# INLINE write #-}
write :: MonadIO m => Handle -> SerialT m Word8 -> m ()
write = writeInChunksOf defaultChunkSize

{-
{-# INLINE write #-}
write :: (MonadIO m, Storable a) => Handle -> SerialT m a -> m ()
write = toHandleWith A.defaultChunkSize
-}

-------------------------------------------------------------------------------
-- IO with encoding/decoding Unicode characters
-------------------------------------------------------------------------------

-- Everything below this point is a commented-out design sketch; none of it is
-- compiled.

{-
-- |
-- > readUtf8 = decodeUtf8 . read
--
-- Read a UTF8 encoded stream of unicode characters from a file handle.
--
-- @since 0.7.0
{-# INLINE readUtf8 #-}
readUtf8 :: (IsStream t, MonadIO m) => Handle -> t m Char
readUtf8 = decodeUtf8 . read

-- |
-- > writeUtf8 h s = write h $ encodeUtf8 s
--
-- Encode a stream of unicode characters to UTF8 and write it to the given file
-- handle. Default block buffering applies to the writes.
--
-- @since 0.7.0
{-# INLINE writeUtf8 #-}
writeUtf8 :: MonadIO m => Handle -> SerialT m Char -> m ()
writeUtf8 h s = write h $ encodeUtf8 s

-- | Write a stream of unicode characters after encoding to UTF-8 in chunks
-- separated by a linefeed character @'\n'@. If the size of the buffer exceeds
-- @defaultChunkSize@ and a linefeed is not yet found, the buffer is written
-- anyway.  This is similar to writing to a 'Handle' with the 'LineBuffering'
-- option.
--
-- @since 0.7.0
{-# INLINE writeUtf8ByLines #-}
writeUtf8ByLines :: (IsStream t, MonadIO m) => Handle -> t m Char -> m ()
writeUtf8ByLines = undefined

-- | Read UTF-8 lines from a file handle and apply the specified fold to each
-- line. This is similar to reading a 'Handle' with the 'LineBuffering' option.
--
-- @since 0.7.0
{-# INLINE readLines #-}
readLines :: (IsStream t, MonadIO m) => Handle -> Fold m Char b -> t m b
readLines h f = foldLines (readUtf8 h) f

-------------------------------------------------------------------------------
-- Framing on a sequence
-------------------------------------------------------------------------------

-- | Read a stream from a file handle and split it into frames delimited by
-- the specified sequence of elements. The supplied fold is applied on each
-- frame.
--
-- @since 0.7.0
{-# INLINE readFrames #-}
readFrames :: (IsStream t, MonadIO m, Storable a)
    => Array a -> Handle -> Fold m a b -> t m b
readFrames = undefined -- foldFrames . read

-- | Write a stream to the given file handle buffering up to frames separated
-- by the given sequence or up to a maximum of @defaultChunkSize@.
--
-- @since 0.7.0
{-# INLINE writeByFrames #-}
writeByFrames :: (IsStream t, MonadIO m, Storable a)
    => Array a -> Handle -> t m a -> m ()
writeByFrames = undefined

-------------------------------------------------------------------------------
-- Random Access IO (Seek)
-------------------------------------------------------------------------------

-- XXX handles could be shared, so we may not want to use the handle state at
-- all for these APIs. we can use pread and pwrite instead. On windows we will
-- need to use readFile/writeFile with an offset argument.

-------------------------------------------------------------------------------

-- | Read the element at the given index treating the file as an array.
--
-- @since 0.7.0
{-# INLINE readIndex #-}
readIndex :: Storable a => Handle -> Int -> Maybe a
readIndex arr i = undefined

-- NOTE: To represent a range to read we have chosen (start, size) instead of
-- (start, end). This helps in removing the ambiguity of whether "end" is
-- included in the range or not.
--
-- We could avoid specifying the range to be read and instead use "take size"
-- on the stream, but it may end up reading more and then consume it partially.

-- | @readSliceWith chunkSize handle pos len@ reads up to @len@ bytes
-- from @handle@ starting at the offset @pos@ from the beginning of the file.
--
-- Reads are performed in chunks of size @chunkSize@.  For block devices, to
-- avoid reading partial blocks @chunkSize@ must align with the block size of
-- the underlying device. If the underlying block size is unknown, it is a good
-- idea to keep it a multiple of 4KiB. This API ensures that the start of each
-- chunk is aligned with @chunkSize@ from second chunk onwards.
--
{-# INLINE readSliceWith #-}
readSliceWith :: (IsStream t, MonadIO m, Storable a)
    => Int -> Handle -> Int -> Int -> t m a
readSliceWith chunkSize h pos len = undefined

-- | @readSlice h i count@ streams a slice from the file handle @h@ starting
-- at index @i@ and reading up to @count@ elements in the forward direction
-- ending at the index @i + count - 1@.
--
-- @since 0.7.0
{-# INLINE readSlice #-}
readSlice :: (IsStream t, MonadIO m, Storable a)
    => Handle -> Int -> Int -> t m a
readSlice = readSliceWith defaultChunkSize

-- | @readSliceRev h i count@ streams a slice from the file handle @h@ starting
-- at index @i@ and reading up to @count@ elements in the reverse direction
-- ending at the index @i - count + 1@.
--
-- @since 0.7.0
{-# INLINE readSliceRev #-}
readSliceRev :: (IsStream t, MonadIO m, Storable a)
    => Handle -> Int -> Int -> t m a
readSliceRev h i count = undefined

-- | Write the given element at the given index in the file.
--
-- @since 0.7.0
{-# INLINE writeIndex #-}
writeIndex :: (MonadIO m, Storable a) => Handle -> Int -> a -> m ()
writeIndex h i a = undefined

-- | @writeSlice h i count stream@ writes a stream to the file handle @h@
-- starting at index @i@ and writing up to @count@ elements in the forward
-- direction ending at the index @i + count - 1@.
--
-- @since 0.7.0
{-# INLINE writeSlice #-}
writeSlice :: (IsStream t, Monad m, Storable a)
    => Handle -> Int -> Int -> t m a -> m ()
writeSlice h i len s = undefined

-- | @writeSliceRev h i count stream@ writes a stream to the file handle @h@
-- starting at index @i@ and writing up to @count@ elements in the reverse
-- direction ending at the index @i - count + 1@.
--
-- @since 0.7.0
{-# INLINE writeSliceRev #-}
writeSliceRev :: (IsStream t, Monad m, Storable a)
    => Handle -> Int -> Int -> t m a -> m ()
writeSliceRev arr i len s = undefined
-}