module Engine.Worker
  ( Versioned(..)
  , Var
  , newVar
  , readVar
  , stateVar
  , stateVarMap

  , HasInput(..)
  , pushInput
  , pushInputSTM
  , updateInput
  , updateInputSTM
  , getInputData
  , getInputDataSTM

  , HasConfig(..)
  , modifyConfig
  , modifyConfigSTM

  , HasOutput(..)
  , pushOutput
  , pushOutputSTM
  , updateOutput
  , updateOutputSTM
  , getOutputData
  , getOutputDataSTM

  , Cell(..)
  , spawnCell

  , Timed(..)
  , spawnTimed
  , spawnTimed_

  , Merge(..)
  , spawnMerge1
  , spawnMerge2
  , spawnMerge3
  , spawnMerge4

  , ObserverIO
  , newObserverIO
  , observeIO
  , observeIO_
  , readObservedIO

  , Source
  , newSource
  , pubSource
  , subSource

  , HasWorker(..)
  , register
  , registerCollection
  , registered
  , registeredCollection
  ) where

import RIO

import Control.Concurrent.Chan.Unagi.Unboxed (UnagiPrim)
import Control.Concurrent.Chan.Unagi.Unboxed qualified as UnagiPrim
import Data.Vector.Unboxed qualified as Unboxed
import UnliftIO.Concurrent (forkIO, killThread)
import UnliftIO.Resource (MonadResource)
import UnliftIO.Resource qualified as Resource
import Data.StateVar qualified as StateVar

-- | A payload tagged with a version vector.
--
-- The version field is strict: every push/update derives the next version
-- from the previous one, and a lazy field would accumulate a chain of
-- unevaluated increments whenever nobody compares versions.
-- The payload stays lazy, so derived data is only computed when read.
data Versioned a = Versioned
  { vVersion :: !(Unboxed.Vector Word64)
  , vData    :: a
  }
  deriving (Show, Functor, Foldable, Traversable)

-- | Equality looks at the version vector only; payloads are not compared.
instance Eq (Versioned a) where
  a == b = vVersion a == vVersion b

-- | Ordering looks at the version vector only; payloads are not compared.
instance Ord (Versioned a) where
  compare a b = compare (vVersion a) (vVersion b)

-- | A transactional variable holding a versioned value.
type Var a = TVar (Versioned a)

-- | Create a 'Var' holding the initial value at version @[0]@.
newVar :: MonadIO m => a -> m (Var a)
newVar initial = newTVarIO Versioned
  { vVersion = Unboxed.singleton 0
  , vData    = initial
  }

-- | View an input as a 'StateVar.StateVar': reading yields the current
-- payload, writing replaces it (and bumps the version via 'pushInput').
stateVar
  :: HasInput var
  => var
  -> StateVar.StateVar (GetInput var)
stateVar var = StateVar.makeStateVar
  (getInputData var)
  (\new -> pushInput var \_old -> new)

-- | Like 'stateVar', but focused on a part of the input: the getter projects
-- the value out of the payload and the setter merges a new value into it.
stateVarMap
  :: HasInput var
  => (GetInput var -> a)
  -> (a -> GetInput var -> GetInput var)
  -> var
  -> StateVar.StateVar a
stateVarMap mapGet mapSet var = StateVar.makeStateVar
  (fmap mapGet $ getInputData var)
  (\new -> pushInput var (mapSet new))

