-- |
-- Module      : Streamly.Internal.Data.Ring
-- Copyright   : (c) 2021 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--

module Streamly.Internal.Data.Ring
    ( Ring(..)

    -- * Generation
    , createRing
    , writeLastN

    -- * Modification
    , seek
    , unsafeInsertRingWith

    -- * Conversion
    , toMutArray
    , toStreamWith
    ) where

#include "assert.hs"

import Control.Monad.IO.Class (liftIO, MonadIO)
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Array.Generic.Mut.Type
    ( MutArray(..)
    , new
    , uninit
    , putIndexUnsafe
    , putSliceUnsafe
    )
-- import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Fold.Type as Fold

-- XXX Use MutableArray rather than keeping a MutArray here.

-- | A fixed-capacity ring buffer backed by a boxed mutable array. Writes go
-- to 'ringHead' and wrap around to index 0 on reaching 'ringMax'.
data Ring a = Ring
    { ringArr :: MutArray a
    -- ^ Backing storage for the ring elements.

    -- XXX We can keep the current fill amount, Or we can keep a count of total
    -- elements inserted and compute ring head as well using mod on that,
    -- assuming it won't overflow. But mod could be expensive.
    , ringHead :: !Int -- ^ current index to be over-written
    , ringMax :: !Int  -- ^ first index beyond allocated memory
    }

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------

-- XXX If we align the ringMax to nearest power of two then computation of the
-- index to write could be cheaper.

