{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wmissing-deriving-strategies #-}

{- |
  Description: Garbage collected event folding CRDT.

  This module provides a CRDT data structure that collects and applies
  operations (called "events") that mutate an underlying data structure
  (like folding).

  In addition to mutating the underlying data, each operation can also
  produce an output that can be obtained by the client. The output can be
  either totally consistent across all replicas (which is slower), or it
  can be returned immediately and possibly reflect an inconsistent state.
-}
module Data.CRDT.EventFold (
  -- * Basic API

  -- ** Creating new CRDTs.
  new,

  -- ** Adding new events.
  event,

  -- ** Coordinating replica updates.
  {- |
    Functions in this section are used to help merge foreign copies of
    the CRDT, and transmit our own copy. (This library does not provide
    any kind of transport support, except that all the relevant types
    have 'Binary' instances. Actually arranging for these things to get
    shipped across a wire is left to the user.)

    In principle, the only two functions you need are 'fullMerge' and
    'acknowledge'. You can ship the full 'EventFold' value to a remote
    participant and it can incorporate any changes using 'fullMerge', and
    vice versa. You can receive an 'EventFold' value from another
    participant and incorporate its changes locally using 'fullMerge'.
    You can then acknowledge the incorporation using 'acknowledge'.

    However, if your underlying data structure is large, it may be more
    efficient to just ship a sort of diff containing the information that
    the local participant thinks the remote participant might be missing.
    That is what 'events', 'mergeMaybe', and 'mergeEither' are for.

    Calling 'acknowledge' is important because that is the magic that
    allows CRDT garbage collection to happen. "CRDT garbage collection"
    means we don't store an infinite series of events that always grows
    and never shrinks. We only store the outstanding events that we can't
    prove have been seen by every participant. Events that we /can/ prove
    have been seen by every participant are applied to the infimum (a.k.a.
    "base value") and the event itself is discarded.
  -}
  fullMerge,
  acknowledge,
  events,
  mergeMaybe,
  mergeEither,
  MergeError(..),

  -- ** Participation.
  participate,
  disassociate,

  -- ** Defining your state and events.
  Event(..),
  EventResult(..),

  -- * Inspecting the 'EventFold'.
  isBlockedOnError,
  projectedValue,
  infimumValue,
  infimumId,
  infimumParticipants,
  allParticipants,
  projParticipants,
  origin,
  divergent,

  -- * Underlying Types
  EventFoldF,
  EventFold,
  EventId,
  EventPack,
) where

import Data.Bifunctor (first)
import Data.Binary (Binary(get, put))
import Data.Default.Class (Default(def))
import Data.DoubleWord (Word128(Word128), Word256(Word256))
import Data.Functor.Identity (Identity(Identity), runIdentity)
import Data.Map (Map, keys, toAscList, toDescList, unionWith)
import Data.Maybe (catMaybes)
import Data.Set ((\\), Set, member, union)
import Data.Word (Word64)
import GHC.Generics (Generic)
import qualified Data.Map as Map
import qualified Data.Map.Merge.Lazy as Map.Merge
import qualified Data.Set as Set

{- |
  This represents a replicated data structure into which participants can
  add 'Event's that are folded into a base 'State'. You can also think of
  the "events" as operations that mutate the base state, and the point of
  this CRDT is to coordinate the application of the operations across all
  participants so that they are applied consistently even if the
  operations themselves are not commutative, idempotent, or monotonic.
  Those properties are conferred on the CRDT by the way in which it
  manages the events, and it is therefore unnecessary that the events
  themselves have them.
Variables are:

- @o@ - Origin
- @p@ - Participant
- @e@ - Event
- @f@ - The Monad in which the events live

The "Origin" is a value that is more or less meant to identify the
"thing" being replicated, and in particular identify the historical
lineage of the 'EventFold'. The idea is that it is meaningless to try
and merge two 'EventFold's that do not share a common history
(identified by the origin value) and doing so is a programming error.
It is only used to try and detect this type of programming error and
report it (as a 'DifferentOrigins' error from 'mergeEither' and
'fullMerge') instead of producing undefined (and difficult to detect)
behavior.
-}
data EventFoldF o p e f = EventFold
  { psOrigin :: o
    -- ^ Identifies the historical lineage of this 'EventFold'.
  , psInfimum :: Infimum (State e) p
    -- ^ The garbage-collected base value: the id of the last event folded
    -- into it, the participants as of that event, and the state value.
  , psEvents :: Map (EventId p) (f (Delta p e), Set p)
    -- ^ Outstanding deltas keyed by event id, each paired with the set of
    -- participants that have acknowledged it.
  }
  deriving stock (Generic)

deriving stock instance
  ( Eq o
  , Eq p
  , Eq e
  , Eq (Output e)
  , Eq (f (Delta p e))
  ) =>
  Eq (EventFoldF o p e f)

instance
  ( Binary o
  , Binary p
  , Binary e
  , Binary (State e)
  , Binary (Output e)
  , Binary (f (Delta p e))
  ) =>
  Binary (EventFoldF o p e f)

deriving stock instance
  ( Show o
  , Show p
  , Show (State e)
  , Show (f (Delta p e))
  ) =>
  Show (EventFoldF o p e f)

-- | An 'EventFoldF' whose deltas are plain values (wrapped in 'Identity').
type EventFold o p e = EventFoldF o p e Identity

{- |
  'Infimum' is the infimum, or greatest lower bound, of the possible
  values of @s@.
-}
data Infimum s p = Infimum
  { eventId :: EventId p
  , participants :: Set p
  , stateValue :: s
  }
  deriving stock (Generic, Show)

instance (Binary s, Binary p) => Binary (Infimum s p)

-- | Infimum values are compared by 'eventId' only; the other fields are ignored.
instance (Eq p) => Eq (Infimum s p) where
  left == right = eventId left == eventId right

-- | Ordered by 'eventId' only, consistently with the 'Eq' instance.
instance (Ord p) => Ord (Infimum s p) where
  compare left right = eventId left `compare` eventId right

{- |
  'EventId' is a monotonically increasing, totally ordered identification
  value which allows us to lend the attribute of monotonicity to event
  application operations which would not naturally be monotonic.
-}
data EventId p
  = BottomEid
  | Eid Word256 p
  deriving stock (Generic, Eq, Ord, Show)

{-
  On the wire an 'EventId' is a @Maybe@: 'Nothing' for 'BottomEid', or the
  four 64-bit words of the 'Word256' counter followed by the participant.
-}
instance (Binary p) => Binary (EventId p) where
  put eid =
    put $
      case eid of
        BottomEid -> Nothing
        Eid (Word256 (Word128 w0 w1) (Word128 w2 w3)) owner ->
          Just (w0, w1, w2, w3, owner)

  get = fromWire <$> get
    where
      fromWire :: Maybe (Word64, Word64, Word64, Word64, p) -> EventId p
      fromWire Nothing = BottomEid
      fromWire (Just (w0, w1, w2, w3, owner)) =
        Eid (Word256 (Word128 w0 w1) (Word128 w2 w3)) owner

-- | 'def' is 'BottomEid', which sorts before every 'Eid'.
instance Default (EventId p) where
  def = BottomEid

{- |
  The error type for illegal merges, as returned by 'mergeEither' and
  'fullMerge'. Every one of these errors indicates a serious programming
  bug.
-}
data MergeError o p e
  = DifferentOrigins o o
    -- ^ The 'EventFold's do not have the same origin. It makes no sense to
    -- merge 'EventFold's that have different origins because they do not
    -- share a common history.
  | EventPackTooNew (EventFold o p e) (EventPack o p e)
    -- ^ The 'EventPack''s infimum is greater than any event known to the
    -- 'EventFold' into which it is being merged. This should be impossible
    -- and indicates that either the local 'EventFold' has rolled back an
    -- event that it had previously acknowledged, or else the source of the
    -- 'EventPack' moved the infimum forward without a full acknowledgement
    -- from all participants. Both of these conditions should be regarded
    -- as serious bugs.
  | EventPackTooSparse (EventFold o p e) (EventPack o p e)
    -- ^ The 'EventPack' assumes we know about events that we do not in
    -- fact know about. This is only possible if we rolled back our copy of
    -- the state somehow and "forgot" about state that we had previously
    -- acknowledged, or else some other participant erroneously
    -- acknowledged some events on our behalf.

deriving stock instance
  ( Show o
  , Show p
  , Show e
  , Show (Output e)
  , Show (State e)
  ) =>
  Show (MergeError o p e)

{- |
  'Delta' is how we represent mutations to the event fold state.
-}
data Delta p e
  = Join p
    -- ^ @p@ is added to the participants once this reaches the infimum.
  | UnJoin p
    -- ^ @p@ is removed from the participants once this reaches the infimum.
  | Event e
    -- ^ A user-defined event, to be 'apply'd to the state.
  | Error (Output e) (Set p)
    -- ^ An event whose application reported a 'SystemError' with the given
    -- output. The set holds the participants that have acknowledged the
    -- error; once it covers every infimum participant, the event is folded
    -- into the infimum without changing the state.
  deriving stock (Generic)

deriving stock instance (Eq p, Eq e, Eq (Output e)) => Eq (Delta p e)

deriving stock instance (Show p, Show e, Show (Output e)) => Show (Delta p e)

instance (Binary p, Binary e, Binary (Output e)) => Binary (Delta p e)

{- |
  Instances of this class define the particular "events" being "folded"
  over in a distributed fashion. In addition to the event type itself,
  there are a couple of type families which define the 'State' into which
  folded events are accumulated, and the 'Output' which application of a
  particular event can generate.

  TL;DR: This is how users define their own custom operations.
-}
class Event e where
  -- | The output that applying an event produces.
  type Output e

  -- | The state into which events are folded.
  type State e

  {- |
    Apply an event to a state value.

    __This function MUST be total!__
  -}
  apply :: e -> State e -> EventResult e

-- | The most trivial event type: no state and no output.
instance Event () where
  type Output () = ()
  type State () = ()
  apply () () = Pure () ()

{- |
  The union of two event types. A 'Left' event acts on the first half of
  the paired state and a 'Right' event on the second half; the other half
  is passed through untouched.
-}
instance (Event a, Event b) => Event (Either a b) where
  type Output (Either a b) = Either (Output a) (Output b)
  type State (Either a b) = (State a, State b)

  apply (Left ev) (stA, stB) =
    case apply ev stA of
      Pure out stA' -> Pure (Left out) (stA', stB)
      SystemError out -> SystemError (Left out)
  apply (Right ev) (stA, stB) =
    case apply ev stB of
      Pure out stB' -> Pure (Right out) (stA, stB')
      SystemError out -> SystemError (Right out)

