{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
module Box.Connectors
( fuse_,
fuseSTM_,
fuse,
fuseSTM,
forkEmit,
feedback,
feedbackE,
fuseEmit,
fuseEmitM,
fuseCommit,
fuseCommitM,
emerge,
emergeM,
splitCommit,
splitCommitSTM,
contCommit,
)
where
import Prelude
import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import Control.Concurrent.Classy.Async as C
import Control.Monad
import Control.Monad.Conc.Class as C
fuse_ :: (Monad m) => Emitter m a -> Committer m a -> m ()
fuse_ e c = go
where
go = do
a <- emit e
c' <- maybe (pure False) (commit c) a
when c' go
fuseSTM_ :: (MonadConc m) => Emitter (STM m) a -> Committer (STM m) a -> m ()
fuseSTM_ e c = go
where
go = do
b <-
C.atomically $ do
a <- emit e
maybe (pure False) (commit c) a
when b go
fuse :: (Monad m) => (a -> m (Maybe b)) -> Cont m (Box m b a) -> m ()
fuse f box = with box $ \(Box c e) -> fuse_ (emap f e) c
fuseSTM :: (MonadConc m) => (a -> (STM m) (Maybe b)) -> Cont m (Box (STM m) b a) -> m ()
fuseSTM f box = with box $ \(Box c e) -> fuseSTM_ (emap f e) c
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c =
Emitter $ do
a <- emit e
maybe (pure ()) (void <$> commit c) a
pure a
fuseCommit :: (MonadConc m) => Committer (STM m) a -> Cont m (Committer (STM m) a)
fuseCommit c = Cont $ \caction -> queueC' caction (`fuseSTM_` c)
fuseCommitM :: (MonadConc m) => Committer m a -> Cont m (Committer m a)
fuseCommitM c = Cont $ \caction -> queueCM' caction (`fuse_` c)
fuseEmit :: (MonadConc m) => Emitter (STM m) a -> Cont m (Emitter (STM m) a)
fuseEmit e = Cont $ \eaction -> queueE' (fuseSTM_ e) eaction
fuseEmitM :: (MonadConc m) => Emitter m a -> Cont m (Emitter m a)
fuseEmitM e = Cont $ \eaction -> queueEM' (fuse_ e) eaction
emerge ::
(MonadConc m) =>
Cont m (Emitter (STM m) a, Emitter (STM m) a) ->
Cont m (Emitter (STM m) a)
emerge e =
Cont $ \eaction ->
with e $ \e' ->
fst
<$> C.concurrently
(queueE' (fuseSTM_ (fst e')) eaction)
(queueE' (fuseSTM_ (snd e')) eaction)
emergeM ::
(MonadConc m) =>
Cont m (Emitter m a, Emitter m a) ->
Cont m (Emitter m a)
emergeM e =
Cont $ \eaction ->
with e $ \e' ->
fst
<$> C.concurrently
(queueEM' (fuse_ (fst e')) eaction)
(queueEM' (fuse_ (snd e')) eaction)
splitCommitSTM ::
(MonadConc m) =>
Cont m (Committer (STM m) a) ->
Cont m (Either (Committer (STM m) a) (Committer (STM m) a))
splitCommitSTM c =
Cont $ \kk ->
with c $ \c' ->
concurrentlyLeft
(queueC' (kk . Left) (`fuseSTM_` c'))
(queueC' (kk . Right) (`fuseSTM_` c'))
splitCommit ::
(MonadConc m) =>
Cont m (Committer m a) ->
Cont m (Either (Committer m a) (Committer m a))
splitCommit c =
Cont $ \kk ->
with c $ \c' ->
concurrentlyLeft
(queueCM' (kk . Left) (`fuse_` c'))
(queueCM' (kk . Right) (`fuse_` c'))
contCommit :: Either (Committer m a) (Committer m b) -> (Committer m a -> Committer m b) -> Committer m b
contCommit ec f =
Committer $ \a ->
case ec of
Left lc -> commit (f lc) a
Right rc -> commit rc a
feedback ::
(MonadConc m) =>
(a -> m (Maybe b)) ->
Cont m (Box m b a) ->
Cont m (Box m b a)
feedback f box =
Cont $ \bio ->
with box $ \(Box c e) -> do
fuse_ (emap f e) c
bio (Box c e)
feedbackE ::
(MonadConc m) =>
(a -> m (Maybe a)) ->
Emitter m a ->
Cont m (Emitter m a)
feedbackE f e =
emergeM ((,) <$> pure e <*> fuseEmitM (emap f e))