{-# INLINEABLE readVar #-}
-- | Read the current payload of a 'Var', ignoring its version.
readVar :: MonadIO m => Var a -> m a
readVar = fmap vData . readTVarIO

{-# INLINEABLE pushVarSTM #-}
-- | Unconditionally apply a function to the payload and bump every
-- component of the version vector.
pushVarSTM :: Var a -> (a -> a) -> STM ()
pushVarSTM var f =
  modifyTVar' var \Versioned{vVersion, vData} ->
    Versioned
      { vVersion = Unboxed.map (+ 1) vVersion
      , vData    = f vData
      }

{-# INLINEABLE updateVarSTM #-}
-- | Conditionally update the payload: 'Nothing' leaves the variable (and its
-- version) untouched, 'Just' stores the new payload and bumps the version.
updateVarSTM :: Var a -> (a -> Maybe a) -> STM ()
updateVarSTM var f = do
  current <- readTVar var
  case f (vData current) of
    Nothing ->
      pure ()
    Just newData ->
      -- Force the constructor (and with it the strict version field) now,
      -- rather than storing a thunk in the TVar.
      writeTVar var $! Versioned
        { vVersion = Unboxed.map (+ 1) (vVersion current)
        , vData    = newData
        }

-- | Things that expose a versioned input variable.
class HasInput a where
  type GetInput a
  getInput :: a -> Var (GetInput a)

instance HasInput (TVar (Versioned a)) where
  type GetInput (TVar (Versioned a)) = a
  getInput = id

{-# INLINEABLE pushInput #-}
-- | Atomically apply a function to the input payload and bump its version.
pushInput
  :: (MonadIO m, HasInput var)
  => var
  -> (GetInput var -> GetInput var)
  -> m ()
pushInput input = atomically . pushInputSTM input

{-# INLINEABLE pushInputSTM #-}
-- | 'pushInput' as an 'STM' action.
pushInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> GetInput var)
  -> STM ()
pushInputSTM input = pushVarSTM (getInput input)

{-# INLINEABLE updateInput #-}
-- | Atomically update the input payload; returning 'Nothing' from the
-- function leaves both payload and version unchanged.
updateInput
  :: (MonadIO m, HasInput var)
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> m ()
updateInput input = atomically . updateInputSTM input

{-# INLINEABLE updateInputSTM #-}
-- | 'updateInput' as an 'STM' action.
updateInputSTM
  :: HasInput var
  => var
  -> (GetInput var -> Maybe (GetInput var))
  -> STM ()
updateInputSTM input = updateVarSTM (getInput input)

{-# INLINEABLE getInputData #-}
-- | Read the current input payload.
getInputData
  :: (HasInput worker, MonadIO m)
  => worker
  -> m (GetInput worker)
getInputData = fmap vData . readTVarIO . getInput

{-# INLINEABLE getInputDataSTM #-}
-- | 'getInputData' as an 'STM' action.
getInputDataSTM
  :: HasInput worker
  => worker
  -> STM (GetInput worker)
getInputDataSTM = fmap vData . readTVar . getInput

-- | Things that expose an (unversioned) configuration variable.
class HasConfig a where
  type GetConfig a
  getConfig :: a -> TVar (GetConfig a)

instance HasConfig (TVar a) where
  type GetConfig (TVar a) = a
  getConfig = id

-- | Atomically (and strictly) modify the configuration.
modifyConfig
  :: (MonadIO m, HasConfig var)
  => var
  -> (GetConfig var -> GetConfig var)
  -> m ()
modifyConfig config = atomically . modifyConfigSTM config

-- | 'modifyConfig' as an 'STM' action.
modifyConfigSTM
  :: HasConfig var
  => var
  -> (GetConfig var -> GetConfig var)
  -> STM ()
modifyConfigSTM config = modifyTVar' (getConfig config)

-- | Things that expose a versioned output variable.
class HasOutput a where
  type GetOutput a
  getOutput :: a -> Var (GetOutput a)

instance HasOutput (TVar (Versioned a)) where
  type GetOutput (TVar (Versioned a)) = a
  getOutput = id

{-# INLINEABLE pushOutput #-}
-- | Atomically apply a function to the output payload and bump its version.
pushOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> GetOutput var)
  -> m ()
pushOutput output = atomically . pushOutputSTM output

{-# INLINEABLE pushOutputSTM #-}
-- | 'pushOutput' as an 'STM' action.
pushOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> GetOutput var)
  -> STM ()
pushOutputSTM output = pushVarSTM (getOutput output)

{-# INLINEABLE updateOutput #-}
-- | Atomically update the output payload; returning 'Nothing' from the
-- function leaves both payload and version unchanged.
updateOutput
  :: (MonadIO m, HasOutput var)
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> m ()
updateOutput output = atomically . updateOutputSTM output

{-# INLINEABLE updateOutputSTM #-}
-- | 'updateOutput' as an 'STM' action.
updateOutputSTM
  :: HasOutput var
  => var
  -> (GetOutput var -> Maybe (GetOutput var))
  -> STM ()
updateOutputSTM output = updateVarSTM (getOutput output)

{-# INLINEABLE getOutputData #-}
-- | Read the current output payload.
getOutputData
  :: (HasOutput worker, MonadIO m)
  => worker
  -> m (GetOutput worker)
getOutputData = fmap vData . readTVarIO . getOutput

{-# INLINEABLE getOutputDataSTM #-}
-- | 'getOutputData' as an 'STM' action.
getOutputDataSTM
  :: HasOutput worker
  => worker
  -> STM (GetOutput worker)
getOutputDataSTM = fmap vData . readTVar . getOutput

-- * Supply

-- | Updatable cell for composite input or costly output.
data Cell input output = Cell
  { cWorker :: ThreadId   -- ^ Thread that recomputes the output.
  , cInput  :: Var input  -- ^ Versioned input; bump it to trigger a recompute.
  , cOutput :: Var output -- ^ Versioned result of applying the function.
  }

instance HasInput (Cell i o) where
  type GetInput (Cell i o) = i
  getInput = cInput

instance HasOutput (Cell i o) where
  type GetOutput (Cell i o) = o
  getOutput = cOutput

-- | Spawn a cell whose output is the given function applied to its input.
--
-- A worker thread blocks (via STM retry) until the input version exceeds the
-- output version, then stores the (lazily) recomputed payload together with
-- the input's version.
spawnCell
  :: MonadUnliftIO m
  => (input -> output)
  -> input
  -> m (Cell input output)
spawnCell f initial = do
  input <- newVar initial
  output <- newVar (f initial)
  worker <- forkIO $ forever $ atomically do
    next <- readTVar input
    old <- readTVar output
    -- Compare whole version vectors, as the Merge workers do. For the
    -- singleton vectors made by 'newVar' this equals comparing the heads,
    -- without relying on a partial function.
    if vVersion next > vVersion old then
      writeTVar output Versioned
        { vVersion = vVersion next
        , vData    = f (vData next)
        }
    else
      retrySTM
  pure Cell
    { cWorker = worker
    , cInput  = input
    , cOutput = output
    }

-- | Timer-driven stateful producer.
data Timed config output = Timed
  { tWorker :: ThreadId    -- ^ Thread running the delay/step loop.
  , tActive :: TVar Bool   -- ^ Checked after every delay; steps run only while 'True'.
  , tConfig :: TVar config -- ^ Configuration read before each step.
  , tOutput :: Var output  -- ^ Versioned output updated by the step function.
  }

instance HasConfig (Timed config output) where
  type GetConfig (Timed config output) = config
  getConfig = tConfig

instance HasOutput (Timed config output) where
  type GetOutput (Timed config output) = output
  getOutput = tOutput

-- | Simplified 'spawnTimed': fixed delay, no configuration, no state.
-- Every active tick runs the action and stores its result as the new output.
spawnTimed_
  :: MonadUnliftIO m
  => Bool     -- ^ Start in the active state?
  -> Int      -- ^ Delay between ticks, passed to 'threadDelay'.
  -> output   -- ^ Initial output.
  -> m output -- ^ Action producing the next output.
  -> m (Timed () output)
spawnTimed_ startActive dt initialOutput stepF =
  spawnTimed
    startActive
    (Left dt)
    (\() -> pure (initialOutput, ()))
    (\_state () -> do
        res <- stepF
        pure (Just res, ())
    )
    ()

-- | Spawn a timer-driven producer.
--
-- The worker loop sleeps, then (only if the active flag is set) reads the
-- configuration, runs the step function with the current state, and applies
-- the result to the output: 'Nothing' leaves the output and its version
-- untouched, 'Just' replaces it. While inactive the state is carried over
-- unchanged and the loop keeps sleeping.
spawnTimed
  :: MonadUnliftIO m
  => Bool                                     -- ^ Start in the active state?
  -> Either Int (config -> Int)               -- ^ Static delay, or delay derived from the current config.
  -> (config -> m (output, state))            -- ^ Produce initial output and state.
  -> (state -> config -> m (Maybe output, state)) -- ^ One step of the producer.
  -> config                                   -- ^ Initial configuration.
  -> m (Timed config output)
spawnTimed startActive dtF initF stepF initialConfig = do
  tActive <- newTVarIO startActive
  tConfig <- newTVarIO initialConfig
  (initialOutput, initialState) <- initF initialConfig
  tOutput <- newVar initialOutput
  tWorker <- forkIO $ step tActive tConfig tOutput initialState
  pure Timed{..}
  where
    step activeVar configVar output curState = do
      -- The delay comes first, so the initial output stays visible for at
      -- least one interval.
      case dtF of
        Left static ->
          threadDelay static
        Right fromConfig ->
          readTVarIO configVar >>= threadDelay . fromConfig

      active <- readTVarIO activeVar
      if active then do
        config <- readTVarIO configVar
        (nextOutput, nextState) <- stepF curState config
        updateOutput output $ const nextOutput
        step activeVar configVar output nextState
      else
        step activeVar configVar output curState

-- | Supply-driven step cell.
data Merge o = Merge
  { mWorker :: ThreadId          -- ^ Thread that keeps the output in sync.
  , mOutput :: TVar (Versioned o) -- ^ Combined, versioned output.
  }

instance HasOutput (Merge o) where
  type GetOutput (Merge o) = o
  getOutput = mOutput

-- | Shared machinery for the @spawnMergeN@ family.
--
-- The argument is a transaction that reads all sources and builds the
-- combined versioned value. It is run once to initialise the output, and
-- then repeatedly by a worker thread that stores the result whenever its
-- version exceeds the stored one and otherwise blocks via STM retry.
spawnMergeSTM
  :: MonadUnliftIO m
  => STM (Versioned o)
  -> m (Merge o)
spawnMergeSTM snapshot = do
  output <- atomically (snapshot >>= newTVar)
  worker <- forkIO $ forever $ atomically do
    next <- snapshot
    old <- readTVar output
    if vVersion next > vVersion old then
      writeTVar output next
    else
      retrySTM
  pure Merge
    { mWorker = worker
    , mOutput = output
    }

-- | Derive an output from a single source; the output carries the source's
-- version.
spawnMerge1
  :: ( MonadUnliftIO m
     , HasOutput i
     )
  => (GetOutput i -> o)
  -> i
  -> m (Merge o)
spawnMerge1 f i = spawnMergeSTM do
  v <- readTVar (getOutput i)
  pure Versioned
    { vVersion = vVersion v
    , vData    = f (vData v)
    }

-- | Combine two sources; the output version is the concatenation of the
-- source versions.
spawnMerge2
  :: ( MonadUnliftIO m
     , HasOutput i1
     , HasOutput i2
     )
  => (GetOutput i1 -> GetOutput i2 -> o)
  -> i1
  -> i2
  -> m (Merge o)
spawnMerge2 f i1 i2 = spawnMergeSTM do
  v1 <- readTVar (getOutput i1)
  v2 <- readTVar (getOutput i2)
  pure Versioned
    { vVersion = vVersion v1 <> vVersion v2
    , vData    = f (vData v1) (vData v2)
    }

-- | Combine three sources; the output version is the concatenation of the
-- source versions.
spawnMerge3
  :: ( MonadUnliftIO m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> o)
  -> i1
  -> i2
  -> i3
  -> m (Merge o)
spawnMerge3 f i1 i2 i3 = spawnMergeSTM do
  v1 <- readTVar (getOutput i1)
  v2 <- readTVar (getOutput i2)
  v3 <- readTVar (getOutput i3)
  pure Versioned
    { vVersion = mconcat [vVersion v1, vVersion v2, vVersion v3]
    , vData    = f (vData v1) (vData v2) (vData v3)
    }

-- | Combine four sources; the output version is the concatenation of the
-- source versions.
spawnMerge4
  :: ( MonadUnliftIO m
     , HasOutput i1
     , HasOutput i2
     , HasOutput i3
     , HasOutput i4
     )
  => (GetOutput i1 -> GetOutput i2 -> GetOutput i3 -> GetOutput i4 -> o)
  -> i1
  -> i2
  -> i3
  -> i4
  -> m (Merge o)
spawnMerge4 f i1 i2 i3 i4 = spawnMergeSTM do
  v1 <- readTVar (getOutput i1)
  v2 <- readTVar (getOutput i2)
  v3 <- readTVar (getOutput i3)
  v4 <- readTVar (getOutput i4)
  pure Versioned
    { vVersion = mconcat [vVersion v1, vVersion v2, vVersion v3, vVersion v4]
    , vData    = f (vData v1) (vData v2) (vData v3) (vData v4)
    }

-- * Demand

-- | Mutable cache of a value derived from some output, tagged with the
-- version of the output it was derived from.
type ObserverIO a = IORef (Versioned a)
-- | Create an observer holding the given value with an empty version vector,
-- which compares lower than any non-empty version, so the first
-- 'observeIO' against a non-empty-versioned output runs its action.
newObserverIO :: MonadIO m => a -> m (ObserverIO a)
newObserverIO initial = newIORef Versioned
  { vVersion = mempty
  , vData    = initial
  }

-- | Refresh the observer from an output if the output is newer.
--
-- When the output's version exceeds the cached one, the action is run with
-- the cached value and the output payload; its result is stored under the
-- output's version and returned. Otherwise the cached value is returned and
-- the action is not run.
--
-- The read of the cache and the later write are separate operations, not a
-- single atomic step.
observeIO
  :: (MonadIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m a
observeIO output currentRef action = do
  outputV <- readTVarIO (getOutput output)
  currentV <- readIORef currentRef
  if vVersion outputV > vVersion currentV then do
    derived <- action (vData currentV) (vData outputV)
    atomicWriteIORef currentRef Versioned
      { vVersion = vVersion outputV
      , vData    = derived
      }
    pure derived
  else
    pure (vData currentV)

-- | 'observeIO' with the result discarded.
observeIO_
  :: (MonadIO m, HasOutput output)
  => output
  -> ObserverIO a
  -> (a -> GetOutput output -> m a)
  -> m ()
observeIO_ output currentRef = void . observeIO output currentRef

{-# INLINEABLE readObservedIO #-}
-- | Read the cached value without checking for updates.
readObservedIO :: MonadIO m => ObserverIO a -> m a
readObservedIO = fmap vData . readIORef

-- * PubSub

-- | Write end of an unboxed Unagi channel; subscribers obtain their own read
-- end with 'subSource'.
newtype Source a = Source (UnagiPrim.InChan a)

-- | Create a source. The read end returned by the channel constructor is
-- dropped; use 'subSource' to obtain one.
newSource :: (MonadIO m, UnagiPrim a) => m (Source a)
newSource = liftIO $ fmap (Source . fst) UnagiPrim.newChan

-- | Publish a value to the source.
pubSource :: (MonadIO m, UnagiPrim a) => Source a -> a -> m ()
pubSource (Source ic) = liftIO . UnagiPrim.writeChan ic

-- | Subscribe to the source by duplicating its channel.
subSource :: MonadIO m => Source a -> m (UnagiPrim.OutChan a)
subSource (Source ic) = liftIO $ UnagiPrim.dupChan ic

-- * Utils

-- | Processes backed by a worker thread.
class HasWorker a where
  getWorker :: a -> ThreadId

instance HasWorker (Cell i o) where
  getWorker = cWorker

instance HasWorker (Timed c o) where
  getWorker = tWorker

instance HasWorker (Merge o) where
  getWorker = mWorker

-- | Register a release action that kills the process's worker thread.
register
  :: ( MonadResource m
     , HasWorker process
     )
  => process
  -> m Resource.ReleaseKey
register = Resource.register . killThread . getWorker

-- | Register a single release action that kills the worker threads of all
-- processes in the collection.
registerCollection
  :: ( MonadResource m
     , HasWorker process
     , Foldable t
     )
  => t process
  -> m Resource.ReleaseKey
registerCollection = Resource.register . traverse_ (killThread . getWorker)

-- | Run a spawning action and register its worker for cleanup.
--
-- Registration happens after the spawn completes; the two steps are not
-- performed under a mask.
registered
  :: (MonadResource m, HasWorker a)
  => m a
  -> m (Resource.ReleaseKey, a)
registered spawn = do
  worker <- spawn
  key <- Resource.register $ killThread (getWorker worker)
  pure (key, worker)

-- | Spawn one process per input and register a single release action that
-- kills all of their workers.
--
-- Registration happens only after every spawn has completed.
registeredCollection
  :: ( MonadResource m
     , HasWorker process
     , Traversable t
     )
  => (input -> m process)
  -> t input
  -> m (Resource.ReleaseKey, t process)
registeredCollection spawn inputs = do
  workers <- traverse spawn inputs
  key <- Resource.register $ traverse_ (killThread . getWorker) workers
  pure (key, workers)