{- |
  The result of applying an event.

  Morally speaking, events are always pure functions. However, mundane
  issues like finite memory constraints and finite execution time can
  cause referentially opaque behavior. In a normal Haskell program, this
  usually leads to a crash or an exception, and the crash or exception can
  itself, in a way, be thought of as being referentially transparent,
  because there is no way for it to both happen and, simultaneously, not
  happen.
However, in our case we are replicating computations across many
different pieces of hardware, so there most definitely is a way for
these aberrant system failures to both happen and not happen
simultaneously. What happens if the computation of the event runs out of
memory on one machine, but not on another?

There exists a strategy for dealing with these problems: if the
computation of an event experiences a failure on every participant, then
the event is pushed into the infimum as a failure (i.e. a no-op), but if
any single participant successfully computes the event then all other
participants can (somehow) request a "Full Merge" from the successful
participant. The Full Merge will include the infimum __value__ computed
by the successful participant, which will include the successful
application of the problematic event. The participants that hit the
error can thus bypass computation of the problem event altogether, and
can simply overwrite their infimum with the infimum provided by the Full
Merge.

Doing a full merge can be much more expensive than doing a simple
'EventPack' merge, because it requires transmitting the full value of
the 'EventFold' instead of just the outstanding operations.

This type represents how computation of the event finished: either with
a pure result, or with some kind of system error.

In general 'SystemError' is probably only ever useful when your event
type somehow executes untrusted code (for instance when your event type
is a Turing-complete DSL that allows users to submit their own
custom-programmed "events") and you want to limit the resources that can
be consumed by such user-generated code. It is much less useful when you
are encoding some well defined business logic directly in Haskell.
-}
data EventResult e
  = SystemError (Output e)
    -- ^ The event could not be computed; carries the output reported for it.
  | Pure (Output e) (State e)
    -- ^ The event was computed normally: its output and the new state.

