-- | Strategies for saving subscription position.
module MessageDb.Subscription.PositionStrategy
  ( LastPositionSaved,
    CurrentPosition,
    PositionSaved,
    PositionStrategy (..),
    dontSave,
    PositionUpdateInterval (..),
    writeToStream,
  )
where

import Control.Monad (void)
import Data.Coerce (coerce)
import qualified Database.PostgreSQL.Simple as Postgres
import qualified MessageDb.Functions as Functions
import qualified MessageDb.Message as Message
import MessageDb.StreamName (StreamName)
import MessageDb.Units (NumberOfMessages (..))
import Numeric.Natural (Natural)


type LastPositionSaved = Message.GlobalPosition

type CurrentPosition = Message.GlobalPosition

type PositionSaved = Message.GlobalPosition


-- | Strategy for saving and restoring a subscription's position.
data PositionStrategy = PositionStrategy
  { PositionStrategy -> IO GlobalPosition
restore :: IO Message.GlobalPosition
  , PositionStrategy
-> GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save :: LastPositionSaved -> CurrentPosition -> IO (Maybe PositionSaved)
  }


-- | Start at zero and don't ever save the position.
dontSave :: PositionStrategy
dontSave :: PositionStrategy
dontSave =
  PositionStrategy :: IO GlobalPosition
-> (GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition))
-> PositionStrategy
PositionStrategy
    { restore :: IO GlobalPosition
restore = GlobalPosition -> IO GlobalPosition
forall (f :: * -> *) a. Applicative f => a -> f a
pure GlobalPosition
0
    , save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save = \GlobalPosition
_ GlobalPosition
_ -> Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe GlobalPosition
forall a. Maybe a
Nothing
    }


