----------------------------------------------------------------------------- -- | -- Module : FRP.UISF.AuxFunctions -- Copyright : (c) Daniel Winograd-Cort 2014 -- License : see the LICENSE file in the distribution -- -- Maintainer : dwc@cs.yale.edu -- Stability : experimental -- -- Auxiliary functions for use with UISF or other arrows. {-# LANGUAGE Arrows, ScopedTypeVariables, TupleSections #-} module FRP.UISF.AuxFunctions ( -- * Types SEvent, Time, DeltaT, ArrowTime, time, ArrowIO, liftAIO, initialAIO, -- * Useful SF Utilities (Mediators) constA, constSF, edge, accum, unique, hold, now, mergeE, (~++), concatA, runDynamic, foldA, foldSF, maybeA, evMap, -- * Delays and Timers delay, -- | delay is a unit delay. It is exactly the delay from ArrowCircuit. vdelay, fdelay, vcdelay, fcdelay, timer, genEvents, -- * Event buffer Tempo, BufferOperation(..), eventBuffer, eventBuffer', -- (=>>), (->>), (.|.), -- snapshot, snapshot_, -- * Signal Function Conversions -- $conversions -- *** Types Automaton(..), -- *** Conversions -- $conversions2 asyncC, asyncV, asyncE, asyncC' ) where import Prelude hiding ((.), id) import Control.Category import Control.Arrow import Control.Arrow.Operations import Control.Arrow.Transformer.Automaton import Data.Sequence (empty, (<|), (|>), (><), viewl, ViewL(..), viewr, ViewR(..)) import qualified Data.Sequence as Seq import Data.Maybe (listToMaybe) import Control.Concurrent import Data.IORef import Data.Foldable (toList) import Control.DeepSeq -------------------------------------- -- Types -------------------------------------- -- | SEvent is short for \"Stream Event\" and is a type synonym for Maybe. type SEvent = Maybe -- | Time is simply represented as a Double. type Time = Double -- | DeltaT is a type synonym referring to a change in Time. type DeltaT = Double -- | Instances of this class have arrowized access to the time class ArrowTime a where time :: a () Time class Arrow a => ArrowIO a where liftAIO :: (b -> IO c) -> a b c initialAIO :: IO d -> (d -> a b c) -> a b c -------------------------------------- -- Useful SF Utilities (Mediators) -------------------------------------- -- | constA is an arrowized version of const constA :: Arrow a => c -> a b c constA = arr . const -- | constSF is a convenience constSF :: Arrow a => b -> a b d -> a c d constSF s sf = constA s >>> sf -- | edge generates an event whenever the Boolean input signal changes -- from False to True -- in signal processing this is called an ``edge -- detector,'' and thus the name chosen here. edge :: ArrowCircuit a => a Bool (SEvent ()) edge = proc b -> do prev <- delay False -< b returnA -< if not prev && b then Just () else Nothing -- | The signal function (accum v) starts with the value v, but then -- applies the function attached to the first event to that value -- to get the next value, and so on. accum :: ArrowCircuit a => b -> a (SEvent (b -> b)) b accum x = proc f -> do rec b <- delay x -< maybe b ($b) f returnA -< b -- | The signal function unique will produce an event each time its input -- signal changes. unique :: Eq e => ArrowCircuit a => a e (SEvent e) unique = proc e -> do prev <- delay Nothing -< Just e returnA -< if prev == Just e then Nothing else Just e -- | hold is a signal function whose output starts as the value of the -- static argument. This value is held until the first input event -- happens, at which point it changes to the value attached to that -- event, which it then holds until the next event, and so on. hold :: ArrowCircuit a => b -> a (SEvent b) b hold x = arr (fmap (const $)) >>> accum x -- | Now is a signal function that produces one event and then forever -- after produces nothing. It is essentially an impulse function. now :: ArrowCircuit a => a () (SEvent ()) now = arr (const Nothing) >>> delay (Just ()) -- | mergeE merges two events with the given resolution function. mergeE :: (a -> a -> a) -> SEvent a -> SEvent a -> SEvent a mergeE _ Nothing Nothing = Nothing mergeE _ le@(Just _) Nothing = le mergeE _ Nothing re@(Just _) = re mergeE resolve (Just l) (Just r) = Just (resolve l r) -- | A nice infix operator for merging event lists (~++) :: SEvent [a] -> SEvent [a] -> SEvent [a] (~++) = mergeE (++) -- | Returns n samples of type b from the input stream at a time, -- updating after k samples. This function is good for chunking -- data and is a critical component to fftA quantize :: ArrowCircuit a => Int -> Int -> a b (SEvent [b]) quantize n k = proc d -> do rec (ds,c) <- delay ([],0) -< (take n (d:ds), c+1) returnA -< if c >= n && c `mod` k == 0 then Just ds else Nothing -- | Combines the input list of arrows into one arrow that takes a -- list of inputs and returns a list of outputs. concatA :: Arrow a => [a b c] -> a [b] [c] concatA [] = arr $ const [] concatA (sf:sfs) = proc (b:bs) -> do c <- sf -< b cs <- concatA sfs -< bs returnA -< (c:cs) -- | This essentially allows an arrow that processes b to c to take -- [b] and recursively generate cs, combining them all into a -- final output d. foldA :: ArrowChoice a => (c -> d -> d) -> d -> a b c -> a [b] d foldA merge i sf = h where h = proc inp -> case inp of [] -> returnA -< i b:bs -> do c <- sf -< b d <- h -< bs returnA -< merge c d runDynamic :: ArrowChoice a => a b c -> a [b] [c] runDynamic = foldA (:) [] -- | For folding results of a list of signal functions foldSF :: Arrow a => (b -> c -> c) -> c -> [a () b] -> a () c foldSF f c sfs = let inps = replicate (length sfs) () in constA inps >>> concatA sfs >>> arr (foldr f c) --foldSF f b sfs = -- foldr g (constA b) sfs where -- g sfa sfb = -- proc () -> do -- s1 <- sfa -< () -- s2 <- sfb -< () -- returnA -< f s1 s2 maybeA :: ArrowChoice a => a () c -> a b c -> a (Maybe b) c maybeA nothing just = proc eb -> do case eb of Just b -> just -< b Nothing -> nothing -< () evMap :: ArrowChoice a => a b c -> a (SEvent b) (SEvent c) evMap a = maybeA (constA Nothing) (a >>> arr Just) -------------------------------------- -- Delays and Timers -------------------------------------- -- | delay is a unit delay. It is exactly the delay from ArrowCircuit. -- | fdelay is a delay function that delays for a fixed amount of time, -- given as the static argument. It returns a signal function that -- takes the current time and an event stream and delays the event -- stream by the delay amount. -- fdelay guarantees that the order of events in is the same as the -- order of events out and that no event will be skipped. However, -- if events are too densely packed in the signal (compared to the -- clock rate of the underlying arrow), then some events may be -- over delayed. fdelay :: (ArrowTime a, ArrowCircuit a) => DeltaT -> a (SEvent b) (SEvent b) fdelay d = proc e -> do t <- time -< () rec q <- delay empty -< maybe q' (\e' -> q' |> (t+d,e')) e let (ret, q') = case viewl q of EmptyL -> (Nothing, q) (t0,e0) :< qs -> if t >= t0 then (Just e0, qs) else (Nothing, q) returnA -< ret -- | vdelay is a delay function that delays for a variable amount of time. -- It takes the current time, an amount of time to delay, and an event -- stream and delays the event stream by the delay amount. -- vdelay, like fdelay, guarantees that the order of events in is the -- same as the order of events out and that no event will be skipped. -- If the events are too dense or the delay argument drops too quickly, -- some events may be over delayed. vdelay :: (ArrowTime a, ArrowCircuit a) => a (DeltaT, SEvent b) (SEvent b) vdelay = proc (d, e) -> do t <- time -< () rec q <- delay empty -< maybe q' (\e' -> q' |> (t,e')) e let (ret, q') = case viewl q of EmptyL -> (Nothing, q) (t0,e0) :< qs -> if t-d >= t0 then (Just e0, qs) else (Nothing, q) returnA -< ret -- | fcdelay is a continuous version of fdelay. It takes an initial value -- to emit for the first dt seconds. After that, the delay will always -- be accurate, but some data may be ommitted entirely. As such, it is -- not advisable to use fcdelay for event streams where every event must -- be processed (that's what fdelay is for). fcdelay :: (ArrowTime a, ArrowCircuit a) => b -> DeltaT -> a b b fcdelay i dt = proc v -> do t <- time -< () rec q <- delay empty -< q' |> (t+dt, v) -- this list has pairs of (emission time, value) let (ready, rest) = Seq.spanl ((<= t) . fst) q (ret, q') = case viewr ready of EmptyR -> (i, rest) _ :> (t', v') -> (v', (t',v') <| rest) returnA -< ret -- | vcdelay is a continuous version of vdelay. It will always emit the -- value that was produced dt seconds earlier (erring on the side of an -- older value if necessary). Be warned that this version of delay can -- both omit some data entirely and emit the same data multiple times. -- As such, it is usually inappropriate for events (use vdelay). -- vcdelay takes a 'maxDT' argument that stands for the maximum delay -- time that it can handle. This is to prevent a space leak. -- -- Implementation note: Rather than keep a single buffer, we keep two -- sequences that act to produce a sort of lens for a buffer. qlow has -- all the values that are older than what we currently need, and qhigh -- has all of the newer ones. Obviously, as time moves forward and the -- delay amount variably changes, values are moved back and forth between -- these two sequences as necessary. -- This should provide a slight performance boost. vcdelay :: (ArrowTime a, ArrowCircuit a) => DeltaT -> b -> a (DeltaT, b) b vcdelay maxDT i = proc (dt, v) -> do t <- time -< () rec (qlow, qhigh) <- delay (empty,empty) -< (dropMostWhileL ((< t-maxDT) . fst) qlow', qhigh' |> (t, v)) -- this is two lists with pairs of (initial time, value) -- We construct four subsequences:, a, b, c, and d. They are ordered by time and we -- have an invariant that a >< b >< c >< d is the entire buffer ordered by time. let (b,a) = Seq.spanr ((> t-dt) . fst) qlow (c,d) = Seq.spanl ((<= t-dt) . fst) qhigh -- After the spans, the value we are looking for will be in either c or a. (ret, qlow', qhigh') = case viewr c of _ :> (t', v') -> (v', qlow >< c, d) EmptyR -> case viewr a of _ :> (t', v') -> (v', a, b >< qhigh) EmptyR -> (i, a, b >< qhigh) returnA -< ret where -- This function acts like a wrapper for Seq.dropWhileL that will never -- leave the input queue empty (unless it started that way). At worst, -- it will leave the queue with its rightmost (latest in time) element. dropMostWhileL f q = if Seq.null q then empty else case viewl dq of EmptyL -> Seq.singleton $ Seq.index q (Seq.length q - 1) _ -> dq where dq = Seq.dropWhileL f q -- | timer is a variable duration timer. -- This timer takes the current time as well as the (variable) time between -- events and returns an SEvent steam. When the second argument is non-positive, -- the output will be a steady stream of events. As long as the clock speed is -- fast enough compared to the timer frequency, this should give accurate and -- predictable output and stay synchronized with any other timer and with -- time itself. timer :: (ArrowTime a, ArrowCircuit a) => a DeltaT (SEvent ()) timer = proc dt -> do now <- time -< () rec last <- delay 0 -< t' let ret = now >= last + dt t' = latestEventTime last dt now returnA -< if ret then Just () else Nothing where latestEventTime last dt now | dt <= 0 = now latestEventTime last dt now = if now > last + dt then latestEventTime (last+dt) dt now else last -- | genEvents is a timer that instead of returning unit events -- returns the next element of the input list. When the input -- list is empty, the output stream becomes all Nothing. genEvents :: (ArrowTime a, ArrowCircuit a) => [b] -> a DeltaT (SEvent b) genEvents lst = proc dt -> do e <- timer -< dt rec l <- delay lst -< maybe l (const $ drop 1 l) e returnA -< maybe Nothing (const $ listToMaybe l) e -------------------------------------- -- Event buffer -------------------------------------- -- | Tempo is just a Double. type Tempo = Double -- | The BufferOperation data type wraps up the data and operational commands -- to control an 'eventbuffer'. data BufferOperation b = NoBOp -- ^ No Buffer Operation | ClearBuffer -- ^ Erase the buffer | SkipAheadInBuffer DeltaT -- ^ Skip ahead a certain amount of time in the buffer | MergeInBuffer [(DeltaT, b)] -- ^ Merge data into the buffer | AppendToBuffer [(DeltaT, b)] -- ^ Append data to the end of the buffer | SetBufferPlayStatus Bool (BufferOperation b) -- ^ Set a new play status (True = Playing, False = Paused) | SetBufferTempo Tempo (BufferOperation b) -- ^ Set the buffer's tempo -- | eventBuffer allows for a timed series of events to be prepared and -- emitted. The streaming input is a BufferOperation, described above. -- Note that the default play status is playing and the default tempo -- is 1. Just as MIDI files have events timed based -- on ticks since the last event, the events here are timed based on -- seconds since the last event. If an event is to occur 0.0 seconds -- after the last event, then it is assumed to be played at the same -- time as the last event, and all simultaneous events are emitted -- at the same timestep. In addition to any events emitted, a -- streaming Bool is emitted that is True if the buffer is empty and -- False if the buffer is full (meaning that events will still come). eventBuffer :: (ArrowTime a, ArrowCircuit a) => a (BufferOperation b) (SEvent [b], Bool) eventBuffer = arr (,()) >>> second time >>> eventBuffer' eventBuffer' :: ArrowCircuit a => a (BufferOperation b, Time) (SEvent [b], Bool) eventBuffer' = proc (bo', t) -> do let (bo, doPlay', tempo') = collapseBO bo' doPlay <- hold True -< doPlay' tempo <- hold 1 -< tempo' rec tprev <- delay 0 -< t --used to calculate dt, the change in time buffer <- delay [] -< buffer''' --the buffer let dt = tempo * (t-tprev) --dt will never be negative buffer' = if doPlay then subTime buffer dt else buffer buffer'' = update buffer' bo --update the buffer based on the operation (nextMsgs, buffer''') = if doPlay then getNextEvent buffer'' --get any events that are ready else (Nothing, buffer'') returnA -< (nextMsgs, null buffer''') where subTime :: [(DeltaT, b)] -> DeltaT -> [(DeltaT, b)] subTime [] _ = [] subTime ((bt,b):bs) dt = if bt < dt then (0,b):subTime bs (dt-bt) else (bt-dt,b):bs getNextEvent :: [(DeltaT, b)] -> (SEvent [b], [(DeltaT, b)]) getNextEvent buffer = let (es,rest) = span ((<=0).fst) buffer nextEs = map snd es in if null buffer then (Nothing, []) else (Just nextEs, rest) update :: [(DeltaT, b)] -> BufferOperation b -> [(DeltaT, b)] update b NoBOp = b update _ ClearBuffer = [] update b (SkipAheadInBuffer dt) = skipAhead b dt update b (MergeInBuffer b') = merge b b' update b (AppendToBuffer b') = b ++ b' update _ _ = error "The impossible happened in eventBuffer" merge :: [(DeltaT, b)] -> [(DeltaT, b)] -> [(DeltaT, b)] merge b [] = b merge [] b = b merge ((bt1,b1):bs1) ((bt2,b2):bs2) = if bt1 < bt2 then (bt1,b1):merge bs1 ((bt2-bt1,b2):bs2) else (bt2,b2):merge ((bt1-bt2,b1):bs1) bs2 skipAhead :: [(DeltaT, b)] -> DeltaT -> [(DeltaT, b)] skipAhead [] _ = [] skipAhead b dt | dt <= 0 = b skipAhead ((bt,b):bs) dt = if bt < dt then skipAhead bs (dt-bt) else (bt-dt,b):bs collapseBO :: BufferOperation b -> (BufferOperation b, Maybe Bool, Maybe Tempo) collapseBO (SetBufferPlayStatus b bo) = let (o, _, t) = collapseBO bo in (o, Just b, t) collapseBO (SetBufferTempo t bo) = let (o, b, _) = collapseBO bo in (o, b, Just t) collapseBO bo = (bo, Nothing, Nothing) -------------------------------------- -- Yampa-style utilities -------------------------------------- (=>>) :: SEvent a -> (a -> b) -> SEvent b (=>>) = flip fmap (->>) :: SEvent a -> b -> SEvent b (->>) = flip $ fmap . const (.|.) :: SEvent a -> SEvent a -> SEvent a (.|.) = flip $ flip maybe Just snapshot :: SEvent a -> b -> SEvent (a,b) snapshot = flip $ fmap . flip (,) snapshot_ :: SEvent a -> b -> SEvent b snapshot_ = flip $ fmap . const -- same as ->> -------------------------------------- -- Signal Function Conversions -------------------------------------- -- $conversions -- Due to the internal IO, ArrowIO arrows are -- not necessarily pure. Thus, when we run them, we say that they run \"in -- real time\". This means that the time between two samples can vary and is -- inherently unpredictable. -- -- However, sometimes we have a pure computation that we would like to run -- on a simulated clock. This computation will expect to produce values at -- specific intervals, and because it's pure, that expectation can sort of be -- satisfied. For this, we would use asynchV (V for virtual). -- -- The other functions in this section are for other forms of asynchrony. -- There is one for Event-based asynchrony and two for continuous. -- $conversions2 -- The following function is for lifting an Automaton to an ArrowIO by -- appropriately converting the "simulated time" Automaton into realtime. -- | The clockrate is the simulated rate of the input signal function. -- The buffer is the amount of time the given signal function is -- allowed to get ahead of real time. The threadHandler is where the -- ThreadId of the forked thread is sent. -- -- The output signal function takes and returns values in real time. -- The input must be paired with time, and the return values are the -- list of bs generated in the given time step, each time stamped. -- Note that the returned list may be long if the clockrate is much -- faster than real time and potentially empty if it's slower. -- Note also that the caller can check the time stamp on the element -- at the end of the list to see if the inner, \"simulated\" signal -- function is performing as fast as it should. asyncV :: (ArrowIO a, NFData c) => Double -- ^ Clockrate -> DeltaT -- ^ Amount of time to buffer -> (ThreadId -> a () ()) -- ^ The thread handler -> (Automaton (->) b c) -- ^ The automaton to convert to realtime -> a (b, Time) [(c, Time)] asyncV clockrate buffer threadHandler sf = initialAIO iod darr where iod = do inp <- newEmptyMVar out <- newIORef empty timevar <- newEmptyMVar tid <- forkIO $ worker inp out timevar 1 1 sf return (tid, inp, out, timevar) darr (tid, inp, out, timevar) = proc (b,t) -> do _ <- threadHandler tid -< () _ <- liftAIO (\b -> tryTakeMVar inp >> putMVar inp b) -< b -- send the worker the new input _ <- liftAIO (tryPutMVar timevar) -< t -- update the time for the worker c <- liftAIO (atomicModifyIORef out) -< Seq.spanl (\(_,t0) -> t >= t0) --collect ready results returnA -< toList c -- worker processes the inner, "simulated" signal function. worker inp out timevar t count (Automaton sf) = do b <- readMVar inp -- get the latest input let (c, sf') = sf b -- do the calculation s <- deepseq c $ atomicModifyIORef out (\s -> (s |> (c, fromIntegral count/clockrate), s)) t' <- if Seq.length s > 0 && snd (seqLastElem s) >= t+buffer then takeMVar timevar else return t worker inp out timevar t' (count+1) sf' seqLastElem s = Seq.index s (Seq.length s - 1) -- | The asyncE function takes a pure signal function (an Automaton) and converts -- it into an asynchronous signal function usable in a MonadIO signal -- function context. The output MSF takes events of type a, feeds them to -- the asynchronously running input SF, and returns events with the output -- b whenever they are ready. The input SF is expected to run slowly -- compared to the output MSF, but it is capable of running just as fast. -- The input stream is a value, an option to clear any buffered values, or -- nothing, and the output stream is either a result value, a AOCalculating -- indicating that the asynchronous function is calculating and giving the -- buffer size, or nothing. asyncE :: (ArrowIO a, ArrowLoop a, ArrowCircuit a, ArrowChoice a, NFData c) => (ThreadId -> a () ()) -- ^ The thread handler -> (Automaton (->) b c) -- ^ The automaton to convert to asynchronize -> a (SEvent b) (SEvent c) asyncE threadHandler sf = {- delay AINoValue >>> -} initialAIO iod darr where iod = do inp <- newIORef empty out <- newIORef empty proceed <- newEmptyMVar tid <- forkIO $ worker proceed inp out sf return (tid, proceed, inp, out) -- count should start at 0 darr (tid, proceed, inp, out) = proc eb -> do _ <- threadHandler tid -< () case eb of Just b -> liftAIO (\b -> atomicModifyIORef inp (\s -> (s |> b, ())) >> tryPutMVar proceed ()) -< b Nothing -> returnA -< False c <- liftAIO (const $ atomicModifyIORef out seqRestHead) -< () returnA -< c -- worker processes the inner, "simulated" signal function. -- worker :: MVar () -> IORef (Seq a) -> IORef (Seq b) -> Automaton a b -> IO () worker proceed inp out (Automaton sf) = do eb <- atomicModifyIORef inp seqRestHead case eb of Nothing -> takeMVar proceed >> worker proceed inp out (Automaton sf) Just b -> do let (c, sf') = sf b -- do the calculation deepseq c $ atomicModifyIORef out (\s -> (s |> c, ())) worker proceed inp out sf' seqRestHead s = case viewl s of EmptyL -> (s, Nothing) a :< s' -> (s', Just a) -- | asyncC is the continuous async function. asyncC :: (ArrowIO a, NFData c) => (ThreadId -> a () ()) -- ^ The thread handler -> (Automaton (->) b c) -- ^ The automaton to convert to realtime -> a b [c] --asyncC th sf = asyncC' th (const . return $ (), return) (first sf) asyncC threadHandler sf = initialAIO iod darr where iod = do inp <- newEmptyMVar out <- newIORef empty tid <- forkIO $ worker inp out sf return (tid, inp, out) darr (tid, inp, out) = proc b -> do _ <- threadHandler tid -< () _ <- liftAIO (\b -> tryTakeMVar inp >> putMVar inp b) -< b -- send the worker the new input c <- liftAIO (\_ -> atomicModifyIORef out (\s -> (empty,s))) -< () --collect ready results returnA -< toList c -- worker processes the inner, "simulated" signal function. worker inp out (Automaton sf) = do b <- readMVar inp -- get the latest input let (c, sf') = sf b -- do the calculation deepseq c $ atomicModifyIORef out (\s -> (s |> c, ())) worker inp out sf' -- | A version of asyncC that does actions on either end of the automaton asyncC' :: (ArrowIO a, ArrowLoop a, ArrowCircuit a, ArrowChoice a, NFData b) => (ThreadId -> a () ()) -- ^ The thread handler -> (b -> IO d, e -> IO ()) -- ^ Effectful input and output channels for the automaton -> (Automaton (->) (b,d) (c,e)) -- ^ The automaton to convert to asynchronize -> a b [c] asyncC' threadHandler (iAction, oAction) sf = initialAIO iod darr where iod = do inp <- newIORef undefined start <- newEmptyMVar out <- newIORef empty tid <- forkIO $ takeMVar start >> worker inp out sf return (tid, inp, out, start) darr (tid, inp, out, start) = proc b -> do _ <- threadHandler tid -< () _ <- liftAIO $ (\b -> deepseq b $ writeIORef inp b) -< b -- send the worker the new input c <- initialAIO (putMVar start ()) (const $ liftAIO (\_ -> atomicModifyIORef' out (\s -> (empty,s)))) -< () --collect ready results returnA -< toList c -- worker processes the inner, "simulated" signal function. worker inp out (Automaton sf) = do b <- readIORef inp -- get the latest input d <- iAction b let ((c,e), sf') = sf (b,d) -- do the calculation oAction e atomicModifyIORef' out (\s -> (s |> c, ())) worker inp out sf'