{-# OPTIONS_GHC -Wno-missing-fields #-}
{-# OPTIONS_GHC -Wno-incomplete-patterns #-}
module Z.IO.BIO.Concurrent where
import Control.Monad
import Control.Concurrent.STM
import GHC.Natural
import Z.IO.BIO
import Z.Data.PrimRef
import Z.IO.Exception
-- | Create a 'Sink' and a 'Source' connected through an unbounded 'TQueue'.
--
-- Sink side:
--
--   * pushing @x@ enqueues @Just x@ and yields 'Nothing';
--   * the sink's second field (its 'pull') atomically adds 1 to a shared
--     counter, and when the value returned by @atomicAddCounter'@ equals @n@
--     it enqueues a single 'Nothing' as the end-of-stream marker.
--     (Presumably the returned value is the post-increment count, so the
--     marker is written on the @n@-th call -- confirm against "Z.Data.PrimRef".)
--
-- Source side:
--
--   * 'pull' blocks until the queue is non-empty, returns @Just x@ for a
--     payload item, and returns 'Nothing' once the end-of-stream marker is at
--     the front of the queue. The marker is left in the queue, so every later
--     'pull' (from any consumer sharing the source) sees 'Nothing' too.
--
-- The source is built with only its 'pull' field set.
newTQueueNode :: Int -- ^ number of sink end-of-input signals to wait for before
                     --   the end-of-stream marker is enqueued (presumably the
                     --   number of producers sharing the sink)
              -> IO (Sink a, Source a)
newTQueueNode n = do
    q <- newTQueueIO
    ec <- newCounter 0
    return
        ( BIO (\ x -> atomically (writeTQueue q (Just x)) >> return Nothing)
              (do i <- atomicAddCounter' ec 1
                  when (i == n) (atomically (writeTQueue q Nothing))
                  return Nothing)
        , BIO { pull = atomically $ do
                    -- Dequeue and, for the end-of-stream marker, re-enqueue in
                    -- ONE transaction. Previously the read ran unmasked and the
                    -- unget ran in a second transaction, so an asynchronous
                    -- exception arriving after the read committed but before
                    -- the unget could drop the marker and leave the other
                    -- consumers blocked forever.
                    x <- readTQueue q
                    case x of
                        Just _  -> return x
                        Nothing -> unGetTQueue q Nothing >> return Nothing
              })
-- | Create a 'Sink' and a 'Source' connected through a bounded 'TBQueue'.
--
-- Sink side:
--
--   * pushing @x@ enqueues @Just x@ with 'writeTBQueue' (which waits while
--     the queue is full) and yields 'Nothing';
--   * the sink's second field (its 'pull') atomically adds 1 to a shared
--     counter, and when the value returned by @atomicAddCounter'@ equals @n@
--     it enqueues a single 'Nothing' as the end-of-stream marker, also via
--     'writeTBQueue'. (Presumably the returned value is the post-increment
--     count -- confirm against "Z.Data.PrimRef".)
--
-- Source side:
--
--   * 'pull' blocks until the queue is non-empty, returns @Just x@ for a
--     payload item, and returns 'Nothing' once the end-of-stream marker is at
--     the front of the queue. The marker is left in the queue, so every later
--     'pull' (from any consumer sharing the source) sees 'Nothing' too.
--
-- The source is built with only its 'pull' field set.
newTBQueueNode :: Int       -- ^ number of sink end-of-input signals to wait for
                            --   before the end-of-stream marker is enqueued
                            --   (presumably the number of producers)
               -> Natural   -- ^ queue capacity, passed to 'newTBQueueIO'
               -> IO (Sink a, Source a)
newTBQueueNode n bound = do
    q <- newTBQueueIO bound
    ec <- newCounter 0
    return
        ( BIO (\ x -> atomically (writeTBQueue q (Just x)) >> return Nothing)
              (do i <- atomicAddCounter' ec 1
                  when (i == n) (atomically (writeTBQueue q Nothing))
                  return Nothing)
        , BIO { pull = atomically $ do
                    -- Dequeue and, for the end-of-stream marker, re-enqueue in
                    -- ONE transaction. Previously the read ran unmasked and the
                    -- unget ran in a second transaction under
                    -- 'uninterruptibleMask': an asynchronous exception arriving
                    -- right after the read committed could drop the marker, and
                    -- a writer taking the freed slot first could make the
                    -- separate unget wait uninterruptibly. Doing both steps
                    -- atomically means the marker never leaves the queue.
                    x <- readTBQueue q
                    case x of
                        Just _  -> return x
                        Nothing -> unGetTBQueue q Nothing >> return Nothing
              })
-- | Create a 'Sink' backed by a broadcast 'TChan', together with an action
-- that produces a fresh 'Source' each time it is run.
--
-- Sink side:
--
--   * pushing @x@ writes @Just x@ to the broadcast channel and yields 'Nothing';
--   * the sink's second field (its 'pull') atomically adds 1 to a shared
--     counter, and when the value returned by @atomicAddCounter'@ equals @n@
--     it writes a single 'Nothing' to the channel as the end-of-stream marker.
--
-- Source side:
--
--   * each run of the returned action duplicates the broadcast channel with
--     'dupTChan' and wraps the duplicate in a source whose 'pull' blocks on
--     'readTChan' and returns exactly what was read: @Just x@ for a payload,
--     'Nothing' for the end-of-stream marker. Only the 'pull' field of the
--     source is set.
newBroadcastTChanNode :: Int -- ^ number of sink end-of-input signals to wait
                             --   for before the end-of-stream marker is written
                             --   (presumably the number of producers)
                      -> IO (Sink a, IO (Source a))
newBroadcastTChanNode n = do
    chan    <- newBroadcastTChanIO
    flushed <- newCounter 0
    let -- Builds one independent reader over the broadcast channel.
        mkSource = do
            reader <- atomically (dupTChan chan)
            -- The value read is already the answer: 'Just' payload or the
            -- 'Nothing' marker, so it is handed back unchanged.
            pure (BIO { pull = atomically (readTChan reader) })
        sink = BIO
            (\ item -> Nothing <$ atomically (writeTChan chan (Just item)))
            (do cnt <- atomicAddCounter' flushed 1
                when (cnt == n) $ atomically (writeTChan chan Nothing)
                pure Nothing)
    pure (sink, mkSource)