{- |
  Construct a new 'EventFold' with the given origin and initial
  participant.
-}
new
  :: (Default (State e), Ord p)
  => o
     -- ^ The "origin", identifying the historical lineage of this CRDT.
  -> p
     -- ^ The initial participant.
  -> EventFold o p e
new o participant =
  -- Bottom event id, default state, the initial participant as the only
  -- member, and no outstanding events.
  EventFold o (Infimum def (Set.singleton participant) def) mempty

{- |
  Get the outstanding events that need to be propagated to a particular
  participant.

  Every outstanding event is included along with its acknowledgement set,
  but the delta itself is omitted ('Nothing') when @peer@ has already
  acknowledged it, saving network and cpu resources. 'Error' deltas are
  always sent in full.
-}
events :: (Ord p) => p -> EventFold o p e -> EventPack o p e
events peer ps =
  EventPack
    { epOrigin = psOrigin ps
    , epInfimum = eventId (psInfimum ps)
    , epEvents = fmap forPeer (psEvents ps)
    }
  where
    forPeer (Identity delta, acks) = (payload delta acks, acks)

    payload delta acks
      | Error {} <- delta = Just delta
      | peer `member` acks = Nothing
      | otherwise = Just delta

{- |
  A package containing events that can be merged into an event fold.
-}
data EventPack o p e = EventPack
  { epEvents :: Map (EventId p) (Maybe (Delta p e), Set p)
    -- ^ Outstanding events with their acknowledgement sets. The delta is
    -- 'Nothing' when the recipient had already acknowledged it.
  , epOrigin :: o
    -- ^ Origin of the sending 'EventFold'.
  , epInfimum :: EventId p
    -- ^ The sender's infimum event id.
  }
  deriving stock (Generic)

