module Engine.Worker
  ( Versioned(..)
  , Var
  , newVar
  , readVar
  , stateVar
  , stateVarMap

  , HasInput(..)
  , pushInput
  , pushInputSTM
  , updateInput
  , updateInputSTM
  , getInputData
  , getInputDataSTM

  , HasConfig(..)
  , modifyConfig
  , modifyConfigSTM

  , HasOutput(..)
  , pushOutput
  , pushOutputSTM
  , updateOutput
  , updateOutputSTM
  , getOutputData
  , getOutputDataSTM

  , Cell
  , spawnCell

  , Timed(..)
  , spawnTimed
  , spawnTimed_

  , Merge(..)
  , spawnMerge1
  , spawnMerge2
  , spawnMerge3
  , spawnMerge4
  , spawnMergeT

  , ObserverIO
  , newObserverIO
  , observeIO
  , observeIO_
  , readObservedIO

  , Source
  , newSource
  , pubSource
  , subSource
  ) where

import RIO

import Control.Concurrent.Chan.Unagi.Unboxed (UnagiPrim)
import Control.Concurrent.Chan.Unagi.Unboxed qualified as UnagiPrim
import Data.Vector.Unboxed qualified as Unboxed
import UnliftIO.Concurrent (forkIO, killThread)
import UnliftIO.Resource (MonadResource)
import UnliftIO.Resource qualified as Resource
import Data.StateVar qualified as StateVar

-- | A value tagged with a version vector.
--
-- The 'Eq' and 'Ord' instances below look at 'vVersion' only and never
-- inspect 'vData'.
data Versioned a = Versioned
  { vVersion :: Unboxed.Vector Word64
    -- ^ Version vector; a plain 'Var' starts with a single @0@ component.
  , vData :: a
    -- ^ The payload.
  }
  deriving (Show, Functor, Foldable, Traversable)

-- | Equality on versions only.
instance Eq (Versioned a) where
  a == b = vVersion a == vVersion b

-- | Ordering on versions only.
instance Ord (Versioned a) where
  compare a b = compare (vVersion a) (vVersion b)

-- | A transactional variable holding versioned data.
type Var a = TVar (Versioned a)

-- | Create a 'Var' with a single-component version vector set to @0@.
newVar :: MonadUnliftIO m => a -> m (Var a)
newVar initial = newTVarIO Versioned
  { vVersion = Unboxed.singleton 0
  , vData = initial
  }

-- | Expose the input of a worker as a 'StateVar.StateVar'.
--
-- Reading goes through 'getInputData'; writing replaces the data with
-- 'pushInput' (and so always bumps the version).
stateVar
  :: HasInput var
  => var
  -> StateVar.StateVar (GetInput var)
stateVar var = StateVar.makeStateVar
  (getInputData var)
  (\new -> pushInput var \_old -> new)

-- | Like 'stateVar', but focused on a part of the input.
--
-- The first function projects the part on read, the second one puts a new
-- part back into the current input on write.
stateVarMap
  :: HasInput var
  => (GetInput var -> a)
  -> (a -> GetInput var -> GetInput var)
  -> var
  -> StateVar.StateVar a
stateVarMap mapGet mapSet var = StateVar.makeStateVar
  (fmap mapGet $ getInputData var)
  (\new -> pushInput var (mapSet new))

