module Engine.Worker
( Versioned(..)
, Var
, newVar
, readVar
, stateVar
, stateVarMap
, HasInput(..)
, pushInput
, pushInputSTM
, updateInput
, updateInputSTM
, getInputData
, getInputDataSTM
, HasConfig(..)
, modifyConfig
, modifyConfigSTM
, HasOutput(..)
, pushOutput
, pushOutputSTM
, updateOutput
, updateOutputSTM
, getOutputData
, getOutputDataSTM
, Cell
, spawnCell
, Timed(..)
, spawnTimed
, spawnTimed_
, Merge(..)
, spawnMerge1
, spawnMerge2
, spawnMerge3
, spawnMerge4
, spawnMergeT
, ObserverIO
, newObserverIO
, observeIO
, observeIO_
, readObservedIO
, Source
, newSource
, pubSource
, subSource
) where
import RIO
import Control.Concurrent.Chan.Unagi.Unboxed (UnagiPrim)
import Control.Concurrent.Chan.Unagi.Unboxed qualified as UnagiPrim
import Data.Vector.Unboxed qualified as Unboxed
import UnliftIO.Concurrent (forkIO, killThread)
import UnliftIO.Resource (MonadResource)
import UnliftIO.Resource qualified as Resource
import Data.StateVar qualified as StateVar
-- | A payload paired with a version stamp.
--
-- The stamp is an unboxed vector of counters rather than a single number.
-- The 'Eq' and 'Ord' instances of this type look at 'vVersion' only and
-- never inspect 'vData'.
data Versioned a = Versioned
  { vVersion :: Unboxed.Vector Word64
    -- ^ Version stamp of the payload.
  , vData :: a
    -- ^ The payload itself.
  }
  deriving (Show, Functor, Foldable, Traversable)
-- | Two values are equal when their version stamps are equal; the payload
-- is not compared (and needs no 'Eq' instance).
instance Eq (Versioned a) where
  a == b = vVersion a == vVersion b

-- | Ordering is the ordering of the version stamps; the payload is ignored.
instance Ord (Versioned a) where
  compare a b = compare (vVersion a) (vVersion b)
-- | A transactional variable holding a versioned value.
type Var a = TVar (Versioned a)

-- | Create a 'Var' holding the given value.
--
-- The initial version stamp is a one-element vector containing @0@.
newVar :: MonadUnliftIO m => a -> m (Var a)
newVar initial =
  newTVarIO Versioned
    { vVersion = Unboxed.singleton 0
    , vData = initial
    }
-- | Expose the input of a worker as a 'StateVar.StateVar'.
--
-- Reading goes through 'getInputData'. Writing goes through 'pushInput'
-- with a function that discards the old value, so every write bumps the
-- version.
stateVar :: HasInput var => var -> StateVar.StateVar (GetInput var)
stateVar var =
  StateVar.makeStateVar
    (getInputData var)
    (pushInput var . const)
-- | Like 'stateVar', but exposing a projection of the input.
--
-- * @mapGet@ extracts the exposed value from the whole input.
-- * @mapSet@ turns a newly written value into an update of the whole input,
--   which is then applied with 'pushInput'.
stateVarMap
  :: HasInput var
  => (GetInput var -> a)
  -> (a -> GetInput var -> GetInput var)
  -> var
  -> StateVar.StateVar a