deriving stock instance
  (Show o, Show p, Show e, Show (Output e)) =>
  Show (EventPack o p e)

instance
  (Binary o, Binary p, Binary e, Binary (Output e)) =>
  Binary (EventPack o p e)

{- |
  Monotonically merge an 'EventPack' into an 'EventFold'. The resulting
  'EventFold' may have a higher infimum value, but it will never have a
  lower one.

  Only 'EventFold's that originated from the same 'new' call can be
  merged. If the origins are mismatched, or the merge fails for any other
  reason reported by 'mergeEither', then 'Nothing' is returned.

  Returns the new 'EventFold' value, along with the output for all of the
  events that can now be considered "fully consistent".
-}
mergeMaybe
  :: (Eq o, Event e, Ord p)
  => EventFold o p e
  -> EventPack o p e
  -> Maybe (EventFold o p e, Map (EventId p) (Output e))
mergeMaybe ps es =
  case mergeEither ps es of
    Left _ -> Nothing
    Right merged -> Just merged

{- |
  Like 'mergeMaybe', but returns an error indicating exactly what went
  wrong:

  * 'DifferentOrigins' when the two origins differ;

  * 'EventPackTooNew' when the pack's infimum is newer than every event id
    this 'EventFold' knows about (its own infimum included);

  * 'EventPackTooSparse' when the merge needs the delta of an event that
    the pack omitted and that this 'EventFold' does not have.
-}
mergeEither
  :: (Eq o, Event e, Ord p)
  => EventFold o p e
  -> EventPack o p e
  -> Either (MergeError o p e) (EventFold o p e, Map (EventId p) (Output e))
mergeEither
    local@EventFold {psOrigin = localOrigin, psInfimum = infimum, psEvents = localEvents}
    pack@EventPack {epOrigin = packOrigin, epEvents = packEvents, epInfimum = packInfimum}
  | localOrigin /= packOrigin = Left (DifferentOrigins localOrigin packOrigin)
  | newestKnown < packInfimum = Left (EventPackTooNew local pack)
  | otherwise =
      maybe (Left (EventPackTooSparse local pack)) Right (reduce packInfimum combined)
  where
    -- The newest event id we know of. The set is never empty because the
    -- infimum id is always inserted.
    newestKnown =
      Set.findMax (Set.insert (eventId infimum) (Map.keysSet localEvents))

    -- Local and incoming events side by side. Deltas live in 'Maybe', so a
    -- delta that is missing on both sides makes 'reduce' return 'Nothing'.
    combined =
      EventFold
        { psOrigin = localOrigin
        , psInfimum = infimum
        , psEvents =
            Map.Merge.merge
              (Map.Merge.mapMissing (\_ -> first Just))
              Map.Merge.preserveMissing
              (Map.Merge.zipWithMatched (\_ -> mergeAcks))
              (first runIdentity <$> localEvents)
              packEvents
        }

    -- Combine a local entry with the incoming entry for the same event id:
    -- acknowledgement sets are unioned; error acknowledgements are unioned
    -- when both sides hold an 'Error'; a local 'Error' otherwise yields to
    -- the incoming delta, and any other local delta is kept.
    mergeAcks
      :: (Ord p)
      => (Delta p e, Set p)
      -> (Maybe (Delta p e), Set p)
      -> (Maybe (Delta p e), Set p)
    mergeAcks (mine, acksA) (theirs, acksB) =
      case (mine, theirs) of
        (Error output errAcksA, Just (Error _ errAcksB)) ->
          (Just (Error output (errAcksA `union` errAcksB)), allAcks)
        (Error {}, _) -> (theirs, allAcks)
        (_, Just _) -> (Just mine, allAcks)
        (_, Nothing) -> (Just mine, allAcks)
      where
        allAcks = acksA `union` acksB

