module Reactive.Bacon.EventStream.IO where import Reactive.Bacon.Core import Reactive.Bacon.EventStream.Monadic import Reactive.Bacon.EventStream import Reactive.Bacon.PushStream import Data.IORef import Control.Concurrent(forkIO) import Control.Monad -- | startProcess is a function whose params are "event sink" and "stop sign" fromStoppableProcess :: ((Event a -> IO ()) -> IO Bool -> IO ()) -> IO (EventStream a, IO ()) fromStoppableProcess startProcess = do (stream, pushEvent) <- newPushStream stopSignal <- newIORef False let getStopState = (readIORef stopSignal) startProcess (guardedPush pushEvent getStopState) getStopState return (stream, (writeIORef stopSignal True)) where guardedPush pushEvent getStopState event = do stop <- getStopState unless stop $ pushEvent event fromNonStoppableProcess :: ((Event a -> IO ()) -> IO ()) -> IO (EventStream a) fromNonStoppableProcess startProcess = do (stream, pushEvent) <- newPushStream startProcess (pushEvent) return stream fromIO :: IO a -> IO (EventStream a) fromIO action = fromNonStoppableProcess $ \sink -> void $ forkIO $ action >>= sink . Next >> sink End