{-# LANGUAGE Trustworthy, DeriveDataTypeable, DeriveFunctor, ScopedTypeVariables, RecursiveDo #-}

module FRP.Reactivity.AlternateEvent (Event(..), eFromML, nonDispatchedIO, runEvent,
-- ** A minimal set of combinators
EventStream(..), filterMaybes, delay, delays,
-- ** Optimizer rules
fmap', scan') where

import qualified Data.Map as M
import Data.Unique
import Data.IORef
import Data.Typeable
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Applicative
import Control.Concurrent
import Control.Monad.Catch
import System.IO.Unsafe
import Data.Time.Clock.POSIX
import Data.Maybe
import FRP.Reactivity.Measurement
import FRP.Reactivity.RPC

-- | A push-based (publisher-subscriber) event stream, in contrast to the pull-based
--   handling of 'MeasurementWrapper'.
--
--   An 'Event' is a subscription function. It is given a callback that receives every
--   occurrence together with its 'POSIXTime', and it returns, inside 'RPC', a further
--   'RPC' action. The combinators in this module treat that returned action as the
--   handle for cancelling the subscription. Subscribing should finish fast so that the
--   caller can continue.
--
--   The type exists because the 'Measurement'/'MeasurementWrapper' system, while fast,
--   makes intensive use of memory cells that need to be garbage collected, which may
--   make it too slow for some purposes.
newtype Event t = Event
	{ unEvent :: (t -> POSIXTime -> RPC ()) -> RPC (RPC ())
	} deriving (Typeable, Functor)

-- avoid inlining until rules have fired
{-# INLINE[0] eFromML #-}
-- | Turns a list of measurements into an 'Event'. Subscribing forks a worker (via
-- 'rpcFork') that walks the list in order, obtains each measurement's value and time
-- with 'getValue' and hands them to the subscriber. The returned action kills that
-- worker thread.
--
-- Separate calls to 'eFromML' are not synchronised with each other in time order, so
-- "eFromML ml `mplus` eFromML ml2" does not equal "eFromML (ml `merge` ml2)"; do any
-- necessary syncing before calling 'eFromML'.
eFromML :: [Measurement t] -> Event t
eFromML ls = Event subscribe where
	-- Deliver a single measurement to the subscriber's callback.
	deliver f meas = _nonDispatchedIO (getValue meas) >>= uncurry f
	subscribe f = do
		worker <- rpcFork (mapM_ (deliver f) ls)
		return (_nonDispatchedIO (killThread worker))

-- | Bind subscribes to the outer event and, for every outer occurrence, subscribes to the
-- event produced by the continuation. Inner occurrences are reported at the later of the
-- outer and the inner time.
instance Monad Event where
	-- A single occurrence at time 0, delivered while subscribing; nothing to cancel.
	return x = Event $ \k -> do
		k x 0
		return (return ())
	Event outer >>= g = Event $ \k -> do
		-- Collects the unsubscribe actions of every inner subscription made so far.
		cleanup <- _nonDispatchedIO (newIORef (return ()))
		unsubOuter <- outer $ \x t -> do
			unsubInner <- unEvent (g x) (\x' t' -> k x' (max t t'))
			_nonDispatchedIO (atomicModifyIORef cleanup (\acc -> (acc >> unsubInner, ())))
		-- Cancel the outer subscription first, then every inner one recorded so far.
		return (unsubOuter >> join (_nonDispatchedIO (readIORef cleanup)))
	fail _ = mzero

-- The algebraic properties (associativity, unitality etc.) follow from the monad axioms.
instance MonadPlus Event where
	-- The empty event: subscribing returns immediately and the callback is never used.
	mzero = Event $ \_ -> return (return ())
	-- Subscribe the same callback to both events; cancelling undoes the first
	-- subscription and then the second.
	mplus e e2 = Event $ \f -> do
		unsub1 <- unEvent e f
		unsub2 <- unEvent e2 f
		return (unsub1 >> unsub2)

-- | 'Event's form a monoid by reusing the 'MonadPlus' structure: the empty event is the
-- unit and merging two events is the operation.
instance Monoid (Event t) where
	mempty = mzero
	mappend e e2 = e `mplus` e2

-- | Lifting an 'IO' action gives an event with one occurrence: the action runs when the
-- event is subscribed to, and its result is stamped with the time of a measurement taken
-- straight afterwards.
instance MonadIO Event where
	liftIO m = Event $ \f -> do
		x <- liftIO m
		meas <- _nonDispatchedIO (measure (return ()))
		f x (time meas)
		-- Nothing to cancel once the single occurrence has been delivered.
		return (return ())

-- | Lifts an 'IO' action into an 'Event' with a single occurrence. On subscription the
-- action is passed through 'measure' and run via '_nonDispatchedIO'; the pair obtained
-- from 'getValue' is then handed to the subscriber. The returned cancel action does nothing.
nonDispatchedIO m = Event $ \f -> do
	stamped <- _nonDispatchedIO (measure m >>= getValue)
	uncurry f stamped
	return (return ())

-- | Runs an event. A forked thread subscribes the given handler to the event while the
-- calling thread runs 'rpcServer'; both share one freshly created 'MVar'. See the .Basic
-- module for a way to run with a Windows event loop.
runEvent :: Event t -> (t -> POSIXTime -> IO ()) -> IO ()
runEvent e f = do
	mv <- newEmptyMVar
	let subscription = unEvent e (\x t -> liftIO (f x t))
	-- The thread id is deliberately dropped; the subscription runs on its own thread.
	_ <- forkIO (void (runReaderT (unRPC subscription) (False, mv)))
	void (rpcServer mv)

-- | The 'Applicative' structure is derived from the 'Monad' instance.
instance Applicative Event where
	pure x = return x
	ef <*> ex = do
		f <- ef
		x <- ex
		return (f x)

-- | The 'Alternative' structure coincides with the 'MonadPlus' one.
instance Alternative Event where
	empty = mzero
	e <|> e2 = e `mplus` e2

-- | The minimal set of combinators that an event-stream type has to provide on top of
-- its 'MonadPlus' structure.
class (MonadPlus e) => EventStream e where
	-- | Builds an event stream from a list of values paired with their times.
	eventFromList :: [(t, POSIXTime)] -> e t
	-- | The "scan" primitive is analogous to "scanl" for lists: the function receives the
	-- current state and an occurrence, and returns the next state together with the value
	-- to emit.
	scan :: (t -> u -> (t, v)) -> t -> e u -> e v
	-- | A primitive like "switch" is the main way of implementing behaviors that can be switched
	-- in and out as required.
	switch :: e (e t) -> e t
	-- | The main use of "withRemainder" is to implement manual aging of inputs. It helps prevent space
	-- and time leaks.
	withRemainder :: e t -> e (t, e t)
	-- | Construct a channel in order to receive external events. The first component feeds a
	-- value into the channel; the second is the stream of the values fed in.
	channel :: IO (t -> IO (), e t)
	-- | Pairs every occurrence with a time.
	adjoinTime :: e t -> e (t, POSIXTime)

-- | Spins on the 'IORef' until it holds a 'Right', yielding between attempts, and then
-- runs the action found there.
retryLoop ref = do
	current <- _nonDispatchedIO (readIORef ref)
	case current of
		-- Not available yet: let other threads run, then look again.
		Left _ -> _nonDispatchedIO yield >> retryLoop ref
		Right ready -> ready

-- | Implementation of 'switch' for 'Event': the subscriber receives the occurrences of
-- whichever inner event the outer event delivered most recently.
--
-- The 'IORef' doubles as a spin lock. @Left ()@ means a handler is in the middle of
-- switching; @Right m@ holds the unsubscribe action of the current inner subscription.
-- The unsubscribe action returned to the caller cancels the outer subscription and then
-- uses 'retryLoop' to wait for a @Right@ and run it.
--
-- NOTE(review): if the old unsubscribe action @m@ throws, nothing visible here resets the
-- ref from @Left ()@, so later handlers and 'retryLoop' would appear to spin forever --
-- confirm. Presumably 'rpcFinally' behaves like @finally@, in which case the ref briefly
-- holds @Right (return ())@ before @Right m'@ is written -- confirm that window is harmless.
eventSwitch e = Event (\f -> do
	-- Just keep track of the most recent subscription...
	ref <- _nonDispatchedIO (newIORef (Right (return ())))
	liftM (\m -> m >> retryLoop ref) (unEvent e (handler f ref))) where
	-- The time argument @t@ of the outer occurrence is only passed along on retry; it is
	-- not otherwise used.
	handler f ref e' t = do
		-- Take the lock, remembering what was stored before.
		ei <- _nonDispatchedIO (atomicModifyIORef ref (\m -> (Left (), m)))
		either
			-- Another handler holds the lock: yield and try again.
			(\_ -> _nonDispatchedIO yield >> handler f ref e' t)
			(\m -> do
			-- Unsubscribe from it...
			m
			-- ... and switch in the new one
			m' <- rpcFinally
				(unEvent e' f)
				(_nonDispatchedIO (writeIORef ref (Right (return ()))))
			-- Publish the new unsubscribe action, which also releases the lock.
			_nonDispatchedIO (writeIORef ref (Right m')))
			ei

-- | The push-based implementation of the 'EventStream' combinators.
instance EventStream Event where
	eventFromList pairs = eFromML (fromList pairs)

	scan step seed e = Event $ \emit -> do
		-- The running state lives in an IORef created for this subscription.
		state <- _nonDispatchedIO (newIORef seed)
		unEvent e $ \input t -> do
			output <- _nonDispatchedIO (atomicModifyIORef state (\s -> let (s', out) = step s input in (s', out)))
			emit output t

	switch = eventSwitch

	-- The "remainder" paired with every occurrence is the event itself.
	withRemainder e = Event $ \f -> unEvent e (\x t -> f (x, e) t)

	channel = do
		subscribers <- newIORef M.empty
		return (send subscribers, listen subscribers)
		where
		-- Subscribers are callbacks stored in a map keyed by a fresh 'Unique'.
		-- Sending takes one time measurement and calls every stored callback with it.
		send subscribers x = do
			meas <- measure (return ())
			callbacks <- readIORef subscribers
			mapM_ (\callback -> callback x (time meas)) callbacks
		-- Subscribing stores the callback under a new key; the returned action
		-- removes that key again.
		listen subscribers = Event $ \f -> do
			key <- _nonDispatchedIO newUnique
			(_, mv) <- RPC ask
			_nonDispatchedIO (atomicModifyIORef subscribers (\mp -> (M.insert key (\x t -> runReaderT (unRPC (f x t)) (False, mv)) mp, ())))
			return (_nonDispatchedIO (atomicModifyIORef subscribers (\mp -> (M.delete key mp, ()))))

	-- Each occurrence is paired with its own time stamp.
	adjoinTime e = Event $ \f -> unEvent e (\x t -> f (x, t) t)

-- | Keeps the payload of every 'Just' occurrence and drops every 'Nothing'.
--
-- Written with 'maybe' so that no partial function is involved: a 'Nothing' becomes
-- 'mzero' and a @Just x@ becomes @return x@.
filterMaybes :: (MonadPlus m) => m (Maybe t) -> m t
filterMaybes e = e >>= maybe mzero return

-- | Postpones the occurrences of an event by a fixed amount: every occurrence observed
-- at time @t'@ is re-emitted through 'eventFromList' with the time @t + t'@.
delay :: (EventStream e) => POSIXTime -> e t -> e t
delay t e = filterMaybes $ do
	-- A 'Nothing' occurrence is merged in alongside the real ones; it takes the same
	-- path and is discarded by 'filterMaybes' at the end.
	(x, t') <- adjoinTime (return Nothing `mplus` fmap Just e)
	eventFromList [(x, t + t')]

-- | Re-emits every occurrence through 'eventFromList', using the time it is paired with.
delays :: (EventStream e, MonadIO e) => e (t, POSIXTime) -> e t
delays e = do
	(x, t) <- e
	eventFromList [(x, t)]

----------------------------------------
-- Optimization

{-# INLINE fmap' #-}
-- | Right-hand side of the "eFromML/map" rewrite rule: applies a function to every
-- measurement of a list and turns the results into an 'Event', by way of 'scan''.
--
-- The stepping function ignores the scan state, so the seed is only a placeholder.
-- It is spelled as a descriptive 'error' rather than being built from @head e@, which
-- avoids the partial 'head' and an anonymous @undefined@.
fmap' :: (Measurement t -> Measurement u) -> [Measurement t] -> Event u
fmap' f e = scan' (\_ x -> let y = f x in (y, y)) unusedSeed e where
	unusedSeed = error "FRP.Reactivity.AlternateEvent.fmap': the scan seed is never inspected"

{-# INLINE scan' #-}
-- | Right-hand side of the "eFromML/scanl" rewrite rule, also used by 'fmap''. Scans over
-- the occurrences of @eFromML e@ with a function that works on measurements, then
-- re-emits each resulting measurement's value at that measurement's time via 'delays'.
scan' f x e = delays (fmap timed (scan step x (adjoinTime (eFromML e)))) where
	-- Split a resulting measurement into its value and its time for 'delays'.
	timed meas = (copoint meas, time meas)
	-- Rewrap the (value, time) pair as a measurement before calling the caller's function.
	-- NOTE(review): this relies on 'unsafePerformIO'; presumably 'assertMeasurement' has no
	-- observable side effects when given a pure pair -- confirm.
	step acc stamped = f acc (unsafePerformIO (assertMeasurement (return stamped)))

-- Rewrite rules that replace a list-level 'map' or 'scanl' feeding 'eFromML' with the
-- event-level 'fmap'' and 'scan'', and 'eFromML' of an empty list with 'mzero'.
-- NOTE(review): 'scanl' yields its seed as the first list element, whereas 'scan'' emits
-- one value per input occurrence only, so "eFromML/scanl" looks like it drops the seed
-- occurrence -- confirm this is intended.
{-# RULES
"eFromML/map" forall f e. eFromML (map f e) = fmap' f e
"eFromML/scanl" forall f x e. eFromML (scanl f x e) = scan' (\x y -> let z = f x y in (z, z)) x e
"eFromML/mzero" eFromML [] = mzero
  #-}