{- |
  Like 'mergeEither', but merge a full 'EventFold' instead of just an
  event pack.

  Returns the new 'EventFold' value, along with the output for all of the
  events that can now be considered "fully consistent".
-}
fullMerge
  :: (Eq o, Event e, Ord p)
  => EventFold o p e
  -> EventFold o p e
  -> Either (MergeError o p e) (EventFold o p e, Map (EventId p) (Output e))
fullMerge
    local
    EventFold {psOrigin = remoteOrigin, psInfimum = remoteInfimum, psEvents = remoteEvents} =
  -- Keep the greater of the two infima (infima are ordered by event id),
  -- then merge every remote event as if it arrived in an 'EventPack' that
  -- carries all of its deltas.
  mergeEither local {psInfimum = max (psInfimum local) remoteInfimum} pack
  where
    pack =
      EventPack
        { epOrigin = remoteOrigin
        , epEvents = fmap (first (Just . runIdentity)) remoteEvents
        , epInfimum = eventId remoteInfimum
        }

{- |
  Record the fact that the participant acknowledges the information
  contained in the 'EventFold'. The implication is that the participant
  __must__ base all future operations on the result of this function.

  Returns the new 'EventFold' value, along with the output for all of the
  events that can now be considered "fully consistent".
-}
acknowledge
  :: (Event e, Ord p)
  => p
  -> EventFold o p e
  -> (EventFold o p e, Map (EventId p) (Output e))
acknowledge p ps = (final, firstOutputs <> errorOutputs)
  where
    -- First add @p@ to every acknowledgement set and do a normal reduction...
    acked =
      ps {psEvents = fmap (\(delta, acks) -> (delta, Set.insert p acks)) (psEvents ps)}
    (reduced, firstOutputs) = runIdentity (reduce (eventId (psInfimum ps)) acked)
    -- ...then do a special acknowledgement of the reduction error, if any.
    (final, errorOutputs) = ackErr p reduced

{- | Acknowledge the reduction error, if one exists. -}
ackErr
  :: (Event e, Ord p)
  => p
  -> EventFold o p e
  -> (EventFold o p e, Map (EventId p) (Output e))
ackErr p ps =
  runIdentity $
    reduce (eventId (psInfimum ps)) ps {psEvents = Map.updateMin markError (psEvents ps)}
  where
    -- Only the oldest outstanding event is inspected: if it is an 'Error',
    -- add @p@ to its error acknowledgements; any other delta is kept as is.
    markError = \case
      (Identity (Error o eacks), acks) -> Just (Identity (Error o (Set.insert p eacks)), acks)
      entry -> Just entry

{- |
  Allow a participant to join in the distributed nature of the
  'EventFold'. Return the 'EventId' at which the participation is
  recorded, and the resulting 'EventFold'. The returned 'EventId' lets the
  caller tell when the participation event has reached the infimum (for
  instance by comparing it with 'infimumId').
-}
participate
  :: (Ord p)
  => p
     -- ^ The participant on whose behalf the event is recorded.
  -> p
     -- ^ The participant being added.
  -> EventFold o p e
  -> (EventId p, EventFold o p e)
participate self peer ps@EventFold {psEvents = pending} =
  (eid, ps {psEvents = Map.insert eid (Identity (Join peer), mempty) pending})
  where
    -- The 'Join' gets a fresh id owned by @self@, with no acknowledgements yet.
    eid = nextId self ps

