module Box.Connectors
( qList,
qListWith,
popList,
pushList,
pushListN,
sink,
sinkWith,
source,
sourceWith,
forkEmit,
bufferCommitter,
bufferEmitter,
concurrentE,
concurrentC,
takeQ,
evalEmitter,
evalEmitterWith,
)
where
import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Box.Queue
import Control.Concurrent.Async
import Control.Monad
import Control.Monad.State.Lazy
import Data.Foldable
import Data.Functor
import Data.Sequence qualified as Seq
import Prelude
-- | Turn a list into a 'CoEmitter' by delegating to 'qListWith' with an
-- 'Unbounded' queue.
qList :: [a] -> CoEmitter IO a
qList xs = qListWith Unbounded xs
-- | Turn a list into a 'CoEmitter' backed by the supplied 'Queue'.
--
-- The producer handed to 'emitQ' commits every element of the list, in order,
-- with 'commit'. Because the elements are committed with 'traverse', a failed
-- commit does not stop later elements from being attempted; the producer's
-- result is the conjunction ('and') of all the individual commit results.
qListWith :: Queue a -> [a] -> CoEmitter IO a
qListWith q xs = emitQ q (\c -> fmap and (traverse (commit c) xs))
-- | Supply a list to a 'Committer'.
--
-- The list is converted to a 'Seq.Seq', which becomes the initial state passed
-- to 'glueES'; 'pop' is the stateful emitter that is glued to the committer.
-- NOTE(review): presumably 'pop' removes elements from the front of the
-- sequence, so the committer sees the list in its original order -- confirm
-- against the definition of 'pop'.
popList :: (Monad m) => [a] -> Committer m a -> m ()
popList xs c = glueES (Seq.fromList xs) c pop
-- | Collect what an 'Emitter' emits into a list.
--
-- The emitter is lifted into @StateT (Seq a) m@ with @'foist' 'lift'@ and
-- glued to 'push'. The state starts as 'Seq.empty', and the final state is
-- returned as a list.
-- NOTE(review): this returns only when 'glue' returns; the termination
-- condition lives in 'glue' and is not visible here.
pushList :: (Monad m) => Emitter m a -> m [a]
pushList e =
  toList <$> flip execStateT Seq.empty (glue push (foist lift e))
-- | Collect what an 'Emitter' emits into a list, using 'glueN' with the given
-- count rather than 'glue'.
--
-- The emitter is lifted into @StateT (Seq a) m@ with @'foist' 'lift'@ and
-- glued to 'push' via @'glueN' n@. The state starts as 'Seq.empty', and the
-- final state is returned as a list.
pushListN :: (Monad m) => Int -> Emitter m a -> m [a]
pushListN n e =
  toList <$> flip execStateT Seq.empty (glueN n push (foist lift e))
-- | Emit once and, if a value was produced, run the supplied action on it.
--
-- When the emitter yields 'Nothing' no action is run.
sink1 :: (Monad m) => (a -> m ()) -> Emitter m a -> m ()
sink1 f e = do
  a <- emit e
  -- 'forM_' over a 'Maybe' runs @f@ zero or one times.
  forM_ a f
-- | Build a 'CoCommitter' that feeds committed values to an 'IO' action, by
-- delegating to 'sinkWith' with an 'Unbounded' queue.
sink :: Int -> (a -> IO ()) -> CoCommitter IO a
sink n f = sinkWith Unbounded n f
-- | Build a 'CoCommitter' over the supplied 'Queue' whose consumer side runs
-- 'sink1' with the given action @n@ times.
--
-- @n@ counts emit attempts, not values handled: an attempt that yields
-- 'Nothing' still uses up one of the @n@ repetitions.
sinkWith :: Queue a -> Int -> (a -> IO ()) -> CoCommitter IO a
sinkWith q n f = commitQ q $ replicateM_ n . sink1 f
-- | Run the supplied action once and commit its result.
--
-- The 'Bool' returned by 'commit' is discarded.
source1 :: (Monad m) => m a -> Committer m a -> m ()
source1 a c = do
  a' <- a
  void $ commit c a'
