{-# OPTIONS_GHC -Wno-missing-fields #-}
{-# OPTIONS_GHC -Wno-incomplete-patterns #-}

{-|
Module      : Z.IO.BIO.Concurrent
Description : Concurrent BIO nodes for the producer-consumer model
Copyright   : (c) Dong Han, 2017-2020
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides some concurrent 'BIO' nodes, to ease the implementation of the producer-consumer model.
All sources and sinks returned by this module are safe to be used in multiple threads.

  * Use 'newTQueueNode' for common cases.
  * Use 'newTBQueueNode' if you have a fast producer and you don't want input get piled up in memory.
  * Use 'newBroadcastTChanNode' if you want messages to be broadcast, i.e. every message written by
    producers will be received by every consumer.

It's important to correctly set the number of producers: internally a counter tracks how many producers
have reached their ends, and EOF is sent to all consumers when the last producer ends. So it's a good idea to catch
exceptions and pull the sink (which indicates EOF) on the producer side.

@
(sink, src) <- newTQueueNode 2  -- it's important to correctly set the number of producers

--------------------------------------------------------------------------------
-- producers

forkIO $ do
    ...
    push x sink             -- producer using push
    ...
    pull sink               -- when EOF is reached, manually pull; consider putting it in a bracket.

forkIO $ do
    ...
    (runBIO $ ... >|> sink) -- producer using BIO
        `onException` (pull sink)

--------------------------------------------------------------------------------
-- consumers

forkIO $ do
    ...
    r <- pull src           -- consumer using pull
    case r of Just r' -> ...
              _ -> ...      -- Nothing indicates all producers reached EOF

forkIO $ do
    ...
    runBIO $ src >|> ...    -- consumer using BIO
@

-}

module Z.IO.BIO.Concurrent where

import Control.Monad
import Control.Concurrent.STM
import GHC.Natural
import Z.IO.BIO
import Z.Data.PrimRef
import Z.IO.Exception

-- | Make an unbounded queue and a pair of sink and source connected to it.
--
-- Pushing to the sink enqueues @Just x@. Pulling the sink marks one producer
-- as finished; the pull that brings the finished count up to the given number
-- of producers enqueues a single @Nothing@, which serves as the EOF marker.
--
-- Pulling the source blocks while the queue is empty. A @Just@ item is
-- dequeued and returned. The EOF marker is never dequeued, so once it is at
-- the front every pull, from any thread, returns 'Nothing'.
newTQueueNode :: Int -- ^ number of producers
              -> IO (Sink a, Source a)
newTQueueNode n = do
    q <- newTQueueIO
    ec <- newCounter 0
    return
        ( BIO (\ x -> atomically (writeTQueue q (Just x)) >> return Nothing)
              (do -- NOTE(review): relies on atomicAddCounter' returning the
                  -- updated count, so the n-th finishing producer sees n — confirm.
                  i <- atomicAddCounter' ec 1
                  when (i == n) (atomically (writeTQueue q Nothing))
                  return Nothing)
        , BIO { pull = atomically $ do
                    -- Peek and dequeue inside one transaction: the EOF marker
                    -- stays in the queue for other consumers, and no exception
                    -- masking is needed to keep it there.
                    x <- peekTQueue q
                    case x of
                        Just _ -> readTQueue q
                        _      -> return Nothing })

-- | Make a bounded queue and a pair of sink and source connected to it.
--
-- Pushing to the sink enqueues @Just x@, blocking while the queue is full.
-- Pulling the sink marks one producer as finished; the pull that brings the
-- finished count up to the given number of producers enqueues a single
-- @Nothing@, which serves as the EOF marker.
--
-- Pulling the source blocks while the queue is empty. A @Just@ item is
-- dequeued and returned. The EOF marker is never dequeued, so once it is at
-- the front every pull, from any thread, returns 'Nothing'.
newTBQueueNode :: Int       -- ^ number of producers
               -> Natural   -- ^ queue buffer bound
               -> IO (Sink a, Source a)
newTBQueueNode n bound = do
    q <- newTBQueueIO bound
    ec <- newCounter 0
    return
        ( BIO (\ x -> atomically (writeTBQueue q (Just x)) >> return Nothing)
              (do -- NOTE(review): relies on atomicAddCounter' returning the
                  -- updated count, so the n-th finishing producer sees n — confirm.
                  i <- atomicAddCounter' ec 1
                  when (i == n) (atomically (writeTBQueue q Nothing))
                  return Nothing)
        , BIO { pull = atomically $ do
                    -- Peek first so the EOF marker is never taken out of the
                    -- queue: putting it back afterwards could block on a full
                    -- queue, and doing so with exceptions masked would leave
                    -- the consumer stuck.
                    x <- peekTBQueue q
                    case x of
                        Just _ -> readTBQueue q
                        _      -> return Nothing })

-- | Make a broadcast chan and a sink connected to it, and a function return sources to receive broadcast message.
--
-- Pushing to the sink writes @Just x@ to the broadcast channel. Pulling the
-- sink marks one producer as finished; the pull that brings the finished
-- count up to the given number of producers writes a single @Nothing@, which
-- serves as the EOF marker.
--
-- Each run of the returned action duplicates the broadcast channel into a
-- fresh source; per 'dupTChan', a duplicate starts empty and only sees items
-- written afterwards. Pulling a source blocks while its channel is empty.
-- A @Just@ item is consumed and returned. The EOF marker is left in the
-- source's channel, so later pulls on that source, including pulls from other
-- threads sharing it, keep returning 'Nothing' instead of blocking.
newBroadcastTChanNode :: Int                        -- ^ number of producers
                      -> IO (Sink a, IO (Source a)) -- ^ (Sink, IO Source)
newBroadcastTChanNode n = do
    b <- newBroadcastTChanIO
    ec <- newCounter 0
    let dupSrc = do
            c <- atomically (dupTChan b)
            return (BIO { pull = atomically $ do
                            -- Peek first so EOF is observed without being
                            -- consumed, matching the queue-based sources.
                            x <- peekTChan c
                            case x of
                                Just _ -> readTChan c
                                _      -> return Nothing })
    return ( BIO (\ x -> atomically (writeTChan b (Just x)) >> return Nothing)
                 (do -- NOTE(review): relies on atomicAddCounter' returning the
                     -- updated count, so the n-th finishing producer sees n — confirm.
                     i <- atomicAddCounter' ec 1
                     when (i == n) (atomically (writeTChan b Nothing))
                     return Nothing)
           , dupSrc)