{-# OPTIONS_GHC -Wno-missing-fields #-}
{-# OPTIONS_GHC -Wno-incomplete-patterns #-}
module Z.IO.BIO.Concurrent where
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.Sequence as Seq
import Data.Sequence (Seq((:<|),(:|>)))
import GHC.Natural
import Z.IO.BIO
import Z.Data.PrimRef
import Z.IO.Exception
-- | Run two 'BIO' nodes concurrently on the same input and pair up their outputs.
--
-- Each time the zipped node is called, fresh buffers and EOF flags are
-- created and two threads are forked (their 'ThreadId's are discarded):
-- one feeds the input to @b1@, the other feeds it to @b2@. Each thread
-- appends its node's outputs to its own 'Seq' buffer and sets its own EOF
-- flag when the node emits 'EOF'. The calling thread then repeatedly pops one
-- element from each buffer and passes the pair downstream.
--
-- 'EOF' is passed downstream when @b1@'s buffer is empty and @b1@ has
-- flagged EOF, or when @b1@ has a buffered element but @b2@'s buffer is empty
-- and @b2@ has flagged EOF. Elements still buffered on the other side at that
-- point are never delivered. Until one of these conditions holds the calling
-- thread blocks in 'retry', so this call only returns after 'EOF' has been
-- passed downstream.
zipBIO :: BIO a b -> BIO a c -> BIO a (b, c)
zipBIO b1 b2 = \ k mx -> do
    bEOF <- newTVarIO False
    cEOF <- newTVarIO False
    bBuf <- newTVarIO Seq.empty
    cBuf <- newTVarIO Seq.empty
    _ <- forkIO (b1 (collect bBuf bEOF) mx)
    _ <- forkIO (b2 (collect cBuf cEOF) mx)
    pairUp k bBuf cBuf bEOF cEOF
  where
    -- Downstream callback handed to each forked node: buffer a value, or
    -- record that the node has reached EOF.
    collect :: TVar (Seq x) -> TVar Bool -> Maybe x -> IO ()
    collect xBuf xEOF = \ mx ->
        case mx of
            Just x -> atomically $ modifyTVar' xBuf (:|> x)
            _      -> atomically $ writeTVar xEOF True

    -- Each transaction decides on the next IO action (emit a pair and
    -- continue, or emit EOF and stop); 'join' then runs that action outside
    -- of STM.
    pairUp :: (Maybe (x, y) -> IO r)
           -> TVar (Seq x) -> TVar (Seq y) -> TVar Bool -> TVar Bool -> IO r
    pairUp k bBuf cBuf bEOF cEOF = join $ atomically $ do
        bs <- readTVar bBuf
        cs <- readTVar cBuf
        beof <- readTVar bEOF
        ceof <- readTVar cEOF
        case bs of
            b :<| bs' -> case cs of
                c :<| cs' -> do
                    writeTVar bBuf bs'
                    writeTVar cBuf cs'
                    return (k (Just (b, c)) >> pairUp k bBuf cBuf bEOF cEOF)
                -- Left side has data but right side is empty: finish only if
                -- the right side is done, otherwise wait for it.
                _ -> if ceof then return (k EOF) else retry
            -- Left side is empty: finish only if it is done, otherwise wait.
            _ -> if beof then return (k EOF) else retry
-- | Create an unbounded 'TQueue' together with a sink that writes to it and a
-- source that reads from it.
--
-- Sink: every @Just@ input is enqueued. Every 'EOF' input adds one to a shared
-- counter; when the value returned by @atomicAddCounter'@ equals @n@, 'EOF' is
-- enqueued and passed to the sink's downstream callback.
--
-- Source: ignores its input and drains the queue, passing each element
-- downstream until it dequeues 'EOF'. The 'EOF' marker is pushed back onto the
-- queue before being passed downstream, so it stays available to later reads.
newTQueueNode :: Int  -- ^ counter value at which the sink forwards 'EOF'
                      --   (presumably the number of producers sharing the
                      --   sink — confirm against callers)
              -> IO (Sink a, Source a)
newTQueueNode n = do
    q <- newTQueueIO
    ec <- newCounter 0
    let sink k mx = case mx of
            Just _ -> atomically (writeTQueue q mx)
            _ -> do
                -- Presumably returns the updated count — confirm against
                -- Z.Data.PrimRef.
                i <- atomicAddCounter' ec 1
                -- NOTE(review): the downstream EOF call is assumed to belong
                -- to this `when` block; confirm against the original layout.
                when (i == n) $ do
                    atomically (writeTQueue q EOF)
                    k EOF
        source k _ = loop
          where
            -- The dequeue and the downstream call run under
            -- 'uninterruptibleMask', with only the blocking read restored.
            -- The recursion happens outside the mask on purpose: re-entering
            -- 'uninterruptibleMask' from inside itself turns @restore@ into a
            -- no-op, which would make every read after the first one
            -- uninterruptible.
            loop = do
                more <- uninterruptibleMask $ \ restore -> do
                    x <- restore $ atomically (readTQueue q)
                    case x of
                        Just _ -> k x >> return True
                        _ -> do
                            atomically (unGetTQueue q EOF)
                            k EOF
                            return False
                when more loop
    return (sink, source)
-- | Create a bounded 'TBQueue' together with a sink that writes to it and a
-- source that reads from it.
--
-- Sink: every @Just@ input is enqueued with 'writeTBQueue'. Every 'EOF' input
-- adds one to a shared counter; when the value returned by
-- @atomicAddCounter'@ equals @n@, 'EOF' is enqueued and passed to the sink's
-- downstream callback.
--
-- Source: ignores its input and drains the queue, passing each element
-- downstream until it dequeues 'EOF'. The 'EOF' marker is pushed back onto the
-- queue before being passed downstream, so it stays available to later reads.
newTBQueueNode :: Int      -- ^ counter value at which the sink forwards 'EOF'
                           --   (presumably the number of producers sharing
                           --   the sink — confirm against callers)
               -> Natural  -- ^ bound handed to 'newTBQueueIO'
               -> IO (Sink a, Source a)
newTBQueueNode n bound = do
    q <- newTBQueueIO bound
    ec <- newCounter 0
    let sink k mx = case mx of
            Just _ -> atomically (writeTBQueue q mx)
            _ -> do
                -- Presumably returns the updated count — confirm against
                -- Z.Data.PrimRef.
                i <- atomicAddCounter' ec 1
                -- NOTE(review): the downstream EOF call is assumed to belong
                -- to this `when` block; confirm against the original layout.
                when (i == n) $ do
                    atomically (writeTBQueue q EOF)
                    k EOF
        source k _ = loop
          where
            -- The dequeue and the downstream call run under
            -- 'uninterruptibleMask', with only the blocking read restored.
            -- The recursion happens outside the mask on purpose: re-entering
            -- 'uninterruptibleMask' from inside itself turns @restore@ into a
            -- no-op, which would make every read after the first one
            -- uninterruptible.
            loop = do
                more <- uninterruptibleMask $ \ restore -> do
                    x <- restore $ atomically (readTBQueue q)
                    case x of
                        Just _ -> k x >> return True
                        _ -> do
                            atomically (unGetTBQueue q EOF)
                            k EOF
                            return False
                when more loop
    return (sink, source)
-- | Create a broadcast 'TChan' together with a sink that writes to it and an
-- action that produces sources reading from it.
--
-- Sink: every @Just@ input is written to the broadcast channel. Every 'EOF'
-- input adds one to a shared counter; when the value returned by
-- @atomicAddCounter'@ equals @n@, 'EOF' is written to the channel as well.
-- The sink's downstream callback receives 'EOF' on every 'EOF' input,
-- whether or not the counter has reached @n@.
--
-- Source action: each run duplicates the broadcast channel with 'dupTChan'
-- and returns a source over that duplicate. The source ignores its input and
-- passes each element it reads downstream until it reads 'EOF', which it
-- passes on before stopping.
newBroadcastTChanNode :: Int  -- ^ counter value at which 'EOF' is broadcast
                              --   (presumably the number of producers sharing
                              --   the sink — confirm against callers)
                      -> IO (Sink a, IO (Source a))
newBroadcastTChanNode n = do
    b <- newBroadcastTChanIO
    ec <- newCounter 0
    let dupSrc = do
            c <- atomically (dupTChan b)
            return $ \ k _ ->
                let loop = do
                        x <- atomically (readTChan c)
                        case x of
                            Just _ -> k x >> loop
                            _ -> k EOF
                in loop
    return
        ( \ k mx -> case mx of
            Just _ -> atomically (writeTChan b mx)
            _ -> do
                -- Presumably returns the updated count — confirm against
                -- Z.Data.PrimRef.
                i <- atomicAddCounter' ec 1
                when (i == n) (atomically (writeTChan b EOF))
                k EOF
        , dupSrc )