{-# LANGUAGE ScopedTypeVariables, CPP #-}
module Transient.Indeterminism (
choose, choose', collect, collect', group, groupByTime
) where
import Transient.Internals hiding (retry)
import Data.IORef
import Control.Applicative
import Data.Monoid
import Control.Concurrent
import Data.Typeable
import Control.Monad.State
import GHC.Conc
import Data.Time.Clock
import Control.Exception
#ifndef ETA_VERSION
import Data.Atomics
#endif
choose :: Show a => [a] -> TransIO a
choose []= empty
choose xs = do
evs <- liftIO $ newIORef xs
r <- parallel $ do
es <- atomicModifyIORefCAS evs $ \es -> let tes= tail es in (tes,es)
case es of
[x] -> x `seq` return $ SLast x
x:_ -> x `seq` return $ SMore x
checkFinalize r
choose' :: [a] -> TransIO a
choose' xs = foldl (<|>) empty $ map (async . return) xs
group :: Int -> TransIO a -> TransIO [a]
group num proc = do
v <- liftIO $ newIORef (0,[])
x <- proc
mn <- liftIO $ atomicModifyIORefCAS v $ \(n,xs) ->
let n'=n +1
in if n'== num
then ((0,[]), Just $ x:xs)
else ((n', x:xs),Nothing)
case mn of
Nothing -> stop
Just xs -> return xs
groupByTime :: Integer -> TransIO a -> TransIO [a]
groupByTime time proc = do
t <- liftIO getCurrentTime
v <- liftIO $ newIORef (0,t,[])
x <- proc
t' <- liftIO getCurrentTime
mn <- liftIO $ atomicModifyIORefCAS v $ \(n,t,xs) -> let n'=n +1
in
if diffUTCTime t' t < fromIntegral time
then ((n',t, x:xs),Nothing)
else ((0 ,t',[]), Just $ x:xs)
case mn of
Nothing -> stop
Just xs -> return xs
collect :: Int -> TransIO a -> TransIO [a]
collect n = collect' n 0
collect' :: Int -> Int -> TransIO a -> TransIO [a]
collect' n t search= do
addThreads 1
rv <- liftIO $ newEmptyMVar
results <- liftIO $ newIORef (0,[])
let worker = do
r <- abduce >> search
liftIO $ putMVar rv $ Just r
stop
timer= do
when (t > 0) . async $ threadDelay t >>putMVar rv Nothing
empty
monitor= liftIO loop
where
loop = do
mr <- takeMVar rv
(n',rs) <- readIORef results
case mr of
Nothing -> return rs
Just r -> do
let n''= n' +1
let rs'= r:rs
writeIORef results (n'',rs')
t' <- getCurrentTime
if (n > 0 && n'' >= n)
then return (rs')
else loop
`catch` \(e :: BlockedIndefinitelyOnMVar) ->
readIORef results >>= return . snd
r <- oneThread $ worker <|> timer <|> monitor
return r