-- | Minimum difference between the current position and last position saved to save the position.
newtype PositionUpdateInterval = PositioUpdateInterval
  { PositionUpdateInterval -> NumberOfMessages
fromPositionUpdateInterval :: NumberOfMessages
  }
  deriving (PositionUpdateInterval -> PositionUpdateInterval -> Bool
(PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> Eq PositionUpdateInterval
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c/= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
== :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c== :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
Eq, Eq PositionUpdateInterval
Eq PositionUpdateInterval
-> (PositionUpdateInterval -> PositionUpdateInterval -> Ordering)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> Ord PositionUpdateInterval
PositionUpdateInterval -> PositionUpdateInterval -> Bool
PositionUpdateInterval -> PositionUpdateInterval -> Ordering
PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cmin :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
max :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cmax :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
>= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c>= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
> :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c> :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
<= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c<= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
< :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c< :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
compare :: PositionUpdateInterval -> PositionUpdateInterval -> Ordering
$ccompare :: PositionUpdateInterval -> PositionUpdateInterval -> Ordering
$cp1Ord :: Eq PositionUpdateInterval
Ord, Integer -> PositionUpdateInterval
PositionUpdateInterval -> PositionUpdateInterval
PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
(PositionUpdateInterval
 -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (Integer -> PositionUpdateInterval)
-> Num PositionUpdateInterval
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
fromInteger :: Integer -> PositionUpdateInterval
$cfromInteger :: Integer -> PositionUpdateInterval
signum :: PositionUpdateInterval -> PositionUpdateInterval
$csignum :: PositionUpdateInterval -> PositionUpdateInterval
abs :: PositionUpdateInterval -> PositionUpdateInterval
$cabs :: PositionUpdateInterval -> PositionUpdateInterval
negate :: PositionUpdateInterval -> PositionUpdateInterval
$cnegate :: PositionUpdateInterval -> PositionUpdateInterval
* :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$c* :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
- :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$c- :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
+ :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$c+ :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
Num, Num PositionUpdateInterval
Ord PositionUpdateInterval
Num PositionUpdateInterval
-> Ord PositionUpdateInterval
-> (PositionUpdateInterval -> Rational)
-> Real PositionUpdateInterval
PositionUpdateInterval -> Rational
forall a. Num a -> Ord a -> (a -> Rational) -> Real a
toRational :: PositionUpdateInterval -> Rational
$ctoRational :: PositionUpdateInterval -> Rational
$cp2Real :: Ord PositionUpdateInterval
$cp1Real :: Num PositionUpdateInterval
Real, Int -> PositionUpdateInterval
PositionUpdateInterval -> Int
PositionUpdateInterval -> [PositionUpdateInterval]
PositionUpdateInterval -> PositionUpdateInterval
PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
PositionUpdateInterval
-> PositionUpdateInterval
-> PositionUpdateInterval
-> [PositionUpdateInterval]
(PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (Int -> PositionUpdateInterval)
-> (PositionUpdateInterval -> Int)
-> (PositionUpdateInterval -> [PositionUpdateInterval])
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> [PositionUpdateInterval])
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> [PositionUpdateInterval])
-> (PositionUpdateInterval
    -> PositionUpdateInterval
    -> PositionUpdateInterval
    -> [PositionUpdateInterval])
-> Enum PositionUpdateInterval
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: PositionUpdateInterval
-> PositionUpdateInterval
-> PositionUpdateInterval
-> [PositionUpdateInterval]
$cenumFromThenTo :: PositionUpdateInterval
-> PositionUpdateInterval
-> PositionUpdateInterval
-> [PositionUpdateInterval]
enumFromTo :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
$cenumFromTo :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
enumFromThen :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
$cenumFromThen :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
enumFrom :: PositionUpdateInterval -> [PositionUpdateInterval]
$cenumFrom :: PositionUpdateInterval -> [PositionUpdateInterval]
fromEnum :: PositionUpdateInterval -> Int
$cfromEnum :: PositionUpdateInterval -> Int
toEnum :: Int -> PositionUpdateInterval
$ctoEnum :: Int -> PositionUpdateInterval
pred :: PositionUpdateInterval -> PositionUpdateInterval
$cpred :: PositionUpdateInterval -> PositionUpdateInterval
succ :: PositionUpdateInterval -> PositionUpdateInterval
$csucc :: PositionUpdateInterval -> PositionUpdateInterval
Enum, Enum PositionUpdateInterval
Real PositionUpdateInterval
Real PositionUpdateInterval
-> Enum PositionUpdateInterval
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
    -> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
    -> PositionUpdateInterval
    -> (PositionUpdateInterval, PositionUpdateInterval))
-> (PositionUpdateInterval
    -> PositionUpdateInterval
    -> (PositionUpdateInterval, PositionUpdateInterval))
-> (PositionUpdateInterval -> Integer)
-> Integral PositionUpdateInterval
PositionUpdateInterval -> Integer
PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
forall a.
Real a
-> Enum a
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> (a, a))
-> (a -> a -> (a, a))
-> (a -> Integer)
-> Integral a
toInteger :: PositionUpdateInterval -> Integer
$ctoInteger :: PositionUpdateInterval -> Integer
divMod :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
$cdivMod :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
quotRem :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
$cquotRem :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
mod :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cmod :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
div :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cdiv :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
rem :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$crem :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
quot :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cquot :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cp2Integral :: Enum PositionUpdateInterval
$cp1Integral :: Real PositionUpdateInterval
Integral)
  deriving (Int -> PositionUpdateInterval -> ShowS
[PositionUpdateInterval] -> ShowS
PositionUpdateInterval -> String
(Int -> PositionUpdateInterval -> ShowS)
-> (PositionUpdateInterval -> String)
-> ([PositionUpdateInterval] -> ShowS)
-> Show PositionUpdateInterval
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PositionUpdateInterval] -> ShowS
$cshowList :: [PositionUpdateInterval] -> ShowS
show :: PositionUpdateInterval -> String
$cshow :: PositionUpdateInterval -> String
showsPrec :: Int -> PositionUpdateInterval -> ShowS
$cshowsPrec :: Int -> PositionUpdateInterval -> ShowS
Show) via NumberOfMessages


-- | Write the subscription's position to a stream.
writeToStream :: Functions.WithConnection -> PositionUpdateInterval -> StreamName -> PositionStrategy
writeToStream :: WithConnection
-> PositionUpdateInterval -> StreamName -> PositionStrategy
writeToStream WithConnection
withConnection PositionUpdateInterval
positionUpdateInterval StreamName
streamName =
  let messageType :: Message.MessageType
      messageType :: MessageType
messageType =
        MessageType
"GlobalPositionSaved"

      savePosition :: Postgres.Connection -> Message.GlobalPosition -> IO ()
      savePosition :: Connection -> GlobalPosition -> IO ()