{- |
  Indicate that @peer@ is being removed from participating in the
  distributed 'EventFold'. The 'UnJoin' event is recorded under a fresh
  'EventId' owned by @self@.
-}
disassociate :: (Ord p) => p -> p -> EventFold o p e -> EventFold o p e
disassociate self peer ps@EventFold {psEvents = pending} =
  let unjoin = (Identity (UnJoin peer), mempty)
   in ps {psEvents = Map.insert (nextId self ps) unjoin pending}

{- |
  Introduce a change to the EventFold on behalf of the participant.
  Return the new 'EventFold', along with the projected output of the
  event, along with an 'EventId' which can be used to get the fully
  consistent event output at a later time.

  The projected output comes from applying the event to 'projectedValue';
  it is returned even if that application reports a 'SystemError'.
-}
event
  :: (Ord p, Event e)
  => p
  -> e
  -> EventFold o p e
  -> (Output e, EventId p, EventFold o p e)
event p e ps@EventFold {psEvents = pending} =
  (projected, eid, ps {psEvents = Map.insert eid (Identity (Event e), mempty) pending})
  where
    eid = nextId p ps
    projected =
      case apply e (projectedValue ps) of
        SystemError out -> out
        Pure out _ -> out

{- |
  Return the current projected value of the 'EventFold': the infimum value
  with every outstanding 'Event' applied in 'EventId' order. An event
  whose application reports a 'SystemError' leaves the state unchanged;
  'Join', 'UnJoin' and 'Error' deltas are skipped.
-}
projectedValue :: (Event e) => EventFold o p e -> State e
projectedValue EventFold {psInfimum = Infimum {stateValue = base}, psEvents = pending} =
  -- 'foldr' over the newest-first list applies the oldest event first.
  foldr step base newestFirst
  where
    newestFirst = [ev | (_, (Identity (Event ev), _)) <- toDescList pending]
    step ev st =
      case apply ev st of
        SystemError _ -> st
        Pure _ st' -> st'

{- | Return the current infimum value of the 'EventFold'. -}
infimumValue :: EventFoldF o p e f -> State e
infimumValue = stateValue . psInfimum

{- | Return the 'EventId' of the infimum value. -}
infimumId :: EventFoldF o p e f -> EventId p
infimumId EventFold {psInfimum = Infimum {eventId = eid}} = eid

{- |
  Gets the known participants at the infimum.
-}
infimumParticipants :: EventFoldF o p e f -> Set p
infimumParticipants EventFold {psInfimum = Infimum {participants}} = participants

{- |
  Get all known participants. This includes participants that are
  projected for removal.
-}
allParticipants :: (Ord p) => EventFold o p e -> Set p
allParticipants EventFold {psInfimum = Infimum {participants}, psEvents} =
  foldr updateParticipants participants (toDescList psEvents)
  where
    updateParticipants
      :: (Ord p)
      => (EventId p, (Identity (Delta p e), Set p))
      -> Set p
      -> Set p
    updateParticipants (_, (Identity (Join p), _)) = Set.insert p
    updateParticipants _ = id

{- |
  Get all the projected participants. This does not include participants
  that are projected for removal.
-}
projParticipants :: (Ord p) => EventFold o p e -> Set p
projParticipants EventFold {psInfimum = Infimum {participants}, psEvents} =
  -- 'foldr' over the newest-first list applies the oldest delta first, so
  -- joins and unjoins take effect in event order.
  foldr updateParticipants participants (toDescList psEvents)
  where
    updateParticipants
      :: (Ord p)
      => (EventId p, (Identity (Delta p e), Set p))
      -> Set p
      -> Set p
    updateParticipants (_, (Identity (Join p), _)) = Set.insert p
    updateParticipants (_, (Identity (UnJoin p), _)) = Set.delete p
    updateParticipants _ = id