-- | @createRing count@ allocates a ring with room for @count@ elements. The
-- returned ring has its head at index 0 and 'ringMax' set to @count@.
--
-- The backing array is obtained with 'new' and then passed through 'uninit'
-- with the same @count@ (presumably so that all @count@ slots are addressable
-- before anything has been written -- confirm against the MutArray module).
{-# INLINE createRing #-}
createRing :: MonadIO m => Int -> m (Ring a)
createRing count = liftIO $ do
    arr <- new count
    arr1 <- uninit arr count
    return (Ring
        { ringArr = arr1
        , ringHead = 0
        , ringMax = count
        })


-- | @writeLastN n@ folds the input into a 'Ring' holding at most the last @n@
-- elements seen.
--
-- * If @n <= 0@ the fold terminates immediately with an empty ring
--   (@createRing 0@) without consuming any input.
-- * If fewer than @n@ elements were consumed, the returned ring has its head
--   at 0 and its 'ringMax' set to the number of elements written, so that
--   only the filled prefix is considered part of the ring.
-- * Otherwise the ring is returned as is; its head points at the oldest
--   element, which is the next slot to be overwritten.
{-# INLINE writeLastN #-}
writeLastN :: MonadIO m => Int -> Fold m a (Ring a)
writeLastN n = Fold step initial extract

    where

    -- The fold state pairs the ring with the count of elements inserted so
    -- far; the count lets 'extract' detect a partially filled ring.
    initial
        | n <= 0 = Fold.Done <$> createRing 0
        | otherwise = do
            rb <- createRing n
            return $ Fold.Partial $ Tuple' rb (0 :: Int)

    -- Write the element at the current head and carry the advanced head
    -- forward in the state. This step never terminates the fold.
    step (Tuple' rb cnt) x = do
        rh1 <- liftIO $ unsafeInsertRingWith rb x
        return $ Fold.Partial $ Tuple' (rb {ringHead = rh1}) (cnt + 1)

    extract (Tuple' rb@Ring{..} cnt) =
        return $
            if cnt < ringMax
            -- Not yet wrapped around: the head has not wrapped, so ringHead
            -- is the number of elements written. Shrink the ring to that.
            then Ring ringArr 0 ringHead
            else rb

-------------------------------------------------------------------------------
-- Modification
-------------------------------------------------------------------------------

-- XXX This is safe

-- | Write an element at the current ring head and return the new ring head,
-- wrapping around to 0 when the end of the ring is reached. The 'Ring' value
-- itself is not updated; the caller must store the returned head.
--
-- Preconditions, checked only via @assertM@ (from @assert.hs@; presumably
-- compiled out in non-debug builds -- confirm): @ringMax >= 1@ and
-- @ringHead < ringMax@. The write itself uses 'putIndexUnsafe'.
{-# INLINE unsafeInsertRingWith #-}
unsafeInsertRingWith :: Ring a -> a -> IO Int
unsafeInsertRingWith Ring{..} x = do
    assertM(ringMax >= 1)
    assertM(ringHead < ringMax)
    putIndexUnsafe ringHead ringArr x
    let rh1 = ringHead + 1
        -- A compare-and-reset is used instead of mod to wrap the index.
        next = if rh1 == ringMax then 0 else rh1
    return next

-- | Move the ring head clockwise (+ve adj) or counter clockwise (-ve adj) by
-- the given amount.
--
-- The new head is @(ringHead + adj) \`mod\` ringMax@, so adjustments larger
-- than the ring size wrap around, and negative adjustments yield a
-- non-negative index. A ring with @ringMax <= 0@ is returned unchanged. The
-- backing array is shared with the input ring, not copied.
{-# INLINE seek #-}
seek :: MonadIO m => Int -> Ring a -> m (Ring a)
seek adj rng@Ring{..}
    | ringMax > 0 = liftIO $ do
        -- XXX try avoiding mod when in bounds
        let idx1 = ringHead + adj
            next = mod idx1 ringMax
        return $ Ring ringArr next ringMax
    | otherwise = pure rng

-------------------------------------------------------------------------------
-- Conversion
-------------------------------------------------------------------------------

-- | @toMutArray ringHeadAdjustment lengthToRead ring@.
-- Convert the ring into a boxed mutable array. Note that the returned MutArray
-- may share the same underlying memory as the Ring.
--
-- Reading starts at @(ringHead + ringHeadAdjustment) \`mod\` ringMax@ and
-- covers @min ringMax lengthToRead@ elements; a negative @lengthToRead@ is
-- treated as 0. When the requested span does not cross the end of the ring a
-- slice of the ring's own array is returned (no copy); otherwise a new array
-- is allocated and the two segments are copied into it in order.
--
-- An empty ring (@ringMax <= 0@) yields an empty array instead of failing
-- with a divide-by-zero in the index computation.
{-# INLINE toMutArray #-}
toMutArray :: MonadIO m => Int -> Int -> Ring a -> m (MutArray a)
toMutArray adj n Ring{..}
    -- Guard the 'mod' below: a ring made by @createRing 0@ has ringMax == 0.
    | ringMax <= 0 = return $ ringArr { arrLen = 0 }
    | otherwise = do
        let len = max 0 (min ringMax n)
        let idx = mod (ringHead + adj) ringMax
            end = idx + len
        if end <= ringMax
        then
            -- putSliceUnsafe ringArr idx arr1 0 len
            return $ ringArr { arrStart = idx, arrLen = len }
        else do
            -- XXX Just swap the elements in the existing ring and return the
            -- same array without reallocation.
            arr <- liftIO $ new len
            arr1 <- uninit arr len
            -- First segment: from idx up to the end of the ring.
            putSliceUnsafe ringArr idx arr1 0 (ringMax - idx)
            -- Second segment: the wrapped-around part from the ring start.
            putSliceUnsafe ringArr 0 arr1 (ringMax - idx) (end - ringMax)
            return arr1

-- This would be theoretically slower than toMutArray because of a branch
-- introduced for each element in the second half of the ring.

-- | Seek by n and then read the entire ring. Use 'take' on the stream to
-- restrict the reads.
--
-- /Unimplemented/: evaluating this function raises an error. A sketch of the
-- intended implementation is kept in the comment below.
toStreamWith :: Int -> Ring a -> Stream m a
toStreamWith =
    error "Streamly.Internal.Data.Ring.toStreamWith: not implemented"
{-
toStreamWith n Ring{..}
    | ringMax > 0 = concatEffect $ liftIO $ do
        idx <- readIORef ringHead
        let idx1 = idx + adj
            next = mod idx1 ringMax
            s1 = undefined  -- stream initial slice
            s2 = undefined  -- stream next slice
        return (s1 `Stream.append` s2)
    | otherwise = Stream.nil
-}