module Transient.Base where
import Control.Applicative
import Control.Monad.State
import Data.Dynamic
import qualified Data.Map as M
import Data.Monoid
import Debug.Trace
import System.IO.Unsafe
import Unsafe.Coerce
import Control.Exception
import Control.Concurrent
import Control.Concurrent.STM
import System.Mem.StableName
import Data.Maybe
import GHC.Conc
import Data.List
import Data.IORef
import System.Environment
(!>) = const . id
infixr 0 !>
(!!>) = flip trace
infixr 0 !!>
data TransIO x = Transient {runTrans :: StateT EventF IO (Maybe x)}
type SData= ()
type EventId= Int
type TransientIO= TransIO
data EventF = forall a b . EventF{event :: Maybe SData
,xcomp :: TransientIO a
,fcomp :: [b -> TransientIO b]
,mfData :: M.Map TypeRep SData
,mfSequence :: Int
,threadId :: ThreadId
,freeTh :: Bool
,parent :: Maybe EventF
,children :: TVar[EventF]
,maxThread :: Maybe (P Int)
}
deriving Typeable
type P= IORef
newp= newIORef
(=:) n f= liftIO $ atomicModifyIORef n $ \v -> ((f v),())
addr x= show $ unsafePerformIO $ do
st <- makeStableName $! x
return $ hashStableName st
instance MonadState EventF TransientIO where
get= Transient $ get >>= return . Just
put x= Transient $ put x >> return (Just ())
type StateIO= StateT EventF IO
runTransient :: TransientIO x -> IO (Maybe x, EventF)
runTransient t= do
th <- myThreadId
let eventf0= EventF Nothing empty [] M.empty 0
th False Nothing (unsafePerformIO $ newTVarIO []) Nothing
runStateT (runTrans t) eventf0{threadId=th} !> "MAIN="++show th
getCont ::(MonadState EventF m) => m EventF
getCont = get
runCont :: EventF -> StateIO ()
runCont (EventF _ x fs _ _ _ _ _ _ _)= runTrans ((unsafeCoerce x') >>= compose ( fs)) >> return ()
where
x'= do
r<- x
return r
compose []= const empty
compose (f: fs)= \x -> f x >>= compose fs
runClosure :: EventF -> StateIO (Maybe a)
runClosure (EventF _ x _ _ _ _ _ _ _ _) = unsafeCoerce $ runTrans x
runContinuation :: EventF -> a -> StateIO (Maybe b)
runContinuation (EventF _ _ fs _ _ _ _ _ _ _) x=
runTrans $ (unsafeCoerce $ compose $ fs) x
runContinuations :: [a -> TransIO b] -> c -> TransIO d
runContinuations fs x= (compose $ unsafeCoerce fs) x
instance Functor TransientIO where
fmap f mx=
do
x <- mx
return $ f x
instance Applicative TransientIO where
pure a = Transient . return $ Just a
f <*> g = Transient $ do
rf <- liftIO $ newIORef Nothing
rg <- liftIO $ newIORef Nothing
cont@(EventF _ _ fs a b c d peers children g1) <- get
let
appg x = Transient $ do
liftIO $ writeIORef rg $ Just x :: StateIO ()
k <- liftIO $ readIORef rf
return $ k <*> Just x
appf k = Transient $ do
liftIO $ writeIORef rf $ Just k :: StateIO ()
x<- liftIO $ readIORef rg
return $ Just k <*> x
put $ EventF Nothing f (unsafeCoerce appf: fs)
a b c d peers children g1
k <- runTrans f
was <- getSessionData `onNothing` return NoRemote
if was== WasRemote
then return Nothing
else do
liftIO $ writeIORef rf k
mfdata <- gets mfData
put $ EventF Nothing g (unsafeCoerce appg : fs) mfdata b c d peers children g1
x <- runTrans g !> "RUN g"
liftIO $ writeIORef rg x
return $ k <*> x
data IDynamic= IDyns String | forall a.(Read a, Show a,Typeable a) => IDynamic a
instance Show IDynamic where
show (IDynamic x)= show $ show x
show (IDyns s)= show s
instance Read IDynamic where
readsPrec n str= map (\(x,s) -> (IDyns x,s)) $ readsPrec n str
type Recover= Bool
type CurrentPointer= [LogElem]
type LogEntries= [LogElem]
data LogElem= WaitRemote | Exec | Step IDynamic deriving (Read,Show)
data Log= Log Recover CurrentPointer LogEntries deriving Typeable
instance Alternative TransientIO where
empty = Transient $ return Nothing
(<|>) = mplus
data RemoteStatus= WasRemote | NoRemote deriving (Typeable, Eq)
instance MonadPlus TransientIO where
mzero= empty
mplus x y= Transient $ do
mx <- runTrans x
was <- getSessionData `onNothing` return NoRemote
if was== WasRemote
then return Nothing
else case mx of
Nothing -> runTrans y
justx -> return justx
stop :: TransientIO a
stop= Control.Applicative.empty
instance Monoid a => Monoid (TransientIO a) where
mappend x y = mappend <$> x <*> y
mempty= return mempty
setEventCont :: TransientIO a -> (a -> TransientIO b) -> StateIO ()
setEventCont x f = do
st@(EventF e _ fs d n r applic ch rc bs) <- get
put $ EventF e x ( unsafeCoerce f : fs) d n r applic ch rc bs
resetEventCont :: Maybe a -> StateIO ()
resetEventCont mx =do
st@(EventF e _ fs d n r nr ch rc bs) <- get
let f= \mx -> case mx of
Nothing -> empty
Just x -> (unsafeCoerce $ head fs) x
put $ EventF e (f mx) ( tailsafe fs)d n r nr ch rc bs
where
tailsafe []=[]
tailsafe (x:xs)= xs
instance Monad TransientIO where
return x = Transient $ return $ Just x
x >>= f = Transient $ do
setEventCont x f
mk <- runTrans x
resetEventCont mk
case mk of
Just k -> do
runTrans $ f k
Nothing -> return Nothing
instance MonadIO TransientIO where
liftIO x = Transient $ liftIO x >>= return . Just
waitQSemB sem= atomicModifyIORef sem $ \n -> if n > 0 then(n1,True) else (n,False)
signalQSemB sem= atomicModifyIORef sem $ \n -> (n + 1,())
threads :: Int -> TransientIO a -> TransientIO a
threads n proc= Transient $ do
msem <- gets maxThread
sem <- liftIO $ newIORef n
modify $ \s -> s{maxThread= Just sem}
r <- runTrans proc
modify $ \s -> s{maxThread = msem}
return r
oneThread :: TransientIO a -> TransientIO a
oneThread comp= do
chs <- liftIO $ newTVarIO []
r <- comp
modify $ \ s -> s{children= chs}
killChilds
return r
addThreads' :: Int -> TransIO ()
addThreads' n= do
msem <- gets maxThread
case msem of
Just sem -> liftIO $ modifyIORef sem $ \n' -> n + n'
Nothing -> do
sem <- liftIO (newIORef n)
modify $ \ s -> s{maxThread= Just sem}
addThreads n= do
msem <- gets maxThread
case msem of
Nothing -> return ()
Just sem -> liftIO $ modifyIORef sem $ \n' -> if n' > n then n' else n
getNonUsedThreads :: TransIO (Maybe Int)
getNonUsedThreads= do
msem <- gets maxThread
case msem of
Just sem -> liftIO $ Just <$> readIORef sem
Nothing -> return Nothing
freeThreads :: TransientIO a -> TransientIO a
freeThreads proc= Transient $ do
st <- get
put st{freeTh= True}
r <- runTrans proc
modify $ \st -> st{freeTh= freeTh st}
return r
hookedThreads :: TransientIO a -> TransientIO a
hookedThreads proc= Transient $ do
st <- get
put st{freeTh= False}
r <- runTrans proc
modify $ \st -> st{freeTh= freeTh st}
return r
killChilds :: TransientIO()
killChilds= Transient $ do
cont <- get
liftIO $ killChildren cont
return $ Just ()
getSessionData :: (MonadState EventF m,Typeable a) => m (Maybe a)
getSessionData = resp where
resp= gets mfData >>= \list ->
case M.lookup ( typeOf $ typeResp resp ) list of
Just x -> return . Just $ unsafeCoerce x
Nothing -> return $ Nothing
typeResp :: m (Maybe x) -> x
typeResp= undefined
getSData :: Typeable a => TransIO a
getSData= Transient getSessionData
setSessionData :: (MonadState EventF m, Typeable a) => a -> m ()
setSessionData x= do
let t= typeOf x in modify $ \st -> st{mfData= M.insert t (unsafeCoerce x) (mfData st)}
setSData :: ( MonadState EventF m,Typeable a) => a -> m ()
setSData= setSessionData
delSessionData x=
modify $ \st -> st{mfData= M.delete (typeOf x ) (mfData st)}
delSData :: ( MonadState EventF m,Typeable a) => a -> m ()
delSData= delSessionData
genNewId :: MonadIO m => MonadState EventF m => m Int
genNewId= do
st <- get
let n= mfSequence st
put $ st{mfSequence= n+1}
return n
refSequence :: IORef Int
refSequence= unsafePerformIO $ newp 0
data StreamData a= SMore a | SLast a | SDone | SError String deriving (Typeable, Show,Read)
waitEvents :: IO b -> TransientIO b
waitEvents io= do
SMore r <- parallel (SMore <$> io)
killChilds
return r
waitEvents' :: IO b -> TransientIO b
waitEvents' io= do
SMore r <- parallel (SMore <$> io)
return r
async :: IO b -> TransientIO b
async io= do
SLast r <- parallel (SLast <$>io)
killChilds
return r
spawn :: IO b -> TransientIO b
spawn io= freeThreads $ do
SMore r <- parallel (SMore <$>io)
return r
parallel :: IO (StreamData b) -> TransientIO (StreamData b)
parallel ioaction= Transient $ do
cont <- getCont
case event cont of
j@(Just _) -> do
put cont{event=Nothing}
return $ unsafeCoerce j
Nothing -> do
liftIO $ loop cont ioaction
return Nothing
loop :: EventF -> IO (StreamData t) -> IO ()
loop (cont'@(EventF e x fs a b c d peers childs g)) rec = do
chs <- liftIO $ newTVarIO []
let cont = EventF e x fs a b c d (Just cont') chs g
iocont dat= do
runStateT (runCont cont) cont{event= Just $ unsafeCoerce dat}
return ()
loop'= forkMaybe False cont $ do
mdat <- rec
case mdat of
se@(SError _) -> iocont se
SDone -> iocont SDone
last@(SLast _) -> iocont last
more@(SMore _) -> do
forkMaybe False cont $ iocont more
loop'
loop'
return ()
where
forkMaybe True cont proc = forkMaybe' True cont proc
forkMaybe False cont proc = do
dofork <- case maxThread cont of
Nothing -> return True
Just sem -> do
dofork <- waitQSemB sem
if dofork then return True else return False
forkMaybe' dofork cont proc
forkMaybe' dofork cont proc=
if dofork
then do
th <- forkFinally1 proc $ \me -> do
case me of
Left e -> do
when (fromException e /= Just ThreadKilled)$ liftIO $ print e
killChildren cont !> "KILL RECEIVED" ++ (show $ unsafePerformIO myThreadId)
Right _ -> when(not $ freeTh cont') $ do
th <- myThreadId
mparent <- free th cont
case mparent of
Nothing -> return()
Just parent -> atomically $ do
chs' <- readTVar $ children cont
chs <- (readTVar $ children parent)
writeTVar (children parent)$ chs ++ chs'
return ()
case maxThread cont of
Just sem -> signalQSemB sem
Nothing -> return ()
addThread cont' cont{threadId=th}
else proc
forkFinally1 :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally1 action and_then =
mask $ \restore ->
forkIO $ try (restore action) >>= and_then
free th env= do
if isNothing $ parent env
then return Nothing !> show th ++ " orphan"
else do
let msibling= fmap children $ parent env
case msibling of
Nothing -> return Nothing
Just sibling -> do
found <- atomically $ do
sbs <- readTVar sibling
let (sbs', found) = drop [] th sbs
when found $ writeTVar sibling sbs'
return found
if (not found && isJust (parent env))
then free th $ fromJust $ parent env
else return $ Just env
where
drop processed th []= (processed,False)
drop processed th (ev:evts)| th == threadId ev= (processed ++ evts, True)
| otherwise= drop (ev:processed) th evts
addThread parent child = when(not $ freeTh parent) $ do
let headpths= children parent
atomically $ do
ths <- readTVar headpths
writeTVar headpths $ child:ths
killChildren cont = do
forkIO $ do
let childs= children cont !> "killChildren list= "++ addr (children cont)
ths <- atomically $ do
ths <- readTVar childs
writeTVar childs []
return ths
mapM_ (killThread . threadId) ths !> "KILLEVENT " ++ show (map threadId ths)
return ()
type EventSetter eventdata response= (eventdata -> IO response) -> IO ()
type ToReturn response= IO response
react
:: Typeable eventdata
=> EventSetter eventdata response
-> ToReturn response
-> TransientIO eventdata
react setHandler iob= Transient $ do
cont <- getCont
mEvData <- getSessionData
case mEvData of
Nothing -> do
liftIO $ setHandler $ \dat ->do
runStateT (setSData dat >> runCont cont) cont
iob
return Nothing
Just dat -> delSessionData dat >> return (Just dat)
getLineRef= unsafePerformIO $ newTVarIO Nothing
roption= unsafePerformIO $ newMVar []
option :: (Typeable b, Show b, Read b, Eq b) =>
b -> [Char] -> TransientIO b
option ret message= do
let sret= show ret
liftIO $ putStrLn $ "Enter "++sret++"\tto: " ++ message
liftIO $ modifyMVar_ roption $ \msgs-> return $ sret:msgs
waitEvents $ getLine' (==ret)
liftIO $ putStrLn $ show ret ++ " chosen"
return ret
input :: (Typeable a, Read a) => (a -> Bool) -> TransientIO a
input cond= Transient . liftIO . atomically $ do
mr <- readTVar getLineRef
case mr of
Nothing -> retry
Just r ->
case reads1 r of
(s,_):_ -> if cond s
then do
writeTVar getLineRef Nothing
return $ Just s
else return Nothing
_ -> return Nothing
getLine' cond= do
atomically $ do
mr <- readTVar getLineRef
case mr of
Nothing -> retry
Just r ->
case reads1 r of
(s,_):_ -> if cond s
then do
writeTVar getLineRef Nothing
return s
else retry
_ -> retry
reads1 s=x where
x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
typeOfr :: [(a,String)] -> a
typeOfr = undefined
inputLoop= do
putStrLn "Press end to exit"
inputLoop'
where
inputLoop'= do
r<- getLine
processLine r
inputLoop'
processLine r= do
let rs = breakSlash [] r
mapM_ (\ r -> if (r=="end") then putMVar rexit () else do
threadDelay 1000
atomically . writeTVar getLineRef $ Just r) rs
where
breakSlash :: [String] -> String -> [String]
breakSlash s ""= s
breakSlash res s=
let (r,rest) = span(/= '/') s
in breakSlash (res++[r]) $ tail1 rest
where
tail1 []=[]
tail1 x= tail x
rexit= unsafePerformIO newEmptyMVar
stay= takeMVar rexit
keep :: TransIO a -> IO a
keep mx = do
forkIO $ inputLoop
forkIO $ runTransient mx >> return ()
threadDelay 100000
args <- getArgs
let path = filter (\arg -> arg !! 0 == '/') args
when (not (null path)) $ do
putStr "Executing: " >> print (head path)
processLine $ head path
stay
keep' :: TransIO a -> IO a
keep' mx = do
forkIO $ runTransient mx >> return ()
stay
exit :: TransientIO a
exit= do
liftIO $ putMVar rexit True
stop
onNothing :: Monad m => m (Maybe b) -> m b -> m b
onNothing iox iox'= do
mx <- iox
case mx of
Just x -> return x
Nothing -> iox'