{-# INLINEABLE readVar #-}
-- | Read the current data of a 'Var', ignoring its version.
readVar :: (MonadUnliftIO m) => Var a -> m a
readVar = fmap vData . readTVarIO

{-# INLINEABLE pushVarSTM #-}
-- | Apply a function to the data and increment every version component.
pushVarSTM :: Var a -> (a -> a) -> STM ()
pushVarSTM var f =
  modifyTVar' var \Versioned{vVersion, vData} ->
    let
      nextVersion = Unboxed.map (+ 1) vVersion
    in
      -- modifyTVar' only evaluates the constructor. Force the version too,
      -- so repeated pushes do not pile up thunks when nobody reads it.
      -- The data is intentionally left lazy.
      nextVersion `seq` Versioned
        { vVersion = nextVersion
        , vData = f vData
        }

{-# INLINEABLE updateVarSTM #-}
-- | Conditionally update a 'Var'.
--
-- 'Nothing' leaves both data and version untouched, 'Just' stores the new
-- data and increments every version component.
updateVarSTM :: Var a -> (a -> Maybe a) -> STM ()
updateVarSTM var f = do
  current <- readTVar var
  case f (vData current) of
    Nothing ->
      pure ()
    Just newData -> do
      let nextVersion = Unboxed.map (+ 1) (vVersion current)
      -- Force the version before storing, see 'pushVarSTM'.
      nextVersion `seq` writeTVar var Versioned
        { vVersion = nextVersion
        , vData = newData
        }

-- | Things that have a versioned input variable.
class HasInput a where
  type GetInput a
  getInput :: a -> Var (GetInput a)

instance HasInput (Var a) where
  type GetInput (Var a) = a
  getInput = id

-- | The input of a pair is the input of its first component.
instance HasInput a => HasInput (a, b) where
  type GetInput (a, b) = GetInput a
  getInput = getInput . fst

{-# INLINEABLE pushInput #-}
-- | Modify the input data and bump its version, in its own transaction.
pushInput
  :: (MonadIO m, HasInput var)
  => var
  -> (GetInput var -> GetInput var)
  -> m ()
pushInput input = atomically . pushInputSTM input

{-# INLINEABLE pushInputSTM #-}
-- | 'pushInput' as an 'STM' action.
pushInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> GetInput var)
  -> STM ()
pushInputSTM input = pushVarSTM (getInput input)

{-# INLINEABLE updateInput #-}
-- | Conditionally modify the input, in its own transaction.
--
-- The version is bumped only when the function returns 'Just'.
updateInput
  :: ( MonadIO m
     , HasInput var
     )
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> m ()
updateInput input = atomically . updateInputSTM input

{-# INLINEABLE updateInputSTM #-}
-- | 'updateInput' as an 'STM' action.
updateInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> STM ()
updateInputSTM input = updateVarSTM (getInput input)

{-# INLINEABLE getInputData #-}
-- | Read the current input data, ignoring its version.
getInputData
  :: (HasInput worker, MonadIO m)
  => worker
  -> m (GetInput worker)
getInputData = fmap vData . readTVarIO . getInput

{-# INLINEABLE getInputDataSTM #-}
-- | 'getInputData' as an 'STM' action.
getInputDataSTM
  :: (HasInput worker)
  => worker
  -> STM (GetInput worker)
getInputDataSTM = fmap vData . readTVar . getInput

-- | Things that have an unversioned config variable.
class HasConfig a where
  type GetConfig a
  getConfig :: a -> TVar (GetConfig a)

instance HasConfig (TVar a) where
  type GetConfig (TVar a) = a
  getConfig = id

-- | Modify the config, in its own transaction.
modifyConfig
  :: (MonadIO m, HasConfig var)
  => var
  -> (GetConfig var -> GetConfig var)
  -> m ()
modifyConfig config = atomically . modifyConfigSTM config

-- | 'modifyConfig' as an 'STM' action.
modifyConfigSTM
  :: HasConfig var
  => var
  -> (GetConfig var -> GetConfig var)
  -> STM ()
modifyConfigSTM config = modifyTVar' (getConfig config)

-- | Things that have a versioned output variable.
class HasOutput a where
  type GetOutput a
  getOutput :: a -> Var (GetOutput a)

instance HasOutput (Var a) where
  type GetOutput (Var a) = a
  getOutput = id

-- | The output of a pair is the output of its second component.
instance HasOutput b => HasOutput (a, b) where
  type GetOutput (a, b) = GetOutput b
  getOutput = getOutput . snd

{-# INLINEABLE pushOutput #-}
-- | Modify the output data and bump its version, in its own transaction.
pushOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> GetOutput var)
  -> m ()
pushOutput output = atomically . pushOutputSTM output

{-# INLINEABLE pushOutputSTM #-}
-- | 'pushOutput' as an 'STM' action.
pushOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> GetOutput var)
  -> STM ()
pushOutputSTM output = pushVarSTM (getOutput output)

{-# INLINEABLE updateOutput #-}
-- | Conditionally modify the output, in its own transaction.
--
-- The version is bumped only when the function returns 'Just'.
updateOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> m ()
updateOutput output = atomically . updateOutputSTM output

{-# INLINEABLE updateOutputSTM #-}
-- | 'updateOutput' as an 'STM' action.
updateOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> STM ()
updateOutputSTM output = updateVarSTM (getOutput output)

{-# INLINEABLE getOutputData #-}
-- | Read the current output data, ignoring its version.
getOutputData
  :: (HasOutput worker, MonadIO m)
  => worker
  -> m (GetOutput worker)
getOutputData = fmap vData . readTVarIO . getOutput

{-# INLINEABLE getOutputDataSTM #-}
-- | 'getOutputData' as an 'STM' action.
getOutputDataSTM
  :: (HasOutput worker)
  => worker
  -> STM (GetOutput worker)
getOutputDataSTM = fmap vData . readTVar . getOutput

-- * Supply

-- | Updatable cell for composite input or costly output.
--
-- The pair instances make the first component its input and the second
-- component its output.
type Cell input output = (Var input, Merge output)

-- | Create an input 'Var' and a 'spawnMerge1' worker deriving output from it.
spawnCell
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => (input -> output)
  -> input
  -> m (Cell input output)
spawnCell f initialInput = do
  input <- newVar initialInput
  fmap (input,) $ spawnMerge1 f input

-- | Timer-driven stateful producer.
data Timed config output = Timed
  { tWorker :: ThreadId
    -- ^ The thread running the step loop.
  , tKey :: Resource.ReleaseKey
    -- ^ Release key of the registered action that kills 'tWorker'.
  , tActive :: TVar Bool
    -- ^ The step function only runs while this is 'True'.
  , tConfig :: TVar config
    -- ^ Configuration passed to each step.
  , tOutput :: Var output
    -- ^ The latest produced output.
  }

instance HasConfig (Timed config output) where
  type GetConfig (Timed config output) = config
  getConfig = tConfig

instance HasOutput (Timed config output) where
  type GetOutput (Timed config output) = output
  getOutput = tOutput

-- | A simplified 'spawnTimed': fixed delay, no config, no state.
--
-- Every step runs the given action and publishes its result.
spawnTimed_
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => Bool
  -> Int
  -> output
  -> m output
  -> m (Timed () output)
spawnTimed_ startActive dt initialOutput stepF =
  spawnTimed
    startActive
    (Left dt)
    (\() -> pure (initialOutput, ()))
    (\_old () -> stepF >>= \res -> pure (Just res, ()))
    ()

-- | Spawn a worker thread that periodically runs a stateful step function.
--
-- Arguments, in order:
--
-- * whether the worker starts active;
-- * the delay handed to 'threadDelay' (microseconds), either fixed or
--   derived from the current config before every wait;
-- * an initializer, run once in the calling thread, producing the initial
--   output and state;
-- * the step function; a 'Nothing' output keeps the previous output and
--   its version;
-- * the initial config.
--
-- The worker waits /before/ every step, the first one included. While
-- inactive it keeps waking up at the same rate, but skips the step and
-- keeps its state. A thread kill is registered with the resource scope.
spawnTimed
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => Bool
  -> Either Int (config -> Int)
  -> (config -> m (output, state))
  -> (state -> config -> m (Maybe output, state))
  -> config
  -> m (Timed config output)
spawnTimed startActive dtF initF stepF initialConfig = do
  tActive <- newTVarIO startActive
  tConfig <- newTVarIO initialConfig
  (initialOutput, initialState) <- initF initialConfig
  tOutput <- newVar initialOutput
  tWorker <- forkIO $ step tActive tConfig tOutput initialState
  tKey <- Resource.register $ killThread tWorker
  pure Timed{..}
  where
    step activeVar configVar output curState = do
      case dtF of
        Left static ->
          threadDelay static
        Right fromConfig ->
          readTVarIO configVar >>= threadDelay . fromConfig

      active <- readTVarIO activeVar
      if active
        then do
          config <- readTVarIO configVar
          (nextOutput, nextState) <- stepF curState config
          updateOutput output $ const nextOutput
          step activeVar configVar output nextState
        else
          step activeVar configVar output curState

-- | Supply-driven step cell.
data Merge o = Merge
  { mWorker :: ThreadId
    -- ^ The thread that keeps the output up to date.
  , mKey :: Resource.ReleaseKey
    -- ^ Release key of the registered action that kills 'mWorker'.
  , mOutput :: TVar (Versioned o)
    -- ^ The latest merged output.
  }

instance HasOutput (Merge o) where
  type GetOutput (Merge o) = o
  getOutput = mOutput

-- | Shared implementation of the @spawnMerge*@ family.
--
-- The transaction argument reads all the inputs and builds the candidate
-- output, with the combined version vector and the (lazy) merged data.
-- It is run once to initialize the output variable and then repeatedly by
-- a worker thread. The worker stores the candidate when its version is
-- greater than the stored one and otherwise blocks with 'retrySTM'.
-- A thread kill is registered with the resource scope.
spawnMergeWith
  :: ( MonadUnliftIO m
     , MonadResource m
     )
  => STM (Versioned o)
  -> m (Merge o)
spawnMergeWith collect = do
  output <- atomically $ collect >>= newTVar
  worker <- forkIO $ forever $ atomically do
    next <- collect
    old <- readTVar output
    if vVersion next > vVersion old
      then writeTVar output next
      else retrySTM
  key <- Resource.register $ killThread worker
  pure Merge
    { mWorker = worker
    , mKey = key
    , mOutput = output
    }

-- | Derive an output from a single source, reusing the source version.
spawnMerge1
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i
     )
  => (GetOutput i -> o)
  -> i
  -> m (Merge o)
spawnMerge1 f i = spawnMergeWith do
  v <- readTVar (getOutput i)
  pure Versioned
    { vVersion = vVersion v
    , vData = f (vData v)
    }

-- | Merge two sources; the output version is their versions concatenated.
spawnMerge2
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     )
  => (GetOutput i1 -> GetOutput i2 -> o)
  -> i1
  -> i2
  -> m (Merge o)
spawnMerge2 f i1 i2 = spawnMergeWith do
  v1 <- readTVar (getOutput i1)
  v2 <- readTVar (getOutput i2)
  pure Versioned
    { vVersion = vVersion v1 <> vVersion v2
    , vData = f (vData v1) (vData v2)
    }

-- | Merge three sources; the output version is their versions concatenated.
spawnMerge3
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> o)
  -> i1
  -> i2
  -> i3
  -> m (Merge o)
spawnMerge3 f i1 i2 i3 = spawnMergeWith do
  v1 <- readTVar (getOutput i1)
  v2 <- readTVar (getOutput i2)
  v3 <- readTVar (getOutput i3)
  pure Versioned
    { vVersion = mconcat [vVersion v1, vVersion v2, vVersion v3]
    , vData = f (vData v1) (vData v2) (vData v3)
    }

-- | Merge four sources; the output version is their versions concatenated.
spawnMerge4
  :: ( MonadUnliftIO m
     , MonadResource m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     , HasOutput i4
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> GetOutput i4 -> o)
  -> i1
  -> i2
  -> i3
  -> i4
  -> m (Merge o)
spawnMerge4 f i1 i2 i3 i4 = spawnMergeWith do
  v1 <- readTVar (getOutput i1)
  v2 <- readTVar (getOutput i2)
  v3 <- readTVar (getOutput i3)
  v4 <- readTVar (getOutput i4)
  pure Versioned
    { vVersion = mconcat [vVersion v1, vVersion v2, vVersion v3, vVersion v4]
    , vData = f (vData v1) (vData v2) (vData v3) (vData v4)
    }

{- | Spawn a merge over a homogeneous traversable collection of processes.

A merging function will receive a collection of outputs to summarize.
The output version is the concatenation of all the input versions, in
traversal order.
-}
spawnMergeT
  :: ( Traversable t
     , HasOutput input
     , MonadUnliftIO m
     , MonadResource m
     )
  => (t (GetOutput input) -> output)
  -> t input
  -> m (Merge output)
spawnMergeT f inputs = spawnMergeWith do
  vs <- traverse (readTVar . getOutput) inputs
  pure Versioned
    { vVersion = foldMap vVersion vs
    , vData = f (fmap vData vs)
    }

-- * Demand

-- | A cached derived value, tagged with the source version it was built from.
type ObserverIO a = IORef (Versioned a)

-- | Create an observer with an empty version vector.
--
-- Any non-empty source version compares greater than the empty one, so the
-- first 'observeIO' against such a source runs the action.
newObserverIO :: MonadIO m => a -> m (ObserverIO a)
newObserverIO initial = newIORef Versioned
  { vVersion = mempty
  , vData = initial
  }

-- | Refresh the observer if the source has a greater version.
--
-- When the source version is greater than the cached one, the action is
-- run with the cached value and the current source data; its result is
-- stored together with the source version and returned. Otherwise the
-- cached value is returned as is.
--
-- The read of the observer and the later write are separate operations.
observeIO
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m a
observeIO output currentRef action = do
  outputV <- readTVarIO (getOutput output)
  currentV <- readIORef currentRef
  if vVersion outputV > vVersion currentV
    then do
      derived <- action (vData currentV) (vData outputV)
      atomicWriteIORef currentRef Versioned
        { vVersion = vVersion outputV
        , vData = derived
        }
      pure derived
    else
      pure (vData currentV)

-- | 'observeIO' with the result discarded.
observeIO_
  :: (MonadUnliftIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m ()
observeIO_ output currentRef = void . observeIO output currentRef

{-# INLINEABLE readObservedIO #-}
-- | Read the cached value without checking the source.
readObservedIO :: (MonadUnliftIO m) => IORef (Versioned a) -> m a
readObservedIO = fmap vData . readIORef

-- * PubSub

-- | The write end of an unboxed Unagi channel.
newtype Source a = Source (UnagiPrim.InChan a)

-- | Create a channel and keep only its write end.
--
-- Readers get their own read ends with 'subSource'.
newSource :: (MonadUnliftIO m, UnagiPrim a) => m (Source a)
newSource = fmap (Source . fst) $ liftIO UnagiPrim.newChan

-- | Write a value to the channel.
pubSource :: (MonadUnliftIO m, UnagiPrim a) => Source a -> a -> m ()
pubSource (Source ic) = liftIO . UnagiPrim.writeChan ic

-- | Obtain a new read end for the channel, using 'UnagiPrim.dupChan'.
subSource :: MonadUnliftIO m => Source a -> m (UnagiPrim.OutChan a)
subSource (Source ic) = liftIO $ UnagiPrim.dupChan ic