{-# LANGUAGE FlexibleContexts           #-}
{-|
Module      : Control.Concurrent.NQE.Publisher
Copyright   : No rights reserved
License     : UNLICENSE
Maintainer  : xenog@protonmail.com
Stability   : experimental
Portability : POSIX

A publisher is a process that forwards messages to subscribers. NQE publishers
are simple, and do not implement filtering directly, although that can be done
on the 'STM' 'Listen' actions that forward messages to subscribers.

If a subscriber has been added to a publisher using the 'subscribe' function, it
needs to be removed later using 'unsubscribe' when it is no longer needed, or
the publisher will continue calling its 'Listen' action in the future, likely
causing memory leaks.
-}
module Control.Concurrent.NQE.Publisher
    ( Subscriber
    , PublisherMessage(..)
    , Publisher
    , withSubscription
    , subscribe
    , unsubscribe
    , withPublisher
    , publisher
    , publisherProcess
    ) where

import           Control.Concurrent.NQE.Process
import           Control.Concurrent.Unique
import           Control.Monad.Reader
import           Data.Function
import           Data.Hashable
import           Data.List
import           UnliftIO

-- | Opaque handle identifying one subscription to a publisher. Hold on to it:
-- it is the value that must be handed back in order to unsubscribe.
data Subscriber msg =
    Subscriber (Listen msg) Unique

-- | Two handles are equal exactly when their 'Unique' tags are equal; the
-- 'Listen' action is never compared.
instance Eq (Subscriber msg) where
    (==) = on (==) tag
      where
        tag (Subscriber _ ident) = ident

-- | Hashing only looks at the 'Unique' tag, mirroring the 'Eq' instance.
instance Hashable (Subscriber msg) where
    hashWithSalt salt (Subscriber _ ident) = salt `hashWithSalt` ident

-- | Messages that a publisher will take.
data PublisherMessage msg
    = Subscribe !(Listen msg) !(Listen (Subscriber msg))
      -- ^ Register the first 'Listen' action as a new subscriber. The second
      -- 'Listen' action is called with the newly created 'Subscriber' handle
      -- in the same transaction that registers it.
    | Unsubscribe !(Subscriber msg)
      -- ^ Remove the given 'Subscriber' from the publisher's subscriber list.
    | Event msg
      -- ^ Pass the payload to the 'Listen' action of every subscriber that is
      -- currently registered.

-- | A publisher is a 'Process' that takes 'PublisherMessage' values; @msg@ is
-- the type of the events it forwards to its subscribers.
type Publisher msg = Process (PublisherMessage msg)

-- | Create a fresh inbox, subscribe it to the publisher, and run the supplied
-- function with that inbox. Every event is forwarded into the inbox with
-- 'sendSTM'. The subscription is held inside a 'bracket', so 'unsubscribe' is
-- issued when the function returns or throws.
withSubscription ::
       MonadUnliftIO m => Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription pub action =
    newInbox >>= \mailbox ->
        bracket
            (subscribe pub (`sendSTM` mailbox))
            (unsubscribe pub)
            (const (action mailbox))

-- | Register a 'Listen' action with a publisher. The 'Subscriber' handle that
-- comes back is what 'unsubscribe' needs in order to remove it again.
subscribe :: MonadIO m => Publisher msg -> Listen msg -> m (Subscriber msg)
subscribe pub listener = query (Subscribe listener) pub

-- | Ask a publisher to stop calling a previously registered 'Listen' action.
-- The 'Subscriber' argument must be the handle returned by the matching
-- 'subscribe' call.
unsubscribe :: MonadIO m => Publisher msg -> Subscriber msg -> m ()
unsubscribe pub subscriber = send (Unsubscribe subscriber) pub

-- | Start a publisher in the background and pass it to a function. The
-- publisher will be stopped when the function returns. Implemented as
-- 'withProcess' applied to 'publisherProcess'.
withPublisher :: MonadUnliftIO m => (Publisher msg -> m a) -> m a
withPublisher = withProcess publisherProcess

-- | Start a publisher in the background and return its handle. Implemented as
-- 'process' applied to 'publisherProcess'.
--
-- NOTE(review): unlike 'withPublisher', nothing visible here scopes the
-- publisher's lifetime; confirm against 'process' how it is meant to be
-- stopped.
publisher :: MonadUnliftIO m => m (Publisher msg)
publisher = process publisherProcess

-- | Run a publisher in the current thread. The subscriber list starts out
-- empty, and the loop then takes 'PublisherMessage' values from the given
-- inbox forever, dispatching each one in turn.
publisherProcess :: MonadUnliftIO m => Inbox (PublisherMessage msg) -> m ()
publisherProcess inbox = do
    subscribers <- newTVarIO []
    runReaderT loop subscribers
  where
    loop = forever (receive inbox >>= publisherMessage)

-- | Internal function to dispatch a single publisher message against the
-- subscriber list held in the reader environment.
--
-- * 'Subscribe' creates a fresh 'Unique', appends the new 'Subscriber' to the
--   list, and hands the handle to the reply action in the same transaction.
-- * 'Unsubscribe' removes the matching 'Subscriber' from the list.
-- * 'Event' calls the 'Listen' action of every registered subscriber, in list
--   order, inside one transaction.
--
-- The list is written back strictly, with its spine fully evaluated. A lazy
-- update would leave @union@ / @delete@ thunks in the 'TVar' that pile up
-- between events and keep already removed subscribers reachable.
publisherMessage ::
       (MonadIO m, MonadReader (TVar [Subscriber msg]) m)
    => PublisherMessage msg
    -> m ()
publisherMessage (Subscribe sub r) = do
    box <- ask
    u <- liftIO newUnique
    let s = Subscriber sub u
    atomically $ do
        modifyTVar' box (forceSpine . (`union` [s]))
        -- Reply inside the same transaction so the handle is only delivered
        -- once the subscriber is actually registered.
        r s
publisherMessage (Unsubscribe sub) = do
    box <- ask
    atomically $ modifyTVar' box (forceSpine . delete sub)
publisherMessage (Event event) = do
    box <- ask
    atomically $ do
        subs <- readTVar box
        forM_ subs $ \(Subscriber sub _) -> sub event

-- | Return the list unchanged, but only after walking its whole spine. Both
-- 'union' and 'delete' produce their result tail lazily, so evaluating to weak
-- head normal form alone would still leave a chain of suspended updates in
-- the tail.
forceSpine :: [a] -> [a]
forceSpine xs = length xs `seq` xs