{-# LANGUAGE ScopedTypeVariables, BangPatterns #-} -- |Utility functions for working with 'Topic's. These functions are -- primarily combinators for fusing two 'Topic's in various ways. module Ros.Topic.Util where import Prelude hiding (dropWhile, filter, splitAt, mapM) import Control.Applicative import Control.Arrow ((***), second) import Control.Concurrent hiding (yield) import Control.Concurrent.STM import Control.Monad ((<=<), when, replicateM) import Control.Monad.IO.Class import Data.AdditiveGroup (AdditiveGroup, (^+^), (^-^), Sum(..)) import Data.Monoid (Monoid) import Data.Sequence ((|>), viewl, ViewL(..)) import qualified Data.Sequence as S import qualified Data.Foldable as F import Ros.Rate (rateLimiter) import Ros.Topic hiding (mapM_) -- |Produce an infinite list from a 'Topic'. toList :: Topic IO a -> IO [a] toList t0 = do c <- newChan let feed t = do (x, t') <- runTopic t writeChan c x feed t' _ <- forkIO $ feed t0 getChanContents c -- |Produce a 'Topic' from an infinite list. fromList :: Monad m => [a] -> Topic m a fromList (x:xs) = Topic $ return (x, fromList xs) fromList [] = error "Ran out of list elements" -- |Tee a 'Topic' into two duplicate 'Topic's. Each returned 'Topic' -- will receive all the values of the original 'Topic' while any -- side-effect produced by each step of the original 'Topic' will -- occur only once. -- -- This version of @tee@ lazily pulls data from the original 'Topic' -- when it is first required by a consumer of either of the returned -- 'Topic's. This behavior is crucial when lazily consuming the data -- stream is preferred. For instance, using 'interruptible' with 'tee' -- will allow for a chunk of data to be abandoned before being fully -- consumed as long as neither consumer has forced its way too far -- down the stream. -- -- This function is useful when two consumers must see all the same -- elements from a 'Topic'. If the 'Topic' was instead 'share'd, then -- one consumer might get the first value from the 'Topic' before the -- second consumer's buffer is created since buffer creation is lazy. tee :: Topic IO a -> IO (Topic IO a, Topic IO a) tee t0 = do c1 <- newTChanIO c2 <- newTChanIO signal <- newTVarIO True let feed c = do atomically $ do f <- isEmptyTChan c when f (writeTVar signal False) atomically $ readTChan c produce t = do atomically $ readTVar signal >>= flip when retry (x,t') <- runTopic t atomically $ writeTChan c1 x >> writeTChan c2 x >> writeTVar signal True produce t' _ <- forkIO $ produce t0 return (repeatM (feed c1), repeatM (feed c2)) -- |This version of @tee@ eagerly pulls data from the -- original 'Topic' as soon as it is available. This behavior is -- undesirable when lazily consuming the data stream is preferred. For -- instance, using 'interruptible' with 'teeEager' will likely not -- work well. However, 'teeEager' may have slightly better performance -- than 'tee'. teeEager :: Topic IO a -> IO (Topic IO a, Topic IO a) teeEager t = do c1 <- newChan c2 <- newChan let feed c = do x <- readChan c return (x, Topic $ feed c) _ <- forkIO . forever . join $ (\x -> writeChan c1 x >> writeChan c2 x) <$> t return (Topic $ feed c1, Topic $ feed c2) -- |Fan out one 'Topic' out to a number of duplicate 'Topic's, each of -- which will produce the same values. Side effects caused by the -- original 'Topic''s production will occur only once. This is useful -- when a known number of consumers must see exactly all the same -- elements. fan :: Int -> Topic IO a -> IO [Topic IO a] fan n t0 = do cs <- replicateM n newTChanIO signal <- newTVarIO True let feed c = do atomically $ do f <- isEmptyTChan c when f (writeTVar signal False) atomically $ readTChan c produce t = do atomically $ readTVar signal >>= flip when retry (x,t') <- runTopic t atomically $ mapM_ (flip writeTChan x) cs >> writeTVar signal True produce t' _ <- forkIO $ produce t0 return $ map (repeatM . feed) cs -- |Make a 'Topic' shareable among multiple consumers. Each consumer -- of a Topic gets its own read buffer automatically as soon as it -- starts pulling items from the Topic. Without calling one of -- 'share', 'tee', or 'fan' on a Topic, the Topic's values will be -- split among all consumers (e.g. consumer /A/ gets half the values -- produced by the 'Topic', while consumer /B/ gets the other half -- with some unpredictable interleaving). Note that Topics returned by -- the @Ros.Node.subscribe@ are already shared. share :: Topic IO a -> IO (Topic IO a) share t0 = do cs <- newTVarIO [] -- A list for the individual client buffers signal <- newTVarIO True let addClient = atomically $ do cs0 <- readTVar cs c <- newTChan writeTVar cs (c:cs0) return c feed c = do atomically $ do f <- isEmptyTChan c when f (writeTVar signal False) atomically $ readTChan c produce t = do atomically $ readTVar signal >>= flip when retry (x,t') <- runTopic t atomically $ do cs' <- readTVar cs mapM_ (flip writeTChan x) cs' writeTVar signal True produce t' _ <- forkIO $ produce t0 return . Topic $ addClient >>= runTopic . repeatM . feed -- |The application @topicRate rate t@ runs 'Topic' @t@ no faster than -- @rate@ Hz. topicRate :: (Functor m, MonadIO m) => Double -> Topic m a -> Topic m a topicRate p t0 = Topic $ do delay <- liftIO $ rateLimiter p (return ()) (x,t') <- runTopic t0 let go t = Topic $ liftIO delay >> second go <$> runTopic t return (x, go t') -- |Splits a 'Topic' into two 'Topic's: the elements of the first -- 'Topic' all satisfy the given predicate, while none of the elements -- of the second 'Topic' do. partition :: (a -> Bool) -> Topic IO a -> IO (Topic IO a, Topic IO a) partition p = fmap (filter p *** filter (not . p)) . tee -- |Returns a 'Topic' whose values are consecutive values from the -- original 'Topic'. consecutive :: Monad m => Topic m a -> Topic m (a,a) consecutive = metamorph startup where startup x = skip (go x) go x y = yield (x,y) (go y) -- consecutive t = Topic $ do (x, t') <- runTopic t -- runTopic $ go x t' -- where go x t' = Topic$ do (y, t'') <- runTopic t' -- return ((x,y), go y t'') -- |Interleave two 'Topic's. Items from each component 'Topic' will be -- tagged with an 'Either' constructor and added to the combined -- 'Topic' as they become available. (<+>) :: Topic IO a -> Topic IO b -> Topic IO (Either a b) (<+>) t1 t2 = Topic $ do c <- newChan let aux = do x <- readChan c return (x, Topic aux) feed t = do (x,t') <- runTopic t writeChan c x feed t' _ <- forkIO $ feed (fmap Left t1) _ <- forkIO $ feed (fmap Right t2) aux infixl 7 <+> -- |Returns a 'Topic' that produces a new pair every time either of -- the component 'Topic's produces a new value. The value of the -- other element of the pair will be the newest available value. The -- resulting 'Topic' will produce a new value at the rate of the -- faster component 'Topic', and may contain duplicate consecutive -- elements. everyNew :: Topic IO a -> Topic IO b -> Topic IO (a,b) everyNew t1 t2 = Topic $ warmup =<< runTopic (t1 <+> t2) where warmup (Left x, t) = warmupR x =<< runTopic t warmup (Right y, t) = warmupL y =<< runTopic t warmupR _ (Left x, t) = warmupR x =<< runTopic t warmupR x (Right y, t) = return ((x,y), Topic $ runTopic t >>= go x y) warmupL _ (Right y, t) = warmupL y =<< runTopic t warmupL y (Left x, t) = return ((x,y), Topic $ runTopic t >>= go x y) go _ y (Left x, t) = return ((x,y), Topic $ runTopic t >>= go x y) go x _ (Right y, t) = return ((x,y), Topic $ runTopic t >>= go x y) -- |Returns a 'Topic' that produces a new pair every time both of the -- component 'Topic's have produced a new value. The composite -- 'Topic' will produce pairs at the rate of the slower component -- 'Topic' consisting of the most recent value from each 'Topic'. bothNew :: Topic IO a -> Topic IO b -> Topic IO (a,b) bothNew t1 t2 = Topic $ warmup =<< runTopic (t1 <+> t2) where warmup (v,t) = go v =<< runTopic t go (Left _) (l@(Left _), t) = go l =<< runTopic t go (Left x) (Right y, t) = return ((x,y), Topic $ warmup =<< runTopic t) go (Right _) (r@(Right _), t) = go r =<< runTopic t go (Right y) (Left x, t) = return ((x,y), Topic $ warmup =<< runTopic t) -- |Returns a 'Topic' that produces a new pair every time a value of -- first topic produces a new value, followed by a new value from the -- second topic. This can be used for sampling the first topic with -- the second topic. firstThenSecond :: Topic IO a -> Topic IO b -> Topic IO (a,b) firstThenSecond t1 t2 = leftThenRight (t1 <+> t2) -- |Produces a value when a Left value is followed by a Right value. leftThenRight :: (Monad m) => Topic m (Either a b) -> Topic m (a,b) leftThenRight t1 = Topic $ warmup =<< runTopic t1 where warmup (v,t) = go v =<< runTopic t go (Left x) (Right y, t) = return ((x,y), Topic $ warmup =<< runTopic t) go _ (x, t) = go x =<< runTopic t -- |Merge two 'Topic's into one. The items from each component -- 'Topic' will be added to the combined 'Topic' as they become -- available. merge :: Topic IO a -> Topic IO a -> Topic IO a merge t1 t2 = either id id <$> t1 <+> t2 -- |Apply a function to each consecutive pair of elements from a 'Topic'. finiteDifference :: (Functor m, Monad m) => (a -> a -> b) -> Topic m a -> Topic m b finiteDifference f = fmap (uncurry f) . consecutive -- |Compute a running \"average\" of a 'Topic' using a user-provided -- normalization function applied to the sum of products. The -- arguments are a constat @alpha@ that is used to scale the current -- average, a constant @invAlpha@ used to scale the newest value, a -- function for adding two scaled values, a function for scaling -- input values, a function for normalizing the sum of scaled values, -- and finally the stream to average. Parameterizing over all the -- arithmetic to this extent allows for the use of denormalizing -- scaling factors, as might be used to keep all arithmetic -- integral. An example would be scaling the average by the integer -- 7, the new value by the integer 1, then normalizing by dividing -- the sum of scaled values by 8. weightedMeanNormalized :: Monad m => n -> n -> (b -> b -> c) -> (n -> a -> b) -> (c -> a) -> Topic m a -> Topic m a weightedMeanNormalized alpha invAlpha plus scale normalize = Topic . warmup where warmup = uncurry go <=< runTopic go avg t = do (x,t') <- runTopic t let !avg' = normalize $ plus (scale alpha avg) (scale invAlpha x) return (avg', Topic $ go avg' t') {-# INLINE weightedMeanNormalized #-} -- |Perform numerical integration of a 'Topic' using Simpson's rule -- applied at three consecutive points. This requires a function for -- adding values from the 'Topic', and a function for scaling values -- by a fractional number. simpsonsRule :: (Monad m, Fractional n) => (a -> a -> a) -> (n -> a -> a) -> Topic m a -> Topic m a simpsonsRule plus scale t0 = Topic $ do ([x,y], t') <- splitAt 2 t0 go x y t' where go x y t = do (z,t') <- runTopic t return (simpson x y z, Topic $ go y z t') simpson a mid b = scale c $ plus (plus a (scale 4 mid)) b c = 1 / 3 {-# INLINE simpsonsRule #-} -- |Compute a running \"average\" of a 'Topic'. The application -- @weightedMean alpha plus scale t@ sums the product of @alpha@ and -- the current average with the product of @1 - alpha@ and the newest -- value produced by 'Topic' @t@. The addition and scaling operations -- are performed using the supplied @plus@ and @scale@ functions. weightedMean :: (Monad m, Num n) => n -> (a -> a -> a) -> (n -> a -> a) -> Topic m a -> Topic m a weightedMean alpha plus scale = weightedMean2 alpha (1 - alpha) plus scale {-# INLINE weightedMean #-} -- |Compute a running \"average\" of a 'Topic'. The application -- @weightedMean2 alpha invAlpha plus scale t@ sums the product of -- @alpha@ and the current average with the product of @invAlpha@ and -- the newest value produced by 'Topic' @t@. The addition and scaling -- operations are performed using the supplied @plus@ and @scale@ -- functions. weightedMean2 :: Monad m => n -> n -> (a -> a -> a) -> (n -> a -> a) -> Topic m a -> Topic m a weightedMean2 alpha invAlpha plus scale = Topic . warmup where warmup = uncurry go <=< runTopic go avg t = do (x, t') <- runTopic t let !savg = scale alpha avg !sx = scale invAlpha x !avg' = plus savg sx return (avg', Topic $ go avg' t') {-# INLINE weightedMean2 #-} -- |Use a 'Topic' of functions to filter a 'Topic' of values. The -- application @filterBy t1 t2@ causes each function from 'Topic' @t1@ -- to be applied to values produced by @t2@ until it returns -- 'True'. At that point, the 'filterBy' application produces the -- accepted value of the @t2@ and moves on to the next function from -- @t1@ which is applied to the rest of @t2@ in the same manner. filterBy :: Monad m => Topic m (a -> Bool) -> Topic m a -> Topic m a filterBy tf tx = Topic $ do (f, tf') <- runTopic tf (x, tx') <- uncons $ dropWhile (not . f) tx return (x, filterBy tf' tx') -- |Produce elements of the first 'Topic' no faster than elements of -- the second 'Topic' are produced. gate :: (Applicative m, Monad m) => Topic m a -> Topic m b -> Topic m a gate t1 t2 = const <$> t1 <*> t2 -- |Flatten a 'Topic' of 'F.Foldable' values. For example, turn a -- @Topic m [a]@ of finite lists into a @Topic a@ by taking each -- element from each list in sequence. concats :: (Monad m, F.Foldable f) => Topic m (f a) -> Topic m a concats t = Topic $ do (x, t') <- runTopic t F.foldr (\x' z -> return (x', Topic z)) (runTopic $ concats t') x -- |Flatten a 'Topic' of 'F.Foldable' values such that old values are -- discarded as soon as the original 'Topic' produces a new -- 'F.Foldable'. interruptible :: F.Foldable t => Topic IO (t a) -> Topic IO a interruptible s = Topic $ do feeder <- newEmptyMVar -- Active feeder thread latestItem <- newEmptyMVar -- Next available item signal <- newEmptyMVar -- Demand signal let feedItems ys = do ft <- tryTakeMVar feeder maybe (return ()) killThread ft t <- forkIO $ F.traverse_ (\y -> takeMVar signal >> putMVar latestItem y) ys putMVar feeder t watchForItems t = do (x,t') <- runTopic t feedItems x watchForItems t' getAll = do putMVar signal () x <- takeMVar latestItem return (x, Topic getAll) _ <- forkIO $ watchForItems s getAll -- |Pull elements from a 'Topic' in a new thread. This allows 'IO' -- 'Topic's to run at different rates even if they are consumed by a -- single thread. forkTopic :: Topic IO a -> IO (Topic IO a) forkTopic t = do c <- newChan _ <- forkIO . forever . join $ fmap (writeChan c) t let feed = Topic $ (\x -> (x,feed)) <$> readChan c return feed -- |Sliding window over a 'Monoid'. @slidingWindow n t@ slides a -- window of width @n@ along 'Topic' @t@. As soon as at least @n@ -- elements have been produced by @t@, the output 'Topic' starts -- producing the 'mconcat' of the elements in the window. slidingWindow :: (Monad m, Monoid a) => Int -> Topic m a -> Topic m a slidingWindow n = metamorph (fill S.empty) where fill w x | S.length w < n - 1 = skip . fill $ w |> x | otherwise = let w' = w |> x in yield (F.fold w') (go w') go w x = let w' = dropOldest w |> x in yield (F.fold w') (go w') dropOldest w = case viewl w of EmptyL -> S.empty _ :< w' -> w' -- |Sliding window over an 'AdditiveGroup'. @slidingWindowG n t@ -- slides a window of width @n@ along 'Topic' @t@. As soon as at least -- @n@ elements have been produced by @t@, the output 'Topic' starts -- producing the total sum of the elements of the window. This -- function is more efficient than 'slidingWindow' because the group -- inverse operation is used to remove elements falling behind the -- window from the running sum. slidingWindowG :: (Monad m, AdditiveGroup a) => Int -> Topic m a -> Topic m a slidingWindowG n = metamorph (fill S.empty) where fill w x | S.length w < n - 1 = skip . fill $ w |> x | otherwise = let w' = w |> x s = getSum . F.fold . fmap Sum $ w' in yield s (go s w') go s w x = case viewl w of EmptyL -> yield x $ go x (S.singleton x) y :< w' -> let s' = s ^+^ x ^-^ y in yield s' $ go s' (w' |> x) -- |A way of pushing a monadic action into and along a 'Topic'. The -- application @topicOn proj inj trans t@ extracts a function from -- @trans@ that is then applied to the result of applying @proj@ to -- each value of 'Topic' @t@. The result of that application is -- supplied to the result of applying @inj@ to the same values from -- @t@ to produce a value for the output 'Topic'. A typical use case -- is projecting out a field from the original 'Topic' @t@ using -- @proj@ so that it may be modified by @trans@ and then injected back -- into the original structure using @inj@. topicOn :: (Applicative m, Monad m) => (a -> b) -> (a -> c -> d) -> m (b -> m c) -> Topic m a -> Topic m d topicOn proj inj trans t = Topic $ do f <- trans runTopic $ mapM (\x -> inj x `fmap` f (proj x)) t -- |@subsample n t@ subsamples topic 't' by dropping 'n' elements for -- every element produced by the result topic. subsample :: Monad m => Int -> Topic m b -> Topic m b subsample n = metamorph $ go n where go 0 x = yield x (go n) go i _ = skip (go (i - 1))