stateVarMap mapGet mapSet var =
  StateVar.makeStateVar
    (mapGet <$> getInputData var)
    (pushInput var . mapSet)
{-# INLINEABLE readVar #-}
-- | Read the current payload of a 'Var', dropping the version stamp.
--
-- Uses 'readTVarIO', i.e. the read happens outside of any transaction.
readVar :: (MonadUnliftIO m) => Var a -> m a
readVar = fmap vData . readTVarIO
{-# INLINEABLE pushVarSTM #-}
-- | Apply a function to the payload and unconditionally bump the version.
--
-- Every component of the version vector is incremented by one.
pushVarSTM :: Var a -> (a -> a) -> STM ()
pushVarSTM var f =
  modifyTVar' var bump
  where
    bump old =
      Versioned
        { vVersion = Unboxed.map (+ 1) (vVersion old)
        , vData = f (vData old)
        }
{-# INLINEABLE updateVarSTM #-}
-- | Conditionally replace the payload.
--
-- The function receives the current payload. On 'Nothing' the variable is
-- left untouched (no write, no version bump). On 'Just' the new payload is
-- stored and every component of the version vector is incremented by one.
updateVarSTM :: Var a -> (a -> Maybe a) -> STM ()
updateVarSTM var f = do
  current <- readTVar var
  -- 'for_' over a 'Maybe' runs the write only for 'Just'.
  for_ (f (vData current)) $ \newData ->
    writeTVar var Versioned
      { vVersion = Unboxed.map (+ 1) (vVersion current)
      , vData = newData
      }
-- | Things that carry an input 'Var'.
class HasInput a where
  -- | Payload type of the input variable.
  type GetInput a

  -- | Project the input variable.
  getInput :: a -> Var (GetInput a)

-- | A bare 'Var' is its own input.
instance HasInput (Var a) where
  type GetInput (Var a) = a
  getInput = id

-- | For a pair, the input is the input of the first component.
instance HasInput a => HasInput (a, b) where
  type GetInput (a, b) = GetInput a
  getInput = getInput . fst
{-# INLINEABLE pushInput #-}
-- | Apply a function to the input payload and bump its version, in a
-- transaction of its own.
--
-- See 'pushInputSTM' for the variant that composes with other STM actions.
pushInput
  :: (MonadIO m, HasInput var)
  => var
  -> (GetInput var -> GetInput var)
  -> m ()
pushInput input f =
  atomically (pushInputSTM input f)
{-# INLINEABLE pushInputSTM #-}
-- | STM version of 'pushInput': modify the input payload and
-- unconditionally bump its version.
pushInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> GetInput var)
  -> STM ()
pushInputSTM input =
  pushVarSTM (getInput input)
{-# INLINEABLE updateInput #-}
-- | Conditionally replace the input payload, in a transaction of its own.
--
-- When the function returns 'Nothing' the input and its version stay
-- as they are; see 'updateInputSTM'.
updateInput
  :: ( MonadIO m
     , HasInput var
     )
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> m ()
updateInput input f =
  atomically (updateInputSTM input f)
{-# INLINEABLE updateInputSTM #-}
-- | STM version of 'updateInput'.
--
-- 'Nothing' leaves the input untouched; 'Just' stores the new payload and
-- bumps the version.
updateInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> STM ()
updateInputSTM input =
  updateVarSTM (getInput input)
{-# INLINEABLE getInputData #-}
-- | Read the current input payload (without its version) using
-- 'readTVarIO', i.e. outside of any transaction.
getInputData :: (HasInput worker, MonadIO m) => worker -> m (GetInput worker)
getInputData worker =
  vData <$> readTVarIO (getInput worker)
{-# INLINEABLE getInputDataSTM #-}
-- | Read the current input payload (without its version) inside a
-- transaction.
getInputDataSTM :: (HasInput worker) => worker -> STM (GetInput worker)
getInputDataSTM worker =
  vData <$> readTVar (getInput worker)
-- | Things that carry a configuration variable.
--
-- Unlike inputs and outputs, the config is a plain 'TVar' with no version
-- stamp attached.
class HasConfig a where
  -- | Type of the configuration value.
  type GetConfig a

  -- | Project the configuration variable.
  getConfig :: a -> TVar (GetConfig a)

-- | A bare 'TVar' is its own config.
instance HasConfig (TVar a) where
  type GetConfig (TVar a) = a
  getConfig = id
-- | Apply a function to the configuration value, in a transaction of its
-- own.
modifyConfig
  :: (MonadIO m, HasConfig var)
  => var
  -> (GetConfig var -> GetConfig var)
  -> m ()
modifyConfig config f =
  atomically (modifyConfigSTM config f)
-- | STM version of 'modifyConfig'.
--
-- Uses the strict 'modifyTVar'', so the new value is forced to WHNF when
-- it is stored.
modifyConfigSTM
  :: HasConfig var
  => var
  -> (GetConfig var -> GetConfig var)
  -> STM ()
modifyConfigSTM config =
  modifyTVar' (getConfig config)
-- | Things that carry an output 'Var'.
class HasOutput a where
  -- | Payload type of the output variable.
  type GetOutput a

  -- | Project the output variable.
  getOutput :: a -> Var (GetOutput a)

-- | A bare 'Var' is its own output.
instance HasOutput (Var a) where
  type GetOutput (Var a) = a
  getOutput = id

-- | For a pair, the output is the output of the second component.
instance HasOutput b => HasOutput (a, b) where
  type GetOutput (a, b) = GetOutput b
  getOutput = getOutput . snd
{-# INLINEABLE pushOutput #-}
-- | Apply a function to the output payload and bump its version, in a
-- transaction of its own.
--
-- See 'pushOutputSTM' for the variant that composes with other STM actions.
pushOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> GetOutput var)
  -> m ()
pushOutput output f =
  atomically (pushOutputSTM output f)
{-# INLINEABLE pushOutputSTM #-}
-- | STM version of 'pushOutput': modify the output payload and
-- unconditionally bump its version.
--
-- Delegates to 'pushVarSTM', the same helper 'pushInputSTM' uses, so the
-- versioning rule lives in one place.
pushOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> GetOutput var)
  -> STM ()
pushOutputSTM output =
  pushVarSTM (getOutput output)
{-# INLINEABLE updateOutput #-}
-- | Conditionally replace the output payload, in a transaction of its own.
--
-- When the function returns 'Nothing' the output and its version stay
-- as they are; see 'updateOutputSTM'.
updateOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> m ()
updateOutput output f =
  atomically (updateOutputSTM output f)
{-# INLINEABLE updateOutputSTM #-}
-- | STM version of 'updateOutput'.
--
-- 'Nothing' leaves the output untouched; 'Just' stores the new payload and
-- bumps the version. Delegates to 'updateVarSTM', the same helper
-- 'updateInputSTM' uses.
updateOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> STM ()
updateOutputSTM output =
  updateVarSTM (getOutput output)
{-# INLINEABLE getOutputData #-}
-- | Read the current output payload (without its version) using
-- 'readTVarIO', i.e. outside of any transaction.
getOutputData :: (HasOutput worker, MonadIO m) => worker -> m (GetOutput worker)
getOutputData worker =
  vData <$> readTVarIO (getOutput worker)
{-# INLINEABLE getOutputDataSTM #-}
-- | Read the current output payload (without its version) inside a
-- transaction.
getOutputDataSTM :: (HasOutput worker) => worker -> STM (GetOutput worker)
getOutputDataSTM worker =
  vData <$> readTVar (getOutput worker)
-- | An input variable together with the merge worker deriving an output
-- from it.
--
-- Through the pair instances, 'HasInput' resolves to the first component
-- and 'HasOutput' to the second.
type Cell input output = (Var input, Merge output)

-- | Create an input 'Var' holding the initial value and a 'spawnMerge1'
-- worker that applies the given function to it.
spawnCell
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => (input -> output)
  -> input
  -> m (Cell input output)
spawnCell f initialInput = do
  input <- newVar initialInput
  merge <- spawnMerge1 f input
  pure (input, merge)
-- | Handle of a worker created by 'spawnTimed'.
data Timed config output = Timed
  { tWorker :: ThreadId
    -- ^ Thread running the worker loop.
  , tKey :: Resource.ReleaseKey
    -- ^ Release key of the registered cleanup that kills 'tWorker'.
  , tActive :: TVar Bool
    -- ^ Checked on every iteration; the step function is skipped while
    -- this is 'False'.
  , tConfig :: TVar config
    -- ^ Configuration handed to the step function.
  , tOutput :: Var output
    -- ^ Versioned output produced by the worker.
  }

-- | The config of a 'Timed' worker is its 'tConfig'.
instance HasConfig (Timed config output) where
  type GetConfig (Timed config output) = config
  getConfig = tConfig

-- | The output of a 'Timed' worker is its 'tOutput'.
instance HasOutput (Timed config output) where
  type GetOutput (Timed config output) = output
  getOutput = tOutput
-- | Simplified 'spawnTimed' with a fixed delay, no config and no state.
--
-- * The 'Bool' is the initial value of the active flag.
-- * The 'Int' is the delay handed to 'threadDelay' before each iteration.
-- * The @output@ is the initial output value.
-- * The action is run on every active iteration; its result always
--   replaces the output (and therefore always bumps the version).
spawnTimed_
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => Bool
  -> Int
  -> output
  -> m output
  -> m (Timed () output)
spawnTimed_ startActive dt initialOutput stepF =
  spawnTimed startActive (Left dt) initUnit stepUnit ()
  where
    -- Config and state are both unit: nothing to initialise besides the
    -- output.
    initUnit () =
      pure (initialOutput, ())

    stepUnit _old () = do
      res <- stepF
      pure (Just res, ())
-- | Spawn a worker thread that periodically recomputes an output value.
--
-- Arguments, in order:
--
-- * Initial value of the active flag ('tActive').
-- * Delay handed to 'threadDelay' before every iteration: either fixed
--   ('Left') or computed from the current config ('Right').
-- * Initialiser, run once in the calling thread with the initial config;
--   yields the first output and the initial worker state.
-- * Step function, run on every iteration while the active flag is set.
--   A 'Nothing' output leaves the output variable and its version
--   untouched; the returned state is carried to the next iteration.
-- * Initial config.
--
-- A cleanup that kills the worker thread is registered with the
-- surrounding resource scope; its key is returned as 'tKey'.
spawnTimed
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => Bool
  -> Either Int (config -> Int)
  -> (config -> m (output, state))
  -> (state -> config -> m (Maybe output, state))
  -> config
  -> m (Timed config output)
spawnTimed startActive dtF initF stepF initialConfig = do
  activeVar <- newTVarIO startActive
  configVar <- newTVarIO initialConfig
  (initialOutput, initialState) <- initF initialConfig
  outputVar <- newVar initialOutput
  -- NOTE(review): the thread is forked before its cleanup is registered;
  -- an async exception arriving in between would leave it running.
  workerId <- forkIO (loop activeVar configVar outputVar initialState)
  releaseKey <- Resource.register (killThread workerId)
  pure Timed
    { tWorker = workerId
    , tKey = releaseKey
    , tActive = activeVar
    , tConfig = configVar
    , tOutput = outputVar
    }
  where
    loop active config output curState = do
      -- The delay always happens first, even while inactive.
      delay <- case dtF of
        Left static ->
          pure static
        Right fromConfig ->
          fromConfig <$> readTVarIO config
      threadDelay delay

      isActive <- readTVarIO active
      nextState <-
        if isActive
          then do
            -- The config is re-read here: it may have changed during the
            -- delay.
            currentConfig <- readTVarIO config
            (nextOutput, newState) <- stepF curState currentConfig
            updateOutput output (const nextOutput)
            pure newState
          else
            pure curState
      loop active config output nextState
-- | Handle of a worker created by one of the @spawnMerge*@ functions.
data Merge o = Merge
  { mWorker :: ThreadId
    -- ^ Thread recomputing the output.
  , mKey :: Resource.ReleaseKey
    -- ^ Release key of the registered cleanup that kills 'mWorker'.
  , mOutput :: TVar (Versioned o)
    -- ^ Versioned result of the merge.
  }

-- | The output of a 'Merge' worker is its 'mOutput'.
instance HasOutput (Merge o) where
  type GetOutput (Merge o) = o
  getOutput = mOutput
-- | Spawn a worker that keeps an output equal to a function of one source
-- output.
--
-- The output starts as the function applied to the source's current
-- payload, carrying the source's version. The worker then loops forever:
-- it blocks (STM retry) until the source version is greater than the
-- stored one, and then stores the recomputed payload under the source
-- version.
--
-- A cleanup that kills the worker is registered with the surrounding
-- resource scope; its key is returned as 'mKey'.
spawnMerge1
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i
     )
  => (GetOutput i -> o)
  -> i
  -> m (Merge o)
spawnMerge1 f i = do
  -- The derived 'Functor' maps the payload and keeps the version as is.
  output <- atomically $ do
    initial <- readTVar source
    newTVar (f <$> initial)

  worker <- forkIO . forever . atomically $ do
    next <- readTVar source
    old <- readTVar output
    unless (vVersion next > vVersion old) retrySTM
    writeTVar output (f <$> next)

  key <- Resource.register (killThread worker)
  pure Merge
    { mWorker = worker
    , mKey = key
    , mOutput = output
    }
  where
    source = getOutput i
-- | Spawn a worker that keeps an output equal to a function of two source
-- outputs.
--
-- The output version is the concatenation of the two source version
-- vectors, in argument order. The worker loops forever: it blocks (STM
-- retry) until the combined version is greater than the stored one, and
-- then stores the recomputed payload under the combined version.
--
-- A cleanup that kills the worker is registered with the surrounding
-- resource scope; its key is returned as 'mKey'.
spawnMerge2
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     )
  => (GetOutput i1 -> GetOutput i2 -> o)
  -> i1
  -> i2
  -> m (Merge o)
spawnMerge2 f i1 i2 = do
  output <- atomically (snapshot >>= newTVar)

  worker <- forkIO . forever . atomically $ do
    next <- snapshot
    old <- readTVar output
    unless (vVersion next > vVersion old) retrySTM
    writeTVar output next

  key <- Resource.register (killThread worker)
  pure Merge
    { mWorker = worker
    , mKey = key
    , mOutput = output
    }
  where
    -- Read both sources in the current transaction and combine them.
    snapshot = do
      v1 <- readTVar (getOutput i1)
      v2 <- readTVar (getOutput i2)
      pure Versioned
        { vVersion = vVersion v1 <> vVersion v2
        , vData = f (vData v1) (vData v2)
        }
-- | Spawn a worker that keeps an output equal to a function of three
-- source outputs.
--
-- The output version is the concatenation of the three source version
-- vectors, in argument order. The worker loops forever: it blocks (STM
-- retry) until the combined version is greater than the stored one, and
-- then stores the recomputed payload under the combined version.
--
-- A cleanup that kills the worker is registered with the surrounding
-- resource scope; its key is returned as 'mKey'.
spawnMerge3
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> o)
  -> i1
  -> i2
  -> i3
  -> m (Merge o)
spawnMerge3 f i1 i2 i3 = do
  output <- atomically (snapshot >>= newTVar)

  worker <- forkIO . forever . atomically $ do
    next <- snapshot
    old <- readTVar output
    unless (vVersion next > vVersion old) retrySTM
    writeTVar output next

  key <- Resource.register (killThread worker)
  pure Merge
    { mWorker = worker
    , mKey = key
    , mOutput = output
    }
  where
    -- Read all three sources in the current transaction and combine them.
    snapshot = do
      v1 <- readTVar (getOutput i1)
      v2 <- readTVar (getOutput i2)
      v3 <- readTVar (getOutput i3)
      pure Versioned
        { vVersion = mconcat [vVersion v1, vVersion v2, vVersion v3]
        , vData = f (vData v1) (vData v2) (vData v3)
        }
-- | Spawn a worker that merges the outputs of four sources with @f@.
--
-- The initial merged value is built in a single STM transaction from the
-- current contents of all four input vars. A forked thread then loops
-- forever: in each transaction it re-reads the four inputs and the merge
-- output, and when the concatenated input versions compare greater than the
-- stored output version it writes a new 'Versioned' value. Otherwise the
-- transaction calls 'retrySTM'.
--
-- A 'killThread' for the worker is registered via 'Resource.register'; the
-- resulting key is returned in 'mKey', the thread in 'mWorker' and the
-- output var in 'mOutput'.
spawnMerge4
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     , HasOutput i4
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> GetOutput i4 -> o)
  -> i1
  -> i2
  -> i3
  -> i4
  -> m (Merge o)
spawnMerge4 f i1 i2 i3 i4 = do
  -- Seed the output from a consistent snapshot of all inputs.
  output <- atomically do
    (initial1, initial2, initial3, initial4) <- (,,,)
      <$> readTVar (getOutput i1)
      <*> readTVar (getOutput i2)
      <*> readTVar (getOutput i3)
      <*> readTVar (getOutput i4)
    newTVar Versioned
      { vVersion = mkVersion initial1 initial2 initial3 initial4
      , vData =
          f (vData initial1) (vData initial2) (vData initial3) (vData initial4)
      }

  worker <- forkIO $
    forever $ atomically $ do
      next1 <- readTVar (getOutput i1)
      next2 <- readTVar (getOutput i2)
      next3 <- readTVar (getOutput i3)
      next4 <- readTVar (getOutput i4)
      old <- readTVar output
      let nextVersion = mkVersion next1 next2 next3 next4
      -- Only publish when the combined input version moved past the
      -- version already stored in the output.
      if nextVersion > vVersion old
        then
          writeTVar output Versioned
            { vVersion = nextVersion
            , vData = f (vData next1) (vData next2) (vData next3) (vData next4)
            }
        else
          retrySTM

  key <- Resource.register $ killThread worker

  pure Merge
    { mWorker = worker
    , mKey = key
    , mOutput = output
    }
  where
    -- Combined version: the input version vectors appended in argument order.
    mkVersion
      :: Versioned a
      -> Versioned b
      -> Versioned c
      -> Versioned d
      -> Unboxed.Vector Word64
    mkVersion a b c d =
      mconcat [vVersion a, vVersion b, vVersion c, vVersion d]
-- | Spawn a worker that merges a whole 'Traversable' of same-typed sources.
--
-- The initial value is built in one STM transaction by reading every input
-- var; its version is the 'foldMap' of the input versions and its data is
-- @f@ applied to the container of input data. A forked thread then loops
-- forever: each transaction re-reads all inputs and the merge output, writes
-- a fresh 'Versioned' value when the folded input version compares greater
-- than the stored one, and calls 'retrySTM' otherwise.
--
-- A 'killThread' for the worker is registered via 'Resource.register'; the
-- resulting key is returned in 'mKey', the thread in 'mWorker' and the
-- output var in 'mOutput'.
spawnMergeT
  :: ( Traversable t
     , HasOutput input
     , MonadUnliftIO m
     , MonadResource m
     )
  => (t (GetOutput input) -> output)
  -> t input
  -> m (Merge output)
spawnMergeT f inputs = do
  -- Seed the output from a consistent snapshot of all inputs.
  output <- atomically do
    initial <- traverse (readTVar . getOutput) inputs
    newTVar Versioned
      { vVersion = foldMap vVersion initial
      , vData = f (fmap vData initial)
      }

  worker <- forkIO $
    forever $ atomically do
      next <- traverse (readTVar . getOutput) inputs
      old <- readTVar output
      let nextVersion = foldMap vVersion next
      -- Only publish when the folded input version moved past the version
      -- already stored in the output.
      if nextVersion > vVersion old
        then
          writeTVar output Versioned
            { vVersion = nextVersion
            , vData = f (fmap vData next)
            }
        else
          retrySTM

  key <- Resource.register $ killThread worker

  pure Merge
    { mWorker = worker
    , mKey = key
    , mOutput = output
    }
-- | Observer state held in an 'IORef': a value paired with the version it
-- was stored under (see 'observeIO', which updates it).
type ObserverIO a = IORef (Versioned a)
-- | Create an observer holding @initial@ with an empty ('mempty') version.
newObserverIO :: MonadIO m => a -> m (ObserverIO a)
newObserverIO initial =
  newIORef Versioned
    { vVersion = mempty
    , vData = initial
    }
-- | Refresh an observer from an output, running @action@ only on change.
--
-- Reads the output var and the observer ref. When the output's version
-- compares greater than the observer's, @action@ is called with the
-- observer's current data and the output's data; its result is stored in
-- the observer under the output's version and returned. Otherwise the
-- observer's current data is returned and @action@ is not run.
--
-- The ref is read and later written as two separate steps; only the write
-- itself uses 'atomicWriteIORef'.
observeIO
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m a
observeIO output currentRef action = do
  outputV <- readTVarIO (getOutput output)
  currentV <- readIORef currentRef
  if vVersion outputV > vVersion currentV
    then do
      derived <- action (vData currentV) (vData outputV)
      atomicWriteIORef currentRef Versioned
        { vVersion = vVersion outputV
        , vData = derived
        }
      pure derived
    else
      pure (vData currentV)
-- | Same as 'observeIO', but discards the resulting value.
observeIO_
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m ()
observeIO_ output currentRef =
  void . observeIO output currentRef
-- | Read the data currently stored in an observer, ignoring its version.
{-# INLINEABLE readObservedIO #-}
readObservedIO :: (MonadUnliftIO m) => IORef (Versioned a) -> m a
readObservedIO = fmap vData . readIORef
-- | Wrapper around the write end ('UnagiPrim.InChan') of an unboxed Unagi
-- channel.
newtype Source a = Source (UnagiPrim.InChan a)
-- | Create a new 'Source'.
--
-- Only the 'UnagiPrim.InChan' half of the freshly created channel is kept;
-- the 'UnagiPrim.OutChan' returned by 'UnagiPrim.newChan' is dropped.
newSource :: (MonadUnliftIO m, UnagiPrim a) => m (Source a)
newSource = fmap (Source . fst) $ liftIO UnagiPrim.newChan
-- | Publish a value by writing it to the source's 'UnagiPrim.InChan'.
pubSource :: (MonadUnliftIO m, UnagiPrim a) => Source a -> a -> m ()
pubSource (Source ic) = liftIO . UnagiPrim.writeChan ic
-- | Subscribe to a source: returns a new 'UnagiPrim.OutChan' obtained with
-- 'UnagiPrim.dupChan' on the source's 'UnagiPrim.InChan'.
subSource :: MonadUnliftIO m => Source a -> m (UnagiPrim.OutChan a)
subSource (Source ic) = liftIO $ UnagiPrim.dupChan ic