-- | Build a 'CoEmitter' fed by an 'IO' action, by delegating to 'sourceWith'
-- with an 'Unbounded' queue.
source :: Int -> IO a -> CoEmitter IO a
source n f = sourceWith Unbounded n f
-- | Build a 'CoEmitter' over the supplied 'Queue' whose producer side runs
-- 'source1' with the given action @n@ times, i.e. the action is executed and
-- its result committed @n@ times.
sourceWith :: Queue a -> Int -> IO a -> CoEmitter IO a
sourceWith q n f = emitQ q $ replicateM_ n . source1 f
-- | Wrap an 'Emitter' so that every value it produces is also committed to
-- the given 'Committer'.
--
-- Each emit of the result emits from @e@; when that yields a value, the value
-- is committed to @c@ (the commit's 'Bool' result is discarded). The original
-- emit result, including 'Nothing', is then returned unchanged.
forkEmit :: (Monad m) => Emitter m a -> Committer m a -> Emitter m a
forkEmit e c =
  Emitter $ do
    a <- emit e
    maybe (pure ()) (void . commit c) a
    pure a
-- | Put an 'Unbounded' queue in front of a 'Committer'.
--
-- The continuation receives the committer side of the queue, while
-- @'glue' c@ is run against the emitter side so queued values are passed on
-- to @c@. 'queueL' returns the result of the committer-side action, i.e. the
-- continuation's result.
bufferCommitter :: Committer IO a -> CoCommitter IO a
bufferCommitter c =
  Codensity $ \caction -> queueL Unbounded caction (glue c)
-- | Put an 'Unbounded' queue behind an 'Emitter'.
--
-- The committer side of the queue is glued to @e@ (@(\`glue\` e)@), and the
-- continuation receives the emitter side. 'queueR' returns the result of the
-- emitter-side action, i.e. the continuation's result.
bufferEmitter :: Emitter IO a -> CoEmitter IO a
bufferEmitter e =
  Codensity $ \eaction -> queueR Unbounded (`glue` e) eaction
-- | Run two emitters concurrently, each feeding a queue built from the same
-- 'Queue' specification.
--
-- Two 'queue' computations are run with 'concurrently': one glues its
-- committer side to @e@, the other to @e'@, and each runs the continuation on
-- its own emitter side.
--
-- NOTE(review): as written, the continuation is invoked twice (once per
-- 'queue' call) and only the result of the run paired with @e@ is returned
-- (@snd . fst@); the result of the run paired with @e'@ is discarded.
concurrentE ::
  Queue a ->
  Emitter IO a ->
  Emitter IO a ->
  CoEmitter IO a
concurrentE q e e' =
  Codensity $ \eaction ->
    snd . fst
      <$> concurrently
        (queue q (`glue` e) eaction)
        (queue q (`glue` e') eaction)
-- | Run two committers concurrently: builds the 'Either' of committers with
-- 'eitherC' and collapses it to a single 'Committer' with 'mergeC'.
concurrentC :: Queue a -> Committer IO a -> Committer IO a -> CoCommitter IO a
concurrentC q c c' = mergeC <$> eitherC q c c'
-- | Provide a continuation with a 'Left' committer draining into @cl@ and a
-- 'Right' committer draining into @cr@, concurrently.
--
-- Two 'queueL' computations are run with 'concurrently'. In the first, the
-- continuation receives the queue's committer wrapped in 'Left' while
-- @'glue' cl@ consumes the emitter side; the second does the same with
-- 'Right' and @cr@. The continuation is therefore invoked twice, and only the
-- result of the 'Left' invocation is returned ('fst').
eitherC ::
  Queue a ->
  Committer IO a ->
  Committer IO a ->
  Codensity IO (Either (Committer IO a) (Committer IO a))
eitherC q cl cr =
  Codensity $
    \kk ->
      fst
        <$> concurrently
          (queueL q (kk . Left) (glue cl))
          (queueL q (kk . Right) (glue cr))
-- | Collapse an 'Either' of two committers into a single 'Committer' that
-- commits to whichever side is present.
mergeC :: Either (Committer IO a) (Committer IO a) -> Committer IO a
mergeC ec =
  Committer $ \a ->
    case ec of
      Left lc -> commit lc a
      Right rc -> commit rc a
-- | Queue the output of @'takeE' n e@.
--
-- The producer handed to 'emitQ' glues the queue's committer to the stateful
-- emitter @'takeE' n e@ using 'glueES', with the 'Int' state starting at @0@.
-- NOTE(review): presumably the state counts values taken so far and emission
-- stops after @n@ -- confirm against 'takeE'.
takeQ :: Queue a -> Int -> Emitter IO a -> CoEmitter IO a
takeQ q n e = emitQ q $ \c -> glueES 0 c (takeE n e)
-- | Queue a stateful emitter from an initial state, by delegating to
-- 'evalEmitterWith' with an 'Unbounded' queue.
evalEmitter :: s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitter s e = evalEmitterWith Unbounded s e
-- | Queue a stateful emitter over the supplied 'Queue'.
--
-- The producer handed to 'emitQ' glues the queue's committer to the
-- @StateT s IO@ emitter with 'glueES', using @s@ as the initial state.
evalEmitterWith :: Queue a -> s -> Emitter (StateT s IO) a -> CoEmitter IO a
evalEmitterWith q s e = emitQ q $ \c -> glueES s c e