module Network.WebSockets.Util.PubSub
( PubSub
, newPubSub
, publish
, subscribe
) where
import Control.Applicative ((<$>))
import Control.Exception (IOException, handle)
import Control.Monad (foldM, forever)
import Control.Monad.Trans (liftIO)
import Data.IntMap (IntMap)
import Data.List (foldl')
import qualified Control.Concurrent.MVar as MV
import qualified Data.IntMap as IM
import Network.WebSockets
data PubSub_ p = PubSub_
{ pubSubNextId :: Int
, pubSubSinks :: IntMap (Sink p)
}
addClient :: Sink p -> PubSub_ p -> (PubSub_ p, Int)
addClient sink (PubSub_ nid sinks) =
(PubSub_ (nid + 1) (IM.insert nid sink sinks), nid)
removeClient :: Int -> PubSub_ p -> PubSub_ p
removeClient ref ps = ps {pubSubSinks = IM.delete ref (pubSubSinks ps)}
newtype PubSub p = PubSub (MV.MVar (PubSub_ p))
newPubSub :: IO (PubSub p)
newPubSub = PubSub <$> MV.newMVar PubSub_
{ pubSubNextId = 0
, pubSubSinks = IM.empty
}
publish :: PubSub p -> Message p -> IO ()
publish (PubSub mvar) msg = MV.modifyMVar_ mvar $ \pubSub -> do
broken <- foldM publish' [] (IM.toList $ pubSubSinks pubSub)
return $ foldl' (\p b -> removeClient b p) pubSub broken
where
publish' broken (i, s) =
handle (\(_ :: IOException) -> return (i : broken)) $ do
sendSink s msg
return broken
subscribe :: Protocol p => PubSub p -> WebSockets p ()
subscribe (PubSub mvar) = do
sink <- getSink
ref <- liftIO $ MV.modifyMVar mvar $ return . addClient sink
catchWsError loop $ const $ liftIO $
MV.modifyMVar_ mvar $ return . removeClient ref
where
loop = forever $ do
_ <- receiveDataMessage
return ()