{-# LANGUAGE
DataKinds
, KindSignatures
, MultiParamTypeClasses
, FunctionalDependencies
, FlexibleInstances
, ScopedTypeVariables
, PartialTypeSignatures
#-}
module Control.Concurrent.Chan.Extra where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Chan (Chan, readChan, writeChan, newChan)
import Control.Concurrent.MVar (newEmptyMVar, tryTakeMVar, putMVar)
import Control.Monad (forever)
import Data.IORef (newIORef, readIORef, writeIORef)

import Control.Concurrent.Async (Async, async, cancel, wait, waitCatch)
import Control.Concurrent.STM
  (atomically, tryTakeTMVar, putTMVar, newEmptyTMVarIO)
import Control.Concurrent.STM.TChan (TChan)

import Control.Concurrent.Chan.Scope (Scope (..), Writable, Readable)
import Control.Concurrent.Chan.Typed
  ( ChanRW (..), writeChanRW, readChanRW, newChanRW)
import Control.Concurrent.STM.TChan.Typed
  ( TChanRW (..), writeTChanRW, readTChanRW, newTChanRW)
-- | Channel types that carry a 'Scope' index and allow that index to be
-- converted.
--
-- Every method hands back the same channel type @c@ at a different scope;
-- what the conversion does to the underlying channel is decided by the
-- instance.
class ChanScoped (c :: Scope -> * -> *) where
  -- | View any 'Readable' channel at scope @'Read@.
  readOnly :: Readable scope => c scope a -> c 'Read a
  -- | View any 'Writable' channel at scope @'Write@.
  writeOnly :: Writable scope => c scope a -> c 'Write a
  -- | Turn a 'Writable' channel into a @'ReadWrite@ one, i.e. additionally
  -- permit reading from it.
  allowReading :: Writable scope => c scope a -> c 'ReadWrite a
  -- | Turn a 'Readable' channel into a @'ReadWrite@ one, i.e. additionally
  -- permit writing to it.
  allowWriting :: Readable scope => c scope a -> c 'ReadWrite a
-- | Length of a delay, as accepted by the 'ChanExtra' methods.
--
-- NOTE: despite the name, the instances in this module pass this value
-- unchanged to 'threadDelay', which interprets its argument as
-- /microseconds/, not nanoseconds.
type DiffNanosec = Int
-- | Time-based combinators over a pair of channel types.
--
-- @outputC@ is the channel type supplied by the caller and @inputC@ is the
-- channel type handed back; the functional dependencies make each of them
-- determine the other.
--
-- The descriptions below reflect the instances defined in this module, where
-- values written to the returned input channel are moved to the supplied
-- output channel by background threads whose 'Async' handles are returned
-- alongside the input channel.
class ChanExtra (inputC :: * -> *) (outputC :: * -> *)
  | inputC -> outputC, outputC -> inputC where
  -- | Debounce: a value is forwarded after the given delay, and a newer value
  -- cancels the still-pending forward of its predecessor.
  debounceStatic :: DiffNanosec
    -> outputC a -> IO (inputC a, Async ())
  -- | Throttle: every value is forwarded after the given delay, and the delay
  -- for a value does not start before the previous forward has completed.
  throttleStatic :: DiffNanosec
    -> outputC a -> IO (inputC a, Async ())
  -- | Intersperse: forward incoming values and additionally write the result
  -- of the supplied action to the output at intervals of the given delay.
  -- The two 'Async' handles are, in order, the thread producing the
  -- interspersed values and the thread forwarding incoming values.
  intersperseStatic :: DiffNanosec
    -> IO a
    -> outputC a
    -> IO (inputC a, Async (), Async ())
-- | Plain 'Chan' support.
--
-- Each method wraps the supplied channel as a @'ChanRW' ''Read'@, delegates
-- to the 'ChanExtra' instance selected by that output type, and unwraps the
-- 'ChanRW' that comes back so callers only ever see raw 'Chan's.
instance ChanExtra Chan Chan where
  debounceStatic delay rawOut =
    debounceStatic delay (ChanRW rawOut :: ChanRW 'Read _)
      >>= \(ChanRW rawIn, worker) -> pure (rawIn, worker)

  throttleStatic delay rawOut =
    throttleStatic delay (ChanRW rawOut :: ChanRW 'Read _)
      >>= \(ChanRW rawIn, worker) -> pure (rawIn, worker)

  intersperseStatic delay makeFiller rawOut =
    intersperseStatic delay makeFiller (ChanRW rawOut :: ChanRW 'Read _)
      >>= \(ChanRW rawIn, fillerThread, forwardThread) ->
            pure (rawIn, fillerThread, forwardThread)
-- | Plain 'TChan' support.
--
-- Each method wraps the supplied channel as a @'TChanRW' ''Read'@, delegates
-- to the 'ChanExtra' instance selected by that output type, and unwraps the
-- 'TChanRW' that comes back so callers only ever see raw 'TChan's.
instance ChanExtra TChan TChan where
  debounceStatic delay rawOut =
    debounceStatic delay (TChanRW rawOut :: TChanRW 'Read _)
      >>= \(TChanRW rawIn, worker) -> pure (rawIn, worker)

  throttleStatic delay rawOut =
    throttleStatic delay (TChanRW rawOut :: TChanRW 'Read _)
      >>= \(TChanRW rawIn, worker) -> pure (rawIn, worker)

  intersperseStatic delay makeFiller rawOut =
    intersperseStatic delay makeFiller (TChanRW rawOut :: TChanRW 'Read _)
      >>= \(TChanRW rawIn, fillerThread, forwardThread) ->
            pure (rawIn, fillerThread, forwardThread)
-- | Scope conversions for 'TChanRW': each one unwraps the constructor and
-- re-wraps the same payload, so only the scope index of the result differs.
instance ChanScoped TChanRW where
  readOnly chan = case chan of
    TChanRW raw -> TChanRW raw
  writeOnly chan = case chan of
    TChanRW raw -> TChanRW raw
  allowReading chan = case chan of
    TChanRW raw -> TChanRW raw
  allowWriting chan = case chan of
    TChanRW raw -> TChanRW raw
-- | Timed combinators for scoped 'TChanRW' channels.
--
-- Every method creates a fresh channel, returns its write-only view, and
-- starts background thread(s) that move values from that channel into the
-- supplied read-scoped @outputChan@ (re-tagged with 'allowWriting' so it can
-- be written to). Delays are passed unchanged to 'threadDelay', so they are
-- interpreted as microseconds.
instance ChanExtra (TChanRW 'Write) (TChanRW 'Read) where
  -- Each incoming value schedules a write after @toWaitFurther@ and cancels
  -- the write that was scheduled for the previous value (a no-op if that
  -- write has already happened).
  debounceStatic toWaitFurther outputChan = do
    presentedChan <- atomically newTChanRW
    writingThread <- newEmptyTMVarIO
    writer <- async $ forever $ do
      x <- atomically (readTChanRW presentedChan)
      newWriter <- async $ do
        threadDelay toWaitFurther
        atomically (writeTChanRW (allowWriting outputChan) x)
      mInvoker <- atomically (tryTakeTMVar writingThread)
      mapM_ cancel mInvoker
      atomically (putTMVar writingThread newWriter)
    pure (writeOnly presentedChan, writer)

  -- Every incoming value is written after @toWaitFurther@; the delay for a
  -- value starts only once the previous delayed write has finished, so
  -- nothing is dropped and writes are spaced at least one delay apart.
  throttleStatic toWaitFurther outputChan = do
    presentedChan <- atomically newTChanRW
    writingThread <- newEmptyTMVarIO
    writer <- async $ forever $ do
      x <- atomically (readTChanRW presentedChan)
      mInvoker <- atomically (tryTakeTMVar writingThread)
      mapM_ wait mInvoker
      newWriter <- async $ do
        threadDelay toWaitFurther
        atomically (writeTChanRW (allowWriting outputChan) x)
      atomically (putTMVar writingThread newWriter)
    pure (writeOnly presentedChan, writer)

  -- Forward incoming values immediately and, whenever @timeBetween@ passes
  -- without a forwarded value, write the result of @xM@.
  --
  -- The writer publishes the handle of its current timer in @idleTimer@ and
  -- leaves it there while waiting, so the listener can actually find and
  -- cancel it. (Previously the writer removed the handle before blocking on
  -- it, which left the listener with nothing to cancel.)
  intersperseStatic timeBetween xM outputChan = do
    presentedChan <- atomically newTChanRW
    idleTimer <- newEmptyTMVarIO
    writer <- async $ forever $ do
      timer <- async (threadDelay timeBetween)
      -- Never blocks: this loop empties the variable before coming back
      -- here, and the listener only ever takes from it.
      atomically (putTMVar idleTimer timer)
      outcome <- waitCatch timer
      _ <- atomically (tryTakeTMVar idleTimer)
      case outcome of
        -- The timer only sleeps, so it can fail only by being cancelled by
        -- the listener: a value was just forwarded, restart the countdown.
        Left _ -> pure ()
        -- A full quiet period elapsed. 'xM' and the write run in this
        -- thread so that their exceptions surface through @writer@.
        Right () -> do
          x <- xM
          atomically (writeTChanRW (allowWriting outputChan) x)
    listener <- async $ forever $ do
      y <- atomically (readTChanRW presentedChan)
      mTimer <- atomically (tryTakeTMVar idleTimer)
      mapM_ cancel mTimer
      atomically (writeTChanRW (allowWriting outputChan) y)
    pure (writeOnly presentedChan, writer, listener)
-- | Scope conversions for 'ChanRW': each one unwraps the constructor and
-- re-wraps the same payload, so only the scope index of the result differs.
instance ChanScoped ChanRW where
  readOnly chan = case chan of
    ChanRW raw -> ChanRW raw
  writeOnly chan = case chan of
    ChanRW raw -> ChanRW raw
  allowReading chan = case chan of
    ChanRW raw -> ChanRW raw
  allowWriting chan = case chan of
    ChanRW raw -> ChanRW raw
-- | Timed combinators for scoped 'ChanRW' channels.
--
-- Every method creates a fresh channel, returns its write-only view, and
-- starts background thread(s) that move values from that channel into the
-- supplied read-scoped @outputChan@ (re-tagged with 'allowWriting' so it can
-- be written to). Delays are passed unchanged to 'threadDelay', so they are
-- interpreted as microseconds.
instance ChanExtra (ChanRW 'Write) (ChanRW 'Read) where
  -- Each incoming value schedules a write after @toWaitFurther@ and cancels
  -- the write that was scheduled for the previous value (a no-op if that
  -- write has already happened).
  debounceStatic toWaitFurther outputChan = do
    presentedChan <- newChanRW
    writingThread <- newEmptyMVar
    writer <- async $ forever $ do
      x <- readChanRW presentedChan
      newWriter <- async $ do
        threadDelay toWaitFurther
        writeChanRW (allowWriting outputChan) x
      mInvoker <- tryTakeMVar writingThread
      mapM_ cancel mInvoker
      putMVar writingThread newWriter
    pure (writeOnly presentedChan, writer)

  -- Every incoming value is written after @toWaitFurther@; the delay for a
  -- value starts only once the previous delayed write has finished, so
  -- nothing is dropped and writes are spaced at least one delay apart.
  throttleStatic toWaitFurther outputChan = do
    presentedChan <- newChanRW
    writingThread <- newEmptyMVar
    writer <- async $ forever $ do
      x <- readChanRW presentedChan
      mInvoker <- tryTakeMVar writingThread
      mapM_ wait mInvoker
      newWriter <- async $ do
        threadDelay toWaitFurther
        writeChanRW (allowWriting outputChan) x
      putMVar writingThread newWriter
    pure (writeOnly presentedChan, writer)

  -- Forward incoming values immediately and, whenever @timeBetween@ passes
  -- without a forwarded value, write the result of @xM@.
  --
  -- The writer publishes the handle of its current timer in @idleTimer@ and
  -- leaves it there while waiting, so the listener can actually find and
  -- cancel it. (Previously the writer removed the handle before blocking on
  -- it, which left the listener with nothing to cancel.)
  intersperseStatic timeBetween xM outputChan = do
    presentedChan <- newChanRW
    idleTimer <- newEmptyMVar
    writer <- async $ forever $ do
      timer <- async (threadDelay timeBetween)
      -- Never blocks: this loop empties the variable before coming back
      -- here, and the listener only ever takes from it.
      putMVar idleTimer timer
      outcome <- waitCatch timer
      _ <- tryTakeMVar idleTimer
      case outcome of
        -- The timer only sleeps, so it can fail only by being cancelled by
        -- the listener: a value was just forwarded, restart the countdown.
        Left _ -> pure ()
        -- A full quiet period elapsed. 'xM' and the write run in this
        -- thread so that their exceptions surface through @writer@.
        Right () -> do
          x <- xM
          writeChanRW (allowWriting outputChan) x
    listener <- async $ forever $ do
      y <- readChanRW presentedChan
      mTimer <- tryTakeMVar idleTimer
      mapM_ cancel mTimer
      writeChanRW (allowWriting outputChan) y
    pure (writeOnly presentedChan, writer, listener)