{- |
  Returns the participants that we think might be diverging. In this
  context, a participant is "diverging" if there is an event that the
  participant has not acknowledged but we are expecting it to
  acknowledge. Along with the participant, return the last known
  'EventId' which that participant has acknowledged.

  Infimum participants start out credited with the infimum id, and a
  participant joining through an outstanding 'Join' is credited with (at
  least) that event's id.
-}
divergent :: forall o p e. (Ord p) => EventFold o p e -> Map p (EventId p)
divergent EventFold {psInfimum = Infimum {participants, eventId}, psEvents} =
  let (byParticipant, maxEid) = eidByParticipant
   in Map.filter (< maxEid) byParticipant
  where
    -- The latest acknowledged id per participant, and the newest id overall.
    eidByParticipant :: (Map p (EventId p), EventId p)
    eidByParticipant =
      foldr
        accum
        (Map.fromList [(p, eventId) | p <- Set.toList participants], eventId)
        ( let flatten (a, (Identity b, c)) = (a, b, c)
           in flatten <$> toAscList psEvents
        )

    accum
      :: (EventId p, Delta p e, Set p)
      -> (Map p (EventId p), EventId p)
      -> (Map p (EventId p), EventId p)
    accum (eid, Join p, acks) (acc, maxEid) =
      {-
        'foldr' over the ascending list hands us the accumulator for all
        *newer* events, so @acc@ may already hold a later acknowledgement
        by @p@. Take the 'max' instead of overwriting it with the older id
        of the 'Join' (which used to report @p@ as falsely divergent).
      -}
      ( unionWith max (Map.insertWith max p eid acc) (ackedAt eid acks)
      , max maxEid eid
      )
    accum (eid, _, acks) (acc, maxEid) =
      ( unionWith max acc (ackedAt eid acks)
      , max maxEid eid
      )

    -- Every participant in the acknowledgement set has seen (at least) @eid@.
    ackedAt :: EventId p -> Set p -> Map p (EventId p)
    ackedAt eid = Map.fromSet (const eid)

{- | Return the origin value of the 'EventFold'. -}
origin :: EventFoldF o p e f -> o
origin = psOrigin