savePosition Connection
connection GlobalPosition
position =
        IO (MessageId, StreamPosition) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (MessageId, StreamPosition) -> IO ())
-> IO (MessageId, StreamPosition) -> IO ()
forall a b. (a -> b) -> a -> b
$
          Connection
-> StreamName
-> MessageType
-> GlobalPosition
-> Maybe ()
-> Maybe ExpectedVersion
-> IO (MessageId, StreamPosition)
forall payload metadata.
(ToJSON payload, ToJSON metadata) =>
Connection
-> StreamName
-> MessageType
-> payload
-> Maybe metadata
-> Maybe ExpectedVersion
-> IO (MessageId, StreamPosition)
Functions.writeMessage @Message.GlobalPosition @()
            Connection
connection
            StreamName
streamName
            MessageType
messageType
            GlobalPosition
position
            Maybe ()
forall a. Maybe a
Nothing
            Maybe ExpectedVersion
forall a. Maybe a
Nothing

      restore :: IO Message.GlobalPosition
      restore :: IO GlobalPosition
restore = do
        Maybe Message
maybeMessage <- (Connection -> IO (Maybe Message)) -> IO (Maybe Message)
WithConnection
withConnection ((Connection -> IO (Maybe Message)) -> IO (Maybe Message))
-> (Connection -> IO (Maybe Message)) -> IO (Maybe Message)
forall a b. (a -> b) -> a -> b
$ \Connection
connection ->
          Connection -> StreamName -> IO (Maybe Message)
Functions.getLastStreamMessage Connection
connection StreamName
streamName

        GlobalPosition -> IO GlobalPosition
forall (f :: * -> *) a. Applicative f => a -> f a
pure (GlobalPosition -> IO GlobalPosition)
-> GlobalPosition -> IO GlobalPosition
forall a b. (a -> b) -> a -> b
$ case (Message -> Either String GlobalPosition)
-> Maybe Message -> Maybe (Either String GlobalPosition)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Payload -> Either String GlobalPosition
forall value. FromJSON value => Payload -> Either String value
Message.parsePayload (Payload -> Either String GlobalPosition)
-> (Message -> Payload) -> Message -> Either String GlobalPosition
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Message -> Payload
Message.messagePayload) Maybe Message
maybeMessage of
          Just (Right GlobalPosition
position) -> GlobalPosition
position
          Maybe (Either String GlobalPosition)
_ -> GlobalPosition
0

      save :: LastPositionSaved -> CurrentPosition -> IO (Maybe PositionSaved)
      save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save GlobalPosition
lastPositionSaved GlobalPosition
currentPosition =
        let interval :: Integer
interval = Natural -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Natural -> Integer) -> Natural -> Integer
forall a b. (a -> b) -> a -> b
$ PositionUpdateInterval -> Natural
coerce @_ @Natural PositionUpdateInterval
positionUpdateInterval
            difference :: Integer
difference = GlobalPosition -> Integer
Message.globalPositionToInteger (GlobalPosition -> Integer) -> GlobalPosition -> Integer
forall a b. (a -> b) -> a -> b
$ GlobalPosition
currentPosition GlobalPosition -> GlobalPosition -> GlobalPosition
forall a. Num a => a -> a -> a
- GlobalPosition
lastPositionSaved
         in if Integer
difference Integer -> Integer -> Bool
forall a. Ord a => a -> a -> Bool
< Integer
interval
              then Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe GlobalPosition
forall a. Maybe a
Nothing
              else do
                (Connection -> IO ()) -> IO ()
WithConnection
withConnection ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
connection ->
                  Connection -> GlobalPosition -> IO ()
savePosition Connection
connection GlobalPosition
currentPosition

                Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe GlobalPosition -> IO (Maybe GlobalPosition))
-> Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall a b. (a -> b) -> a -> b
$ GlobalPosition -> Maybe GlobalPosition
forall a. a -> Maybe a
Just GlobalPosition
currentPosition
   in PositionStrategy :: IO GlobalPosition
-> (GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition))
-> PositionStrategy
PositionStrategy{IO GlobalPosition
GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
restore :: IO GlobalPosition
save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
restore :: IO GlobalPosition
..}