module Transient.Internals where
import Control.Applicative
import Control.Monad.State
import Data.Dynamic
import qualified Data.Map as M
import Data.Monoid
import Debug.Trace
import System.IO.Unsafe
import Unsafe.Coerce
import Control.Exception
import Control.Concurrent
import Control.Concurrent.STM
import System.Mem.StableName
import Data.Maybe
import GHC.Conc
import Data.List
import Data.IORef
import System.Environment
-- | Debug helper: yields the left operand unchanged while emitting
-- @show@ of the right operand through 'trace'.
infixr 0 !>

(!>) :: Show a => b -> a -> b
value !> note = trace (show note) value
-- | A computation over the 'EventF' state on top of 'IO' whose result may be
-- absent ('Nothing').
data TransIO x = Transient
  { runTrans :: StateT EventF IO (Maybe x)
  }
-- | Placeholder type under which arbitrary values are stored after being
-- passed through 'unsafeCoerce' (see 'setData' and the 'event' field).
type SData = ()

-- | Integer identifier alias.
type EventId = Int

-- | Synonym for 'TransIO'.
type TransientIO = TransIO
-- | State threaded through every 'TransIO' computation.
--
-- The types of the stored computation and of its continuations are
-- existentially hidden; code in this module recovers them with 'unsafeCoerce'.
data EventF = forall a b . EventF
  { meffects   :: Effects
    -- ^ Effects handler; 'runTransient' installs 'baseEffects'.
  , event      :: Maybe SData
    -- ^ Pending event payload; 'parallel' consumes it and resets it to 'Nothing'.
  , xcomp      :: TransIO a
    -- ^ Stored computation, re-executed by 'runCont' and 'runClosure'.
  , fcomp      :: [b -> TransIO b]
    -- ^ Stack of continuations, chained with 'compose'.
  , mfData     :: M.Map TypeRep SData
    -- ^ Session data keyed by the 'TypeRep' of the stored value.
  , mfSequence :: Int
    -- ^ Counter handed out and incremented by 'genId'.
  , threadId   :: ThreadId
    -- ^ Thread associated with this state.
  , freeTh     :: Bool
    -- ^ When 'True', 'hangThread' does not register new threads as children.
  , parent     :: Maybe EventF
    -- ^ State this one was derived from, if any (set by 'loop').
  , children   :: TVar[EventF]
    -- ^ States of the threads registered under this one.
  , maxThread  :: Maybe (IORef Int)
    -- ^ Optional counter consulted by 'loop' before forking.
  }
  deriving Typeable
-- | Shape of the effects handler stored in 'meffects'. 'baseEffects' is the
-- implementation visible in this module.
type Effects =
  forall a b c.
  TransIO a ->
  TransIO a ->
  (a -> TransIO b) ->
  StateIO (StateIO (Maybe c) -> StateIO (Maybe c), Maybe a)
-- | Exposes the underlying 'EventF' state; every operation succeeds
-- (produces a 'Just').
instance MonadState EventF TransIO where
  get = Transient (get >>= \s -> return (Just s))

  put x = Transient (put x >> return (Just ()))

  state f =
    Transient $
      get >>= \s ->
        -- Lazy pattern: @f s@ is only forced when a component is demanded.
        let ~(a, s') = f s
        in put s' >> return (Just a)
-- | The state monad that 'TransIO' wraps: 'EventF' state over 'IO'.
type StateIO = StateT EventF IO
-- | Run a 'TransIO' computation from a fresh initial state.
--
-- The initial state uses 'baseEffects', has no pending event, an 'empty'
-- stored computation, no continuations, no session data, a sequence counter
-- of 0, the calling thread's id, no parent, and no thread limit.
--
-- Returns the computation's result together with the final state.
runTransient :: TransIO x -> IO (Maybe x, EventF)
runTransient t = do
  th <- myThreadId
  -- Allocate the children list as a real IO action. The previous
  -- @unsafePerformIO (newTVarIO [])@ was a closed expression that the
  -- optimiser may float to the top level, which would make every call of
  -- 'runTransient' share a single 'TVar'.
  chs <- newTVarIO []
  let eventf0 =
        EventF baseEffects Nothing empty [] M.empty 0
               th False Nothing chs Nothing
  runStateT (runTrans t) eventf0
-- | Return the current 'EventF' state as the result of the computation.
getCont :: TransIO EventF
getCont = Transient (fmap Just get)
-- | Execute the computation stored in the given state and feed its result
-- through the stored continuations (chained with 'compose').
--
-- The stored computation is cast with 'unsafeCoerce' because its type is
-- existentially hidden inside 'EventF'.
runCont :: EventF -> StateIO (Maybe a)
runCont (EventF _ _ x fs _ _ _ _ _ _ _) =
  runTrans (unsafeCoerce x >>= \r -> compose fs r)
-- | Fetch the continuation stack of the current state, cast (via
-- 'unsafeCoerce') to whatever function type the caller requests.
getContinuations :: StateIO [a -> TransIO b]
getContinuations =
  get >>= \(EventF _ _ _ fs _ _ _ _ _ _ _) -> return (unsafeCoerce fs)
-- | Chain a list of monadic functions left to right. After the last function
-- has run the chain ends in 'empty', so the overall result is always 'empty';
-- only the effects of the functions are observable.
compose fs = foldr andThen (const empty) fs
  where
    andThen f rest = \x -> f x >>= rest
-- | Execute only the computation stored in the given state, without its
-- continuations. The result type is recovered with 'unsafeCoerce'.
runClosure :: EventF -> StateIO (Maybe a)
runClosure (EventF _ _ x _ _ _ _ _ _ _ _) = unsafeCoerce (runTrans x)
-- | Feed a value to the continuation stack stored in the given state.
-- The composed chain is cast with 'unsafeCoerce' to accept the caller's
-- argument type.
runContinuation :: EventF -> a -> StateIO (Maybe b)
runContinuation (EventF _ _ _ fs _ _ _ _ _ _ _) =
  \x -> runTrans (unsafeCoerce (compose fs) x)
-- | Replace the stored computation with @b@ and the continuation stack with
-- @c@ pushed on top of @fs@. All other fields of the state are kept.
setContinuation :: TransIO a -> (a -> TransIO b) -> [c -> TransIO c] -> StateIO ()
setContinuation b c fs =
  get >>= \(EventF eff ev _ _ d e f g h i j) ->
    put (EventF eff ev b (unsafeCoerce c : fs) d e f g h i j)
-- | Apply a list of continuations (cast with 'unsafeCoerce') to a value by
-- chaining them with 'compose'.
runContinuations :: [a -> TransIO b] -> c -> TransIO d
runContinuations fs x = compose (unsafeCoerce fs) x
-- | 'fmap' is defined through the monadic bind of 'TransIO'.
instance Functor TransIO where
  fmap f mx = mx >>= \x -> return (f x)
-- | Applicative instance for 'TransIO'.
--
-- '<*>' runs both operands in sequence. While each operand runs, a
-- continuation ('appf' for the function side, 'appg' for the argument side)
-- is installed that records that side's latest value and log in an 'IORef'
-- and combines it with the value last recorded by the other side.
instance Applicative TransIO where
pure a = Transient . return $ Just a
f <*> g = Transient $ do
-- rf / rg hold the latest (result, log entries) of the function / argument side.
rf <- liftIO $ newIORef (Nothing,[])
rg <- liftIO $ newIORef (Nothing,[])
-- Continuation stack to restore once both sides have been run.
fs <- getContinuations
let
-- True when the second element of the log is 'Wait'.
hasWait (_:Wait:_)= True
hasWait _ = False
-- Continuation for the function side: store k, then combine with the
-- argument value currently held in rg.
appf k = Transient $ do
Log rec _ full <- getData `onNothing` return (Log False [] [])
liftIO $ writeIORef rf (Just k,full)
(x, full2)<- liftIO $ readIORef rg
when (hasWait full2) $
-- 'head' is safe here: hasWait only succeeds on lists with two or more elements.
let full'= head full2: full
in setData $ Log rec full' full'
return $ Just k <*> x
-- Continuation for the argument side: store x, then combine with the
-- function value currently held in rf.
appg x = Transient $ do
Log rec _ full <- getData `onNothing` return (Log False [] [])
liftIO $ writeIORef rg $ (Just x, full)
(k,full1) <- liftIO $ readIORef rf
when (hasWait full) $
let full'= head full: full1
in setData $ Log rec full' full'
return $ k <*> Just x
setContinuation f appf fs
k <- runTrans f
was <- getData `onNothing` return NoRemote
-- A 'WasParallel' mark left by the function side is reset before continuing.
when (was == WasParallel) $ setData NoRemote
Log recovery _ full <- getData `onNothing` return (Log False [] [])
-- Give up without running g when the function side was remote, or when it
-- produced nothing outside recovery mode with no parallel/remote mark.
if was== WasRemote || (not recovery && was == NoRemote && isNothing k)
then do
restoreStack fs
return Nothing
else do
liftIO $ writeIORef rf (k,full)
-- NOTE(review): mfdata and seq are bound but not used in this block.
mfdata <- gets mfData
seq <- gets mfSequence
setContinuation g appg fs
x <- runTrans g
Log recovery _ full' <- getData `onNothing` return (Log False [] [])
liftIO $ writeIORef rg (x,full')
restoreStack fs
-- Combine the two optional results with the 'Maybe' applicative.
return $ k <*> x
-- | Reinstall @fs@ as the continuation stack and clear the pending event;
-- every other field of the state is left untouched.
restoreStack fs = modify reset
  where
    reset (EventF eff _ comp _ dat sq th fr par chs sem) =
      EventF eff Nothing comp fs dat sq th fr par chs sem
-- | A value that is either still in serialized form ('IDyns') or an actual
-- typed value that can be shown and read ('IDynamic').
data IDynamic
  = IDyns String
  | forall a. (Read a, Show a, Typeable a) => IDynamic a

-- | A typed value is shown twice (the shown text is itself shown as a
-- string); a serialized value is shown once as a string.
instance Show IDynamic where
  show dyn = case dyn of
    IDynamic x -> show (show x)
    IDyns s    -> show s

-- | Reading always produces the serialized form 'IDyns'.
instance Read IDynamic where
  readsPrec n str = map wrap (readsPrec n str)
    where
      wrap (x, s) = (IDyns x, s)
-- | Recovery flag, first component of 'Log'.
type Recover = Bool

-- | Second component of 'Log'.
type CurrentPointer = [LogElem]

-- | Third component of 'Log'.
type LogEntries = [LogElem]

-- | One entry of the log.
data LogElem
  = Wait
  | Exec
  | Var IDynamic
  deriving (Read,Show)

-- | Log kept as session data (see its use in the 'Applicative' instance).
data Log = Log Recover CurrentPointer LogEntries
  deriving Typeable
-- | 'empty' is the computation that produces no result; '<|>' is 'mplus'.
instance Alternative TransIO where
  empty = Transient (return Nothing)
  x <|> y = mplus x y
-- | Marker kept as session data; 'parallel' and 'react' set 'WasParallel',
-- and the 'Applicative' / 'MonadPlus' instances inspect it.
data RemoteStatus
  = WasRemote
  | WasParallel
  | NoRemote
  deriving (Typeable, Eq, Show)
-- | 'mplus' runs the first alternative and falls back to the second only
-- when the first produced nothing and the 'RemoteStatus' is not 'WasRemote'.
instance MonadPlus TransIO where
  mzero = empty

  mplus x y = Transient $ do
    mx  <- runTrans x
    was <- getData `onNothing` return NoRemote
    case (was, mx) of
      (WasRemote, _) -> return Nothing
      (_, Nothing)   -> runTrans y
      (_, justx)     -> return justx
-- | Synonym for 'empty'.
stop :: Alternative m => m a
stop = empty
infixr 1 <** , <***

-- | Run @ma@, then @mb@, and return the result of @ma@. While @ma@ runs, a
-- continuation that executes @mb@ and passes the value through is installed;
-- the previous continuation stack is restored afterwards.
(<**) :: TransIO a -> TransIO b -> TransIO a
ma <** mb = Transient $ do
  saved <- getContinuations
  setContinuation ma passThrough saved
  result <- runTrans ma
  runTrans mb
  restoreStack saved
  return result
  where
    passThrough x = mb >> return x

-- | Synonym for '<**'.
atEnd = (<**)
-- | Run @ma@, then @mb@, and return the result of @ma@. Unlike '<**', the
-- continuation stack is not touched.
(<***) :: TransIO a -> TransIO b -> TransIO a
ma <*** mb =
  Transient $
    runTrans ma >>= \result ->
      runTrans mb >> return result

-- | Synonym for '<***'.
atEnd' = (<***)
-- | Results are combined with the 'Monoid' of the result type, through the
-- 'Applicative' instance of 'TransIO'.
instance Monoid a => Monoid (TransIO a) where
  mappend x y = fmap mappend x <*> y
  mempty = return mempty
-- | Store @x@ as the current computation, push @f@ on the continuation
-- stack, and return the state that was installed.
setEventCont :: TransIO a -> (a -> TransIO b) -> StateIO EventF
setEventCont x f =
  get >>= \(EventF eff e _ fs d n r applic ch rc bs) ->
    let cont = EventF eff e x (unsafeCoerce f : fs) d n r applic ch rc bs
    in put cont >> return cont
-- | Pop the top continuation. The stored computation becomes that
-- continuation applied to the given result, or 'empty' when the result is
-- 'Nothing'. The second argument is ignored. Returns 'id'.
--
-- The 'head' of the stack is only demanded lazily, when the new stored
-- computation is actually run with a 'Just' result.
resetEventCont mx _ =
  get >>= \(EventF eff e _ fs d n r nr ch rc bs) ->
    let next = maybe empty (unsafeCoerce (head fs)) mx
    in put (EventF eff e next (tailsafe fs) d n r nr ch rc bs) >> return id
-- | Total version of 'tail': the empty list maps to the empty list.
tailsafe xs = drop 1 xs
-- | Default effects handler: install the closure and continuation with
-- 'setEventCont', run the computation, then pop the continuation with
-- 'resetEventCont'. Returns the wrapper produced by 'resetEventCont'
-- together with the computation's result.
baseEffects :: Effects
baseEffects comp closure cont = do
  installed <- setEventCont closure cont
  outcome   <- runTrans comp
  wrapper   <- resetEventCont outcome installed
  return (wrapper, outcome)
-- | Bind runs the left operand through 'baseEffects' and, when it produced
-- a value, runs the continuation on it inside the wrapper that
-- 'baseEffects' returned.
instance Monad TransIO where
  return x = Transient (return (Just x))

  x >>= f = Transient $ do
    (wrap, mk) <- baseEffects x x f
    wrap (maybe (return Nothing) (runTrans . f) mk)
-- | Lift an 'IO' action; its result is always wrapped in 'Just'.
instance MonadIO TransIO where
  liftIO io = Transient (liftIO io >>= \r -> return (Just r))
-- | Try to take one unit from the counter without blocking.
--
-- Atomically decrements the counter and returns 'True' when it is positive;
-- otherwise leaves it unchanged and returns 'False'.
waitQSemB :: (Ord a, Num a) => IORef a -> IO Bool
waitQSemB sem = atomicModifyIORef sem $ \n ->
  -- The original referred to an unbound name @n1@; the intended new value
  -- is the decremented counter.
  if n > 0 then (n - 1, True) else (n, False)
-- | Atomically give one unit back to the counter.
signalQSemB sem = atomicModifyIORef sem release
  where
    release n = (n + 1, ())
-- | Run @proc@ with a fresh thread counter initialised to @n@, then put
-- back whatever counter was installed before.
threads :: Int -> TransIO a -> TransIO a
threads n proc = Transient $ do
  previous <- gets maxThread
  counter  <- liftIO (newIORef n)
  modify (\s -> s { maxThread = Just counter })
  result <- runTrans proc
  modify (\s -> s { maxThread = previous })
  return result
-- | Run @comp@, then replace the children list of the state with a fresh
-- empty one and invoke 'killChilds'.
--
-- NOTE(review): the children list is replaced /before/ 'killChilds' runs,
-- so 'killChilds' sees the new, empty list — confirm this is intended.
oneThread :: TransientIO a -> TransientIO a
oneThread comp = do
  fresh  <- liftIO (newTVarIO [])
  result <- comp
  modify (\s -> s { children = fresh })
  killChilds
  return result
-- | Add @n@ to the thread counter, creating a counter initialised to @n@
-- when none is installed.
--
-- The counter is shared with forked threads, which update it through
-- 'waitQSemB' / 'signalQSemB' using atomic operations; the update here is
-- therefore atomic as well, so concurrent releases are not lost.
addThreads' :: Int -> TransIO ()
addThreads' n = Transient $ do
  msem <- gets maxThread
  case msem of
    Just sem -> liftIO $ atomicModifyIORef' sem $ \n' -> (n + n', ())
    Nothing -> do
      sem <- liftIO (newIORef n)
      modify $ \s -> s { maxThread = Just sem }
  return $ Just ()
-- | Raise the thread counter to at least @n@. Does nothing when no counter
-- is installed.
--
-- The counter is shared with forked threads, which update it through
-- 'waitQSemB' / 'signalQSemB' using atomic operations; the update here is
-- therefore atomic as well, so concurrent releases are not lost.
addThreads :: Int -> TransIO ()
addThreads n = Transient $ do
  msem <- gets maxThread
  case msem of
    Nothing -> return ()
    Just sem -> liftIO $ atomicModifyIORef' sem $ \n' -> (max n' n, ())
  return $ Just ()
-- | Run @proc@ with 'freeTh' set to 'True', then put the flag back to the
-- value it had on entry.
freeThreads :: TransIO a -> TransIO a
freeThreads proc = Transient $ do
  entry <- get
  put entry { freeTh = True }
  result <- runTrans proc
  modify (\s -> s { freeTh = freeTh entry })
  return result
-- | Run @proc@ with 'freeTh' set to 'False', then put the flag back to the
-- value it had on entry (the counterpart of 'freeThreads').
hookedThreads :: TransIO a -> TransIO a
hookedThreads proc = Transient $ do
  st <- get
  put st { freeTh = False }
  r <- runTrans proc
  -- Restore from the state captured on entry. The lambda parameter must not
  -- be called @st@: shadowing it made the update read the flag from the
  -- current state, so nothing was ever restored.
  modify $ \s -> s { freeTh = freeTh st }
  return r
-- | Invoke 'killChildren' on the current state.
killChilds :: TransientIO()
killChilds =
  Transient (get >>= liftIO . killChildren >> return (Just ()))
-- | Look up session data by type.
--
-- The key is the 'TypeRep' of the result type requested by the caller.
-- Returns 'Nothing' when no value of that type is stored. A stored value is
-- cast back with 'unsafeCoerce'.
getData :: (MonadState EventF m,Typeable a) => m (Maybe a)
getData = resp where
resp= gets mfData >>= \list ->
-- 'typeResp' is never evaluated; it only carries the element type of 'resp'
-- so that 'typeOf' can compute the lookup key.
case M.lookup ( typeOf $ typeResp resp ) list of
Just x -> return . Just $ unsafeCoerce x
Nothing -> return Nothing
typeResp :: m (Maybe x) -> x
typeResp= undefined
-- | 'getData' as a 'TransIO' computation: it produces no result when no
-- session data of the requested type is stored.
getSData :: Typeable a => TransIO a
getSData = Transient getData
-- | Store a value as session data under the 'TypeRep' of its type,
-- replacing any value previously stored for that type.
setData :: (MonadState EventF m, Typeable a) => a -> m ()
setData x = modify store
  where
    store st = st { mfData = M.insert (typeOf x) (unsafeCoerce x) (mfData st) }

-- | Synonym for 'setData'.
setSData :: (MonadState EventF m, Typeable a) => a -> m ()
setSData = setData
-- | Remove the session data stored for the type of the argument. Only the
-- type of the argument is used.
delSessionData x = modify remove
  where
    remove st = st { mfData = M.delete (typeOf x) (mfData st) }

-- | Synonym for 'delSessionData'.
delSData :: ( MonadState EventF m,Typeable a) => a -> m ()
delSData = delSessionData
-- | Return the current value of the sequence counter and increment it.
genId :: MonadState EventF m => m Int
genId =
  get >>= \st ->
    let current = mfSequence st
    in put st { mfSequence = current + 1 } >> return current
-- | Return the current value of the sequence counter without changing it.
getPrevId :: MonadState EventF m => m Int
getPrevId = gets mfSequence >>= \current -> return current
-- | Reads an exception back as an 'ErrorCall' carrying the parsed message.
-- The precedence argument is ignored.
--
-- NOTE(review): the input is parsed with 'read' as a one-element list of
-- (message, remainder) pairs, and the pattern is partial — confirm this
-- matches the format actually produced for 'SError'.
instance Read SomeException where
  readsPrec _ str = [(SomeException (ErrorCall s), r)]
    where
      [(s, r)] = read str
-- | Messages delivered by the action given to 'parallel'. 'loop' keeps
-- re-running the action after 'SMore' and stops after any other constructor.
data StreamData a
  = SMore a
  | SLast a
  | SDone
  | SError SomeException
  deriving (Typeable, Show,Read)
-- | Run the 'IO' action through 'parallel', tagging each result as 'SMore',
-- and return the unwrapped result.
waitEvents :: IO b -> TransIO b
waitEvents io = do
  SMore result <- parallel (fmap SMore io)
  return result
-- | Same definition as 'waitEvents': run the action through 'parallel'
-- tagged as 'SMore' and return the unwrapped result.
waitEvents' :: IO b -> TransIO b
waitEvents' io = do
  SMore result <- parallel (fmap SMore io)
  return result
-- | Run the 'IO' action through 'parallel', tagging its result as 'SLast',
-- and return the unwrapped result.
async :: IO b -> TransIO b
async io = do
  SLast result <- parallel (fmap SLast io)
  return result
-- | Like 'waitEvents', but executed under 'freeThreads'.
spawn :: IO b -> TransIO b
spawn io = freeThreads $ do
  SMore result <- parallel (fmap SMore io)
  return result
-- | Core primitive for event-driven computations.
--
-- When the state carries a pending event, the event is consumed (reset to
-- 'Nothing') and returned, cast with 'unsafeCoerce'. Otherwise 'loop' is
-- started with the current state and the given action, the session is
-- marked 'WasParallel', and no result is produced.
parallel :: IO (StreamData b) -> TransIO (StreamData b)
parallel ioaction = Transient $ do
  cont <- get
  case event cont of
    Nothing -> do
      liftIO (loop cont ioaction)
      setData WasParallel
      return Nothing
    pending -> do
      put cont { event = Nothing }
      return (unsafeCoerce pending)
-- | Run the given action repeatedly, executing the continuation stored in
-- the state once per result.
--
-- A child state is derived from the given one (its parent is the given
-- state and it gets a fresh children list). Work is forked into new threads
-- subject to the optional 'maxThread' counter.
loop :: EventF -> IO (StreamData t) -> IO ()
loop (cont'@(EventF eff e x fs a b c d _ childs g)) rec = do
chs <- liftIO $ newTVarIO []
-- Child state: same fields as cont', but hung from cont' with its own children.
let cont = EventF eff e x fs a b c d (Just cont') chs g
-- Run the stored continuation with the received data set as pending event.
iocont dat= do
runStateT (runCont cont) cont{event= Just $ unsafeCoerce dat}
return ()
-- Worker: obtain one result and dispatch on it. Only 'SMore' re-enters the loop.
loop'= forkMaybe False cont $ do
mdat <- threadDelay 0 >> rec
case mdat of
se@(SError _) -> iocont se
SDone -> iocont SDone
last@(SLast _) -> iocont last
more@(SMore _) -> do
forkMaybe False cont $ iocont more
loop'
loop'
return ()
where
-- forkMaybe True always takes the forking branch of forkMaybe'; forkMaybe
-- False forks only when no counter is installed or 'waitQSemB' succeeds.
forkMaybe True cont proc = forkMaybe' True cont proc
forkMaybe False cont proc = do
dofork <- case maxThread cont of
Nothing -> return True
Just sem -> do
dofork <- waitQSemB sem
if dofork then return True else return False
forkMaybe' dofork cont proc
-- When dofork is False the action simply runs in the calling thread.
forkMaybe' dofork cont proc=
if dofork
then do
forkFinally1 (do
th <- myThreadId
-- Register the new thread under cont' (a no-op when cont' has freeTh set).
hangThread cont' cont{threadId=th}
proc)
$ \me -> do
case me of
-- On an exception: print it unless it is 'ThreadKilled', then kill the children.
Left e -> do
when (fromException e /= Just ThreadKilled)$ liftIO $ print e
killChildren cont
-- On normal termination: unregister the thread unless cont' has freeTh set.
Right _ -> when(not $ freeTh cont') $ do
th <- myThreadId
mparent <- free th cont
return ()
-- Give the unit back to the counter, if there is one.
case maxThread cont of
Just sem -> signalQSemB sem
Nothing -> return ()
return ()
else proc
-- | Fork a thread that runs @action@ and then hands its outcome (an
-- exception or a result) to @and_then@. The thread is forked with
-- asynchronous exceptions masked; only @action@ runs with the caller's
-- masking state restored.
forkFinally1 :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally1 action and_then =
  mask (\restore -> forkIO (and_then =<< try (restore action)))
-- | Remove the state whose thread is @th@ from the children list of the
-- parent of @env@.
--
-- Returns 'Nothing' when @env@ has no parent. When the thread is found in
-- the parent's children it is removed and @Just env@ is returned; otherwise
-- the search continues from the parent.
free th env =
  case parent env of
    Nothing -> return Nothing
    Just par -> do
      let siblings = children par
      found <- atomically $ do
        current <- readTVar siblings
        let (remaining, hit) = removeThread [] current
        when hit $ writeTVar siblings remaining
        return hit
      if found
        then return (Just env)
        else free th par
  where
    -- Entries inspected before the match end up in reverse order.
    removeThread seen [] = (seen, False)
    removeThread seen (ev : evts)
      | th == threadId ev = (seen ++ evts, True)
      | otherwise = removeThread (ev : seen) evts
-- | Register @child@ at the front of the children list of the first
-- argument, unless that state has 'freeTh' set.
hangThread par child =
  unless (freeTh par) $ do
    let siblings = children par
    atomically $ do
      current <- readTVar siblings
      writeTVar siblings (child : current)
-- | In a separate thread, atomically empty the children list of the given
-- state and kill the thread of every child that was in it.
killChildren cont =
  forkIO killAll >> return ()
  where
    killAll = do
      let registry = children cont
      victims <- atomically $ do
        current <- readTVar registry
        writeTVar registry []
        return current
      mapM_ (killThread . threadId) victims
-- | An action that installs an event handler.
type EventSetter eventdata response = (eventdata -> IO response) -> IO ()

-- | The action that computes the value a handler returns.
type ToReturn response = IO response
-- | Turn a callback-based event source into a 'TransIO' computation.
--
-- When no event data of the requested type is stored in the session, a
-- handler is installed through @setHandler@; on each event the handler
-- stores the data, re-runs the continuation captured here, and then runs
-- @iob@ to produce its own result. The session is marked 'WasParallel' and
-- no result is produced. When event data is present, it is removed from the
-- session and returned.
react
  :: Typeable eventdata
  => EventSetter eventdata response
  -> ToReturn response
  -> TransIO eventdata
react setHandler iob = Transient $ do
  cont <- get
  mEvData <- getData
  case mEvData of
    Just dat -> delSessionData dat >> return (Just dat)
    Nothing -> do
      liftIO (setHandler (handler cont))
      setSData WasParallel
      return Nothing
  where
    handler cont dat =
      runStateT (setData dat >> runCont cont) cont >> iob
-- | Global cell holding the most recent input line that has not been
-- consumed yet ('Nothing' when there is none).
--
-- NOINLINE is required for a top-level 'unsafePerformIO' allocation:
-- without it the definition may be inlined and the cell duplicated.
--
-- NOTE(review): without a type signature this cell is polymorphic in its
-- content type; the uses visible in this file store 'String's — consider
-- pinning the type.
{-# NOINLINE getLineRef #-}
getLineRef = unsafePerformIO $ newTVarIO Nothing
-- | Global list of the option labels registered by 'option'.
--
-- NOINLINE is required for a top-level 'unsafePerformIO' allocation:
-- without it the definition may be inlined and the variable duplicated.
{-# NOINLINE roption #-}
roption = unsafePerformIO $ newMVar []
-- | Offer an option on the console.
--
-- Prints a prompt made of the shown value and the message, registers the
-- shown value in 'roption', and waits (through 'waitEvents') for an input
-- line that reads as a value equal to @ret@. It then prints a confirmation
-- and returns @ret@.
option :: (Typeable b, Show b, Read b, Eq b) =>
     b -> String -> TransIO b
option ret message = do
  let sret = show ret
  liftIO (putStrLn ("Enter " ++ sret ++ "\tto: " ++ message))
  liftIO (modifyMVar_ roption (\msgs -> return (sret : msgs)))
  waitEvents (getLine' (== ret))
  liftIO (putStrLn (show ret ++ " chosen"))
  return ret
-- | Wait until an input line is available in 'getLineRef'. When the line
-- reads as a value accepted by @cond@, the line is consumed and the value
-- returned; otherwise the line is left in place and no result is produced.
input :: (Typeable a, Read a) => (a -> Bool) -> TransIO a
input cond = Transient . liftIO . atomically $ do
  pending <- readTVar getLineRef
  case pending of
    Nothing -> retry
    Just line -> case reads1 line of
      (value, _) : _
        | cond value -> writeTVar getLineRef Nothing >> return (Just value)
      _ -> return Nothing
-- | Block until 'getLineRef' holds a line that reads as a value accepted by
-- @cond@; then consume the line and return the value. Lines that do not
-- parse or are rejected are left in place and cause the transaction to
-- retry.
getLine' cond = atomically $ do
  pending <- readTVar getLineRef
  case pending of
    Nothing -> retry
    Just line -> case reads1 line of
      (value, _) : _
        | cond value -> writeTVar getLineRef Nothing >> return value
      _ -> retry
-- | Variant of 'reads' with a special case for strings: when the requested
-- result type is 'String', the whole input is returned as the value, with an
-- empty remainder; any other type is parsed with 'readsPrec' at precedence 0.
reads1 s=x where
-- 'typeOfr' is never evaluated; it only exposes the element type of the
-- result so it can be compared with the type of a string. The 'unsafeCoerce'
-- is applied only in the branch where that comparison succeeded.
x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
typeOfr :: [(a,String)] -> a
typeOfr = undefined
-- | Print the exit hint, then read lines from standard input forever,
-- passing each one to 'processLine'.
inputLoop =
  putStrLn "Press end to exit" >> forever (getLine >>= processLine)
-- | Split a line on @\'/\'@ and process each segment in order: the segment
-- @"end"@ requests termination through 'exit''; any other segment is
-- published in 'getLineRef' after a short delay.
processLine r = mapM_ dispatch (splitSlash r)
  where
    dispatch "end" = exit' $ Left "terminated by user"
    dispatch segment = do
      threadDelay 1000
      atomically (writeTVar getLineRef (Just segment))

    -- Segments between slashes, in order; a trailing slash adds no segment.
    splitSlash :: String -> [String]
    splitSlash "" = []
    splitSlash s =
      let (segment, rest) = span (/= '/') s
      in segment : splitSlash (drop 1 rest)
-- | Global variable through which the running program reports how it ended;
-- it is written by 'exit', 'exit'', 'keep' and 'keep'', and read by 'stay'.
--
-- NOINLINE is required for a top-level 'unsafePerformIO' allocation:
-- without it the definition may be inlined and the variable duplicated.
{-# NOINLINE rexit #-}
rexit = unsafePerformIO $ newEmptyMVar
-- | Block on 'rexit' until a final value arrives. A @Right Nothing@ is
-- ignored and waiting continues, @Right (Just r)@ returns @r@, and
-- @Left msg@ raises an error carrying the message.
stay = takeMVar rexit >>= either error (maybe stay return)
-- | Run a 'TransIO' computation while keeping the program alive.
--
-- Starts the console 'inputLoop' and the computation in separate threads.
-- After a short delay, the first command-line argument beginning with
-- @\'/\'@, if any, is fed to 'processLine'. Then blocks in 'stay' until a
-- final value is delivered.
keep :: TransIO a -> IO a
keep mx = do
  forkIO inputLoop
  forkIO $ do
    runTransient $ mx >> liftIO (putMVar rexit $ Right Nothing)
    return ()
  threadDelay 100000
  args <- getArgs
  -- 'isPrefixOf' is total: indexing with @!! 0@ crashed on an empty argument.
  case filter ("/" `isPrefixOf`) args of
    path : _ -> do
      putStr "Executing: " >> print path
      processLine path
    [] -> return ()
  stay
-- | Like 'keep', but without the console input loop and without processing
-- command-line arguments.
keep' :: TransIO a -> IO a
keep' mx = do
  _ <- forkIO (runTransient (mx >> liftIO (putMVar rexit (Right Nothing))) >> return ())
  threadDelay 100000
  stay
-- | Deliver @x@ through 'rexit' as the final value, then 'stop' the current
-- computation.
exit :: a -> TransIO a
exit x = liftIO (putMVar rexit (Right (Just x))) >> stop
-- | Put the given value into 'rexit' as is.
exit' x = liftIO (putMVar rexit x)
-- | Run the first action; when it yields 'Nothing', run the second action
-- instead, otherwise return the value it produced.
onNothing :: Monad m => m (Maybe b) -> m b -> m b
onNothing iox iox' = iox >>= maybe iox' return