{- |
  This helper function is responsible for figuring out if the 'EventFold'
  has enough information to derive a new infimum value. In other words,
  this is where garbage collection happens.

  Outstanding events are examined oldest first:

  * events at or below the current infimum id, and events originating
    from a participant not in the infimum, are dropped;

  * an event is folded into the infimum once every infimum participant
    (plus, for a 'Join', the joining participant) has acknowledged it,
    explicitly or implicitly by having an 'UnJoin' at or after it, or once
    its id is at or below @infState@;

  * an 'Event' whose application reports a 'SystemError' is replaced by an
    'Error' delta and reduction stops there; an 'Error' is folded in (as a
    no-op on the state) only once every infimum participant has
    acknowledged the error.

  Reduction stops at the first event that cannot be folded in. Deltas live
  in the monad @f@; 'mergeEither' runs this in 'Maybe' so that a missing
  delta aborts the merge.
-}
reduce
  :: forall o p e f.
     (Event e, Monad f, Ord p)
  => EventId p
     {- ^
       The infimum 'EventId' as known by some node in the cluster. "Some
       node" can be different than "this node" in the case where another
       node advanced the infimum before we did (because it knew about our
       acknowledgement, but we didn't know about its acknowledgement) and
       sent us an 'EventPack' with this value of the infimum. In this case,
       this infimum value acts as a universal acknowledgement of all events
       coming before it.
     -}
  -> EventFoldF o p e f
  -> f (EventFold o p e, Map (EventId p) (Output e))
reduce infState ps@EventFold {psInfimum = infimum@Infimum {participants, stateValue}, psEvents} =
  case Map.minViewWithKey psEvents of
    Nothing ->
      pure
        ( EventFold
            { psOrigin = psOrigin ps
            , psInfimum = psInfimum ps
            , psEvents = mempty
            }
        , mempty
        )
    Just ((eid, (getUpdate, acks)), newDeltas)
      | eid <= eventId infimum ->
          -- The event is obsolete. Ignore it.
          reduce infState ps {psEvents = newDeltas}
      | isRenegade eid ->
          -- This is a renegade event. Ignore it.
          reduce infState ps {psEvents = newDeltas}
      | otherwise -> do
          implicitAcks <- unjoins eid
          update <- getUpdate
          let
            -- Join events must be acknowledged by the joining participant
            -- before moving into the infimum.
            joining =
              case update of
                Join p -> Set.singleton p
                _ -> mempty
          if Set.null (((participants `union` joining) \\ acks) \\ implicitAcks)
              || eid <= infState
            then
              case update of
                Join p ->
                  reduce
                    infState
                    ps
                      { psInfimum =
                          infimum {eventId = eid, participants = Set.insert p participants}
                      , psEvents = newDeltas
                      }
                UnJoin p ->
                  reduce
                    infState
                    ps
                      { psInfimum =
                          infimum {eventId = eid, participants = Set.delete p participants}
                      , psEvents = newDeltas
                      }
                Error output eacks
                  | Set.null (participants \\ eacks) -> do
                      -- Every infimum participant acknowledged the error:
                      -- advance the infimum id without touching the state,
                      -- and drop the event just like the other branches do.
                      (ps2, outputs) <-
                        reduce
                          infState
                          ps
                            { psInfimum = infimum {eventId = eid}
                            , psEvents = newDeltas
                            }
                      pure (ps2, Map.insert eid output outputs)
                  | otherwise -> do
                      -- Blocked until every participant acknowledges the error.
                      events_ <- runEvents psEvents
                      pure
                        ( EventFold
                            { psOrigin = psOrigin ps
                            , psInfimum = psInfimum ps
                            , psEvents = events_
                            }
                        , mempty
                        )
                Event e ->
                  case apply e stateValue of
                    SystemError output -> do
                      -- Record the failure in place of the event (with no
                      -- error acknowledgements yet) and stop here.
                      events_ <- runEvents newDeltas
                      pure
                        ( EventFold
                            { psOrigin = psOrigin ps
                            , psInfimum = infimum
                            , psEvents =
                                Map.insert eid (Identity (Error output mempty), acks) events_
                            }
                        , mempty
                        )
                    Pure output newState -> do
                      (ps2, outputs) <-
                        reduce
                          infState
                          ps
                            { psInfimum = infimum {eventId = eid, stateValue = newState}
                            , psEvents = newDeltas
                            }
                      pure (ps2, Map.insert eid output outputs)
            else do
              -- Not yet acknowledged by everyone: stop here.
              events_ <- runEvents psEvents
              pure
                ( EventFold
                    { psOrigin = psOrigin ps
                    , psInfimum = psInfimum ps
                    , psEvents = events_
                    }
                , mempty
                )
  where
    -- Unwrap the events from their monad.
    runEvents
      :: Map (EventId p) (f (Delta p e), Set p)
      -> f (Map (EventId p) (Identity (Delta p e), Set p))
    runEvents events_ =
      Map.fromList
        <$> sequence
          [ do
              d <- fd
              pure (eid, (Identity d, acks))
          | (eid, (fd, acks)) <- Map.toList events_
          ]

    -- The participants that have an 'UnJoin' at or after the given event
    -- (the event under consideration); they count as having acknowledged it.
    unjoins :: EventId p -> f (Set p)
    unjoins eid =
      Set.fromList . Map.elems . Map.filterWithKey (\k _ -> eid <= k) <$> unjoinMap

    -- The static map of unjoins.
    unjoinMap :: f (Map (EventId p) p)
    unjoinMap =
      Map.fromList . catMaybes
        <$> sequence
          [ update >>= \case
              UnJoin p -> pure (Just (eid, p))
              _ -> pure Nothing
          | (eid, (update, _acks)) <- Map.toList psEvents
          ]

    -- Renegade events are events that originate from a non-participating
    -- peer. This might happen in a network partition situation, where the
    -- cluster ejected a peer that later reappears on the network,
    -- broadcasting updates.
    isRenegade BottomEid = False
    isRenegade (Eid _ p) = not (p `member` participants)

{- |
  A utility function that constructs the next 'EventId' on behalf of a
  participant: one past the largest counter known (in the infimum or in
  any outstanding event), owned by @p@.
-}
nextId :: (Ord p) => p -> EventFoldF o p e f -> EventId p
nextId p EventFold {psInfimum = Infimum {eventId}, psEvents} =
  -- The list always contains the infimum id, so 'maximum' is safe.
  case maximum (eventId : keys psEvents) of
    BottomEid -> Eid 0 p
    Eid ord _ -> Eid (succ ord) p

{- |
  Return 'True' if progress on the 'EventFold' is blocked on a
  'SystemError', i.e. if the oldest outstanding event is an 'Error'.
-}
isBlockedOnError :: EventFold o p e -> Bool
isBlockedOnError ps =
  case Map.minView (psEvents ps) of
    Just ((Identity (Error _ _), _), _) -> True
    _ -> False