module MessageDb.Subscription.PositionStrategy
( LastPositionSaved,
CurrentPosition,
PositionSaved,
PositionStrategy (..),
dontSave,
PositionUpdateInterval (..),
writeToStream,
)
where
import Control.Monad (void)
import Data.Coerce (coerce)
import qualified Database.PostgreSQL.Simple as Postgres
import qualified MessageDb.Functions as Functions
import qualified MessageDb.Message as Message
import MessageDb.StreamName (StreamName)
import MessageDb.Units (NumberOfMessages (..))
import Numeric.Natural (Natural)
type LastPositionSaved = Message.GlobalPosition
type CurrentPosition = Message.GlobalPosition
type PositionSaved = Message.GlobalPosition
data PositionStrategy = PositionStrategy
{ PositionStrategy -> IO GlobalPosition
restore :: IO Message.GlobalPosition
, PositionStrategy
-> GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save :: LastPositionSaved -> CurrentPosition -> IO (Maybe PositionSaved)
}
dontSave :: PositionStrategy
dontSave :: PositionStrategy
dontSave =
PositionStrategy :: IO GlobalPosition
-> (GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition))
-> PositionStrategy
PositionStrategy
{ restore :: IO GlobalPosition
restore = GlobalPosition -> IO GlobalPosition
forall (f :: * -> *) a. Applicative f => a -> f a
pure GlobalPosition
0
, save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save = \GlobalPosition
_ GlobalPosition
_ -> Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe GlobalPosition
forall a. Maybe a
Nothing
}
newtype PositionUpdateInterval = PositioUpdateInterval
{ PositionUpdateInterval -> NumberOfMessages
fromPositionUpdateInterval :: NumberOfMessages
}
deriving (PositionUpdateInterval -> PositionUpdateInterval -> Bool
(PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> Eq PositionUpdateInterval
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c/= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
== :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c== :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
Eq, Eq PositionUpdateInterval
Eq PositionUpdateInterval
-> (PositionUpdateInterval -> PositionUpdateInterval -> Ordering)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval -> PositionUpdateInterval -> Bool)
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> Ord PositionUpdateInterval
PositionUpdateInterval -> PositionUpdateInterval -> Bool
PositionUpdateInterval -> PositionUpdateInterval -> Ordering
PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cmin :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
max :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cmax :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
>= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c>= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
> :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c> :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
<= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c<= :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
< :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
$c< :: PositionUpdateInterval -> PositionUpdateInterval -> Bool
compare :: PositionUpdateInterval -> PositionUpdateInterval -> Ordering
$ccompare :: PositionUpdateInterval -> PositionUpdateInterval -> Ordering
$cp1Ord :: Eq PositionUpdateInterval
Ord, Integer -> PositionUpdateInterval
PositionUpdateInterval -> PositionUpdateInterval
PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
(PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (Integer -> PositionUpdateInterval)
-> Num PositionUpdateInterval
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
fromInteger :: Integer -> PositionUpdateInterval
$cfromInteger :: Integer -> PositionUpdateInterval
signum :: PositionUpdateInterval -> PositionUpdateInterval
$csignum :: PositionUpdateInterval -> PositionUpdateInterval
abs :: PositionUpdateInterval -> PositionUpdateInterval
$cabs :: PositionUpdateInterval -> PositionUpdateInterval
negate :: PositionUpdateInterval -> PositionUpdateInterval
$cnegate :: PositionUpdateInterval -> PositionUpdateInterval
* :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$c* :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
- :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$c- :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
+ :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$c+ :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
Num, Num PositionUpdateInterval
Ord PositionUpdateInterval
Num PositionUpdateInterval
-> Ord PositionUpdateInterval
-> (PositionUpdateInterval -> Rational)
-> Real PositionUpdateInterval
PositionUpdateInterval -> Rational
forall a. Num a -> Ord a -> (a -> Rational) -> Real a
toRational :: PositionUpdateInterval -> Rational
$ctoRational :: PositionUpdateInterval -> Rational
$cp2Real :: Ord PositionUpdateInterval
$cp1Real :: Num PositionUpdateInterval
Real, Int -> PositionUpdateInterval
PositionUpdateInterval -> Int
PositionUpdateInterval -> [PositionUpdateInterval]
PositionUpdateInterval -> PositionUpdateInterval
PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
PositionUpdateInterval
-> PositionUpdateInterval
-> PositionUpdateInterval
-> [PositionUpdateInterval]
(PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval -> PositionUpdateInterval)
-> (Int -> PositionUpdateInterval)
-> (PositionUpdateInterval -> Int)
-> (PositionUpdateInterval -> [PositionUpdateInterval])
-> (PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval])
-> (PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval])
-> (PositionUpdateInterval
-> PositionUpdateInterval
-> PositionUpdateInterval
-> [PositionUpdateInterval])
-> Enum PositionUpdateInterval
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: PositionUpdateInterval
-> PositionUpdateInterval
-> PositionUpdateInterval
-> [PositionUpdateInterval]
$cenumFromThenTo :: PositionUpdateInterval
-> PositionUpdateInterval
-> PositionUpdateInterval
-> [PositionUpdateInterval]
enumFromTo :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
$cenumFromTo :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
enumFromThen :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
$cenumFromThen :: PositionUpdateInterval
-> PositionUpdateInterval -> [PositionUpdateInterval]
enumFrom :: PositionUpdateInterval -> [PositionUpdateInterval]
$cenumFrom :: PositionUpdateInterval -> [PositionUpdateInterval]
fromEnum :: PositionUpdateInterval -> Int
$cfromEnum :: PositionUpdateInterval -> Int
toEnum :: Int -> PositionUpdateInterval
$ctoEnum :: Int -> PositionUpdateInterval
pred :: PositionUpdateInterval -> PositionUpdateInterval
$cpred :: PositionUpdateInterval -> PositionUpdateInterval
succ :: PositionUpdateInterval -> PositionUpdateInterval
$csucc :: PositionUpdateInterval -> PositionUpdateInterval
Enum, Enum PositionUpdateInterval
Real PositionUpdateInterval
Real PositionUpdateInterval
-> Enum PositionUpdateInterval
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval)
-> (PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval))
-> (PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval))
-> (PositionUpdateInterval -> Integer)
-> Integral PositionUpdateInterval
PositionUpdateInterval -> Integer
PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
forall a.
Real a
-> Enum a
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> (a, a))
-> (a -> a -> (a, a))
-> (a -> Integer)
-> Integral a
toInteger :: PositionUpdateInterval -> Integer
$ctoInteger :: PositionUpdateInterval -> Integer
divMod :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
$cdivMod :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
quotRem :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
$cquotRem :: PositionUpdateInterval
-> PositionUpdateInterval
-> (PositionUpdateInterval, PositionUpdateInterval)
mod :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cmod :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
div :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cdiv :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
rem :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$crem :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
quot :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cquot :: PositionUpdateInterval
-> PositionUpdateInterval -> PositionUpdateInterval
$cp2Integral :: Enum PositionUpdateInterval
$cp1Integral :: Real PositionUpdateInterval
Integral)
deriving (Int -> PositionUpdateInterval -> ShowS
[PositionUpdateInterval] -> ShowS
PositionUpdateInterval -> String
(Int -> PositionUpdateInterval -> ShowS)
-> (PositionUpdateInterval -> String)
-> ([PositionUpdateInterval] -> ShowS)
-> Show PositionUpdateInterval
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PositionUpdateInterval] -> ShowS
$cshowList :: [PositionUpdateInterval] -> ShowS
show :: PositionUpdateInterval -> String
$cshow :: PositionUpdateInterval -> String
showsPrec :: Int -> PositionUpdateInterval -> ShowS
$cshowsPrec :: Int -> PositionUpdateInterval -> ShowS
Show) via NumberOfMessages
writeToStream :: Functions.WithConnection -> PositionUpdateInterval -> StreamName -> PositionStrategy
writeToStream :: WithConnection
-> PositionUpdateInterval -> StreamName -> PositionStrategy
writeToStream WithConnection
withConnection PositionUpdateInterval
positionUpdateInterval StreamName
streamName =
let messageType :: Message.MessageType
messageType :: MessageType
messageType =
MessageType
"GlobalPositionSaved"
savePosition :: Postgres.Connection -> Message.GlobalPosition -> IO ()
savePosition :: Connection -> GlobalPosition -> IO ()
savePosition Connection
connection GlobalPosition
position =
IO (MessageId, StreamPosition) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (MessageId, StreamPosition) -> IO ())
-> IO (MessageId, StreamPosition) -> IO ()
forall a b. (a -> b) -> a -> b
$
Connection
-> StreamName
-> MessageType
-> GlobalPosition
-> Maybe ()
-> Maybe ExpectedVersion
-> IO (MessageId, StreamPosition)
forall payload metadata.
(ToJSON payload, ToJSON metadata) =>
Connection
-> StreamName
-> MessageType
-> payload
-> Maybe metadata
-> Maybe ExpectedVersion
-> IO (MessageId, StreamPosition)
Functions.writeMessage @Message.GlobalPosition @()
Connection
connection
StreamName
streamName
MessageType
messageType
GlobalPosition
position
Maybe ()
forall a. Maybe a
Nothing
Maybe ExpectedVersion
forall a. Maybe a
Nothing
restore :: IO Message.GlobalPosition
restore :: IO GlobalPosition
restore = do
Maybe Message
maybeMessage <- (Connection -> IO (Maybe Message)) -> IO (Maybe Message)
WithConnection
withConnection ((Connection -> IO (Maybe Message)) -> IO (Maybe Message))
-> (Connection -> IO (Maybe Message)) -> IO (Maybe Message)
forall a b. (a -> b) -> a -> b
$ \Connection
connection ->
Connection -> StreamName -> IO (Maybe Message)
Functions.getLastStreamMessage Connection
connection StreamName
streamName
GlobalPosition -> IO GlobalPosition
forall (f :: * -> *) a. Applicative f => a -> f a
pure (GlobalPosition -> IO GlobalPosition)
-> GlobalPosition -> IO GlobalPosition
forall a b. (a -> b) -> a -> b
$ case (Message -> Either String GlobalPosition)
-> Maybe Message -> Maybe (Either String GlobalPosition)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Payload -> Either String GlobalPosition
forall value. FromJSON value => Payload -> Either String value
Message.parsePayload (Payload -> Either String GlobalPosition)
-> (Message -> Payload) -> Message -> Either String GlobalPosition
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Message -> Payload
Message.messagePayload) Maybe Message
maybeMessage of
Just (Right GlobalPosition
position) -> GlobalPosition
position
Maybe (Either String GlobalPosition)
_ -> GlobalPosition
0
save :: LastPositionSaved -> CurrentPosition -> IO (Maybe PositionSaved)
save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save GlobalPosition
lastPositionSaved GlobalPosition
currentPosition =
let interval :: Integer
interval = Natural -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Natural -> Integer) -> Natural -> Integer
forall a b. (a -> b) -> a -> b
$ PositionUpdateInterval -> Natural
coerce @_ @Natural PositionUpdateInterval
positionUpdateInterval
difference :: Integer
difference = GlobalPosition -> Integer
Message.globalPositionToInteger (GlobalPosition -> Integer) -> GlobalPosition -> Integer
forall a b. (a -> b) -> a -> b
$ GlobalPosition
currentPosition GlobalPosition -> GlobalPosition -> GlobalPosition
forall a. Num a => a -> a -> a
- GlobalPosition
lastPositionSaved
in if Integer
difference Integer -> Integer -> Bool
forall a. Ord a => a -> a -> Bool
< Integer
interval
then Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe GlobalPosition
forall a. Maybe a
Nothing
else do
(Connection -> IO ()) -> IO ()
WithConnection
withConnection ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
connection ->
Connection -> GlobalPosition -> IO ()
savePosition Connection
connection GlobalPosition
currentPosition
Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe GlobalPosition -> IO (Maybe GlobalPosition))
-> Maybe GlobalPosition -> IO (Maybe GlobalPosition)
forall a b. (a -> b) -> a -> b
$ GlobalPosition -> Maybe GlobalPosition
forall a. a -> Maybe a
Just GlobalPosition
currentPosition
in PositionStrategy :: IO GlobalPosition
-> (GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition))
-> PositionStrategy
PositionStrategy{IO GlobalPosition
GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
restore :: IO GlobalPosition
save :: GlobalPosition -> GlobalPosition -> IO (Maybe GlobalPosition)
restore :: IO GlobalPosition
..}