{-# LANGUAGE ScopedTypeVariables, CPP #-}
module Transient.Indeterminism (
choose, choose', chooseStream, collect, collect', group, groupByTime
) where
import Transient.Internals hiding (retry)
import Data.IORef
import Control.Applicative
import Data.Monoid
import Control.Concurrent
import Data.Typeable
import Control.Monad.State
import GHC.Conc
import Data.Time.Clock
import Control.Exception
#ifndef ETA_VERSION
import Data.Atomics
#endif
-- | Produce the elements of a list as alternative results of one computation.
--
-- An empty list gives 'empty'. Any other list is turned into a stream by
-- 'chooseStream' and each streamed value is passed through @checkFinalize@
-- (defined outside this module; presumably it unwraps the 'StreamData'
-- envelope -- confirm against "Transient.Internals").
choose :: [a] -> TransIO a
choose items
  | null items = empty
  | otherwise = checkFinalize =<< chooseStream items
-- | Stream the elements of a list as 'StreamData' values.
--
-- An empty list gives 'empty'. Otherwise the list is stored in an 'IORef'
-- and the 'IO' action handed to @parallel@ pops one element each time it
-- runs:
--
-- * every element except the last is emitted as 'SMore';
-- * the last element is emitted as 'SLast';
-- * a run that finds the list already exhausted emits 'SDone' instead of
--   failing with a pattern-match error.
--
-- Each element is forced to weak head normal form before it is emitted.
-- How often the action is re-run is decided by @parallel@ (defined outside
-- this module).
chooseStream :: [a] -> TransIO (StreamData a)
chooseStream [] = empty
chooseStream xs = do
  evs <- liftIO $ newIORef xs
  parallel $ do
    -- 'drop 1' instead of 'tail': total, so an extra pop on an exhausted
    -- list never leaves an exploding thunk behind in the reference.
    es <- atomicModifyIORefCAS evs $ \es -> (drop 1 es, es)
    case es of
      [] -> return SDone
      [x] -> x `seq` return (SLast x)
      x : _ -> x `seq` return (SMore x)
-- | Variant of 'choose' built directly from alternatives.
--
-- Every element is wrapped as @async (return x)@ and the resulting
-- computations are combined left to right with '<|>', starting from 'empty'.
-- The left fold is kept on purpose so the association order of '<|>' is
-- exactly @((empty \<|\> a) \<|\> b) \<|\> ...@.
choose' :: [a] -> TransIO a
choose' xs = foldl (<|>) empty [async (return x) | x <- xs]
-- | Batch the results of a computation into lists of @num@ elements.
--
-- Each result of @proc@ is prepended to an accumulator kept in an 'IORef'
-- together with a counter. When the counter reaches @num@ the accumulated
-- list (most recent result first) is returned and the accumulator is reset;
-- otherwise the current branch ends with @stop@.
--
-- The counter is compared with @==@, so a @num@ of zero or less never
-- produces a batch.
group :: Int -> TransIO a -> TransIO [a]
group num proc = do
  counter <- liftIO (newIORef (0, []))
  item <- proc
  full <- liftIO . atomicModifyIORefCAS counter $ \(seen, acc) ->
    let seen' = seen + 1
        acc' = item : acc
     in if seen' /= num
          then ((seen', acc'), Nothing)
          else ((0, []), Just acc')
  maybe stop return full
-- | Gather @n@ results of a computation into a list.
--
-- This is 'collect'' with its timeout argument fixed to @0@, which
-- 'collect'' treats as "no timer".
collect :: Int -> TransIO a -> TransIO [a]
collect n search = collect' n 0 search
-- | Gather results of @search@ into a list, bounded by a count and/or a
-- timeout.
--
-- Parameters:
--
-- * @n@: number of results to wait for. The count only applies when
--   @n > 0@; otherwise collection ends only through the timer or the
--   blocked-'MVar' handler below.
-- * @t@: delay handed to 'threadDelay' (microseconds). A timer is started
--   only when @t > 0@.
-- * @search@: the computation whose results are gathered.
--
-- Returns the results gathered so far, most recent first.
--
-- Three alternatives cooperate through one 'MVar':
--
-- * @worker@ runs @search@ (after @abduce@) and posts each result as
--   @Just r@, then ends its branch with @stop@;
-- * @timer@ posts @Nothing@ once the delay has elapsed;
-- * @monitor@ takes messages in a loop, accumulating results until the
--   requested count is reached or a @Nothing@ arrives.
--
-- If the runtime raises 'BlockedIndefinitelyOnMVar' while the monitor is
-- waiting, whatever has been accumulated so far is returned.
--
-- @addThreads@, @abduce@, @async@ and @oneThread@ come from outside this
-- module; their scheduling semantics are not restated here.
collect' :: Int -> Int -> TransIO a -> TransIO [a]
collect' n t search = do
  addThreads 1
  rv <- liftIO newEmptyMVar
  results <- liftIO $ newIORef (0, [])
  let worker = do
        r <- abduce >> search
        liftIO $ putMVar rv $ Just r
        stop
      timer = do
        when (t > 0) . async $ threadDelay t >> putMVar rv Nothing
        empty
      monitor = liftIO loop
        where
          -- The handler wraps the whole iteration, recursive call included,
          -- exactly as before.
          loop =
            ( do
                mr <- takeMVar rv
                (n', rs) <- readIORef results
                case mr of
                  -- Timer fired: hand back what we have.
                  Nothing -> return rs
                  Just r -> do
                    let n'' = n' + 1
                        rs' = r : rs
                    -- Persist progress first so the exception handler can
                    -- still see it if a later 'takeMVar' blocks forever.
                    writeIORef results (n'', rs')
                    if n > 0 && n'' >= n
                      then return rs'
                      else loop
            )
              `catch` \(_ :: BlockedIndefinitelyOnMVar) ->
                snd <$> readIORef results
  oneThread $ timer <|> worker <|> monitor
-- | Wrap the results of a computation in 'StreamData', adding an end marker.
--
-- @comp@ is run under @oneThread@. For each result @r@ two alternatives are
-- offered: @SMore r@ immediately, and 'SDone' after an @async@ 'threadDelay'
-- of @timeout@ (microseconds).
burst :: Int -> TransIO a -> TransIO (StreamData a)
burst timeout comp = oneThread comp >>= \r -> emit r <|> finish
  where
    emit r = return (SMore r)
    finish = async (threadDelay timeout) >> return SDone
-- | Merge the results of a computation over fixed time windows.
--
-- Results of @comp@ are appended (with '<>') to an accumulator held in an
-- 'IORef'; that branch then ends with 'empty' and yields nothing itself.
-- A second branch, driven by @waitEvents@, sleeps for @timeout@
-- microseconds, then atomically swaps the accumulator for 'mempty' and
-- yields the value it held. The swap is unconditional, so a window with no
-- results yields 'mempty'.
groupByTime :: Monoid a => Int -> TransIO a -> TransIO a
groupByTime timeout comp = do
  acc <- liftIO (newIORef mempty)
  flush acc <|> accumulate acc
  where
    -- Take the current accumulator contents after each delay.
    flush acc = waitEvents $ do
      threadDelay timeout
      atomicModifyIORefCAS acc (\sofar -> (mempty, sofar))
    -- Fold every result of @comp@ into the accumulator.
    accumulate acc = do
      piece <- comp
      liftIO (atomicModifyIORefCAS acc (\sofar -> (sofar <> piece, ())))
      empty