{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification, OverloadedStrings
,ScopedTypeVariables, StandaloneDeriving, RecordWildCards, FlexibleContexts, CPP
,GeneralizedNewtypeDeriving #-}
module Transient.Move.Internals where
import Transient.Internals
import Transient.Parse
import Transient.Logged
import Transient.Indeterminism
import Transient.EVars
import Data.Typeable
import Control.Applicative
import System.IO.Error
#ifndef ghcjs_HOST_OS
import Network
import Network.URI
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Network.WebSockets as NWS
import qualified Network.WebSockets.Connection as WS
import Network.WebSockets.Stream hiding(parse)
import qualified Data.ByteString as B(ByteString,concat)
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Internal as BLC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BS
import Network.Socket.ByteString as SBS(sendMany,sendAll,recv)
import qualified Network.Socket.ByteString.Lazy as SBSL
import Data.CaseInsensitive(mk)
import Data.Char(isSpace)
#else
import JavaScript.Web.WebSocket
import qualified JavaScript.Web.MessageEvent as JM
import GHCJS.Prim (JSVal)
import GHCJS.Marshal(fromJSValUnchecked)
import qualified Data.JSString as JS
import JavaScript.Web.MessageEvent.Internal
import GHCJS.Foreign.Callback.Internal (Callback(..))
import qualified GHCJS.Foreign.Callback as CB
import Data.JSString (JSString(..), pack)
#endif
import Control.Monad.State
import Control.Exception hiding (onException,try)
import Data.Maybe
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar
import Data.Monoid
import qualified Data.Map as M
import Data.List (nub,(\\))
import Data.IORef
import Control.Concurrent
import Data.String
import System.Mem.StableName
import Unsafe.Coerce
#ifdef ghcjs_HOST_OS
type HostName = String
newtype PortID = PortNumber Int deriving (Read, Show, Eq, Typeable)
#endif
data Node= Node{ nodeHost :: HostName
, nodePort :: Int
, connection :: Maybe (MVar Pool)
, nodeServices :: Service
}
deriving (Typeable)
instance Ord Node where
compare node1 node2= compare (nodeHost node1,nodePort node1)(nodeHost node2,nodePort node2)
newtype Cloud a= Cloud {runCloud' ::TransIO a} deriving (Functor,Monoid,Applicative,Alternative, Monad, Num, Fractional, MonadState EventF)
runCloud :: Cloud a -> TransIO a
runCloud x= do
closRemote <- getSData <|> return (Closure 0)
runCloud' x <*** setData closRemote
#ifndef ghcjs_HOST_OS
{-# NOINLINE tlsHooks #-}
tlsHooks ::IORef (SData -> BS.ByteString -> IO ()
,SData -> IO B.ByteString
,NS.Socket -> BS.ByteString -> TransIO ()
,String -> NS.Socket -> BS.ByteString -> TransIO ())
tlsHooks= unsafePerformIO $ newIORef
( notneeded
, notneeded
, \_ i -> tlsNotSupported i
, \_ _ _-> return())
where
notneeded= error "TLS hook function called"
tlsNotSupported input = do
if ((not $ BL.null input) && BL.head input == 0x16)
then do
conn <- getSData
sendRaw conn $ BS.pack $ "HTTP/1.0 525 SSL Handshake Failed\r\nContent-Length: 0\nConnection: close\r\n\r\n"
else return ()
(sendTLSData,recvTLSData,maybeTLSServerHandshake,maybeClientTLSHandshake)= unsafePerformIO $ readIORef tlsHooks
#endif
local :: Loggable a => TransIO a -> Cloud a
local = Cloud . logged
runCloudIO :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO (Cloud mx)= keep mx
runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO' (Cloud mx)= keep' mx
onAll :: TransIO a -> Cloud a
onAll = Cloud
lazy :: TransIO a -> Cloud a
lazy mx= onAll $ getCont >>= \st -> Transient $
return $ unsafePerformIO $ runStateT (runTrans mx) st >>= return .fst
fixRemote mx= do
r <- lazy mx
fixClosure
return r
fixClosure= atRemote $ local $ async $ return ()
loggedc :: Loggable a => Cloud a -> Cloud a
loggedc (Cloud mx)= Cloud $ do
closRemote <- getSData <|> return (Closure 0 )
logged mx <*** setData closRemote
loggedc' :: Loggable a => Cloud a -> Cloud a
loggedc' (Cloud mx)= Cloud $ logged mx
lliftIO :: Loggable a => IO a -> Cloud a
lliftIO= local . liftIO
localIO :: Loggable a => IO a -> Cloud a
localIO= lliftIO
fullStop :: TransIO stop
fullStop= setData WasRemote >> stop
beamTo :: Node -> Cloud ()
beamTo node = wormhole node teleport
forkTo :: Node -> Cloud ()
forkTo node= beamTo node <|> return()
callTo :: Loggable a => Node -> Cloud a -> Cloud a
callTo node remoteProc= wormhole node $ atRemote remoteProc
#ifndef ghcjs_HOST_OS
callTo' :: (Show a, Read a,Typeable a) => Node -> Cloud a -> Cloud a
callTo' node remoteProc= do
mynode <- local $ getNodes >>= return . head
beamTo node
r <- remoteProc
beamTo mynode
return r
#endif
atRemote :: Loggable a => Cloud a -> Cloud a
atRemote proc= loggedc' $ do
was <- lazy $ getSData <|> return NoRemote
teleport
r <- Cloud $ runCloud' proc <** setData WasRemote
teleport
lazy $ setData was
return r
runAt :: Loggable a => Node -> Cloud a -> Cloud a
runAt= callTo
single :: TransIO a -> TransIO a
single f= do
cutExceptions
Connection{closChildren=rmap} <- getSData <|> error "single: only works within a wormhole"
mapth <- liftIO $ readIORef rmap
id <- liftIO $ f `seq` makeStableName f >>= return . hashStableName
case M.lookup id mapth of
Just tv -> liftIO $ killBranch' tv
Nothing -> return ()
tv <- get
f <** do
id <- liftIO $ makeStableName f >>= return . hashStableName
liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth
unique :: a -> TransIO ()
unique f= do
Connection{closChildren=rmap} <- getSData <|> error "unique: only works within a connection. Use wormhole"
mapth <- liftIO $ readIORef rmap
id <- liftIO $ f `seq` makeStableName f >>= return . hashStableName
let mx = M.lookup id mapth
case mx of
Just _ -> empty
Nothing -> do
tv <- get
liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth
wormhole :: Loggable a => Node -> Cloud a -> Cloud a
wormhole node (Cloud comp) = local $ Transient $ do
moldconn <- getData :: StateIO (Maybe Connection)
mclosure <- getData :: StateIO (Maybe Closure)
Log rec _ _ _<- getData `onNothing` return (Log False [][] 0)
if not rec
then runTrans $ (do
conn <- mconnect node
liftIO $ writeIORef (remoteNode conn) $ Just node
setData conn{calling= True}
setData $ (Closure 0 )
comp )
<*** do
when (isJust moldconn) . setData $ fromJust moldconn
when (isJust mclosure) . setData $ fromJust mclosure
else do
let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
setData $ conn{calling= False}
runTrans $ comp
<*** do when (isJust mclosure) . setData $ fromJust mclosure
#ifndef ghcjs_HOST_OS
type JSString= String
pack= id
#endif
data CloudException = CloudException Node IdClosure String deriving (Typeable, Show, Read)
instance Exception CloudException
teleport :: Cloud ()
teleport = local $ do
Transient $ do
cont <- get
Log rec log fulLog closLocal <- getData `onNothing` return (Log False [][] 0)
conn@Connection{connData=contype, localClosures= localClosures,calling= calling} <- getData
`onNothing` error "teleport: No connection defined: use wormhole"
if not rec
then do
case contype of
Just Self -> runTrans $ do
setData $ if (not calling) then WasRemote else WasParallel
abduce !> "SELF"
liftIO $ do
remote <- readIORef $ remoteNode conn
writeIORef (myNode conn) $ fromMaybe (error "teleport: no connection?") remote
_ -> do
Closure closRemote <- getData `onNothing` return (Closure 0 )
return () !> ("TELEPORTTTTTTTTTT", closLocal)
let tosend= reverse $ if closRemote==0 then fulLog else log
liftIO $ modifyMVar_ localClosures $ \map -> return $ M.insert closLocal cont map
runTrans $ msend conn $ SMore $ ClosureData closRemote closLocal tosend
!> ("teleport sending", SMore (unsafePerformIO $ readIORef $ remoteNode conn,closRemote,closLocal,tosend))
!> "--------->------>---------->"
setData $ if (not calling) then WasRemote else WasParallel
return Nothing
else do
delData WasRemote
return $ Just ()
return ()
reportBack :: TransIO ()
reportBack= onException $ \(e :: SomeException) -> do
conn<- getData `onNothing` error "reportBack: No connection defined: use wormhole"
Closure closRemote <- getData `onNothing` error "teleport: no closRemote"
node <- getMyNode
let msg= SError $ toException $ ErrorCall $ show $ show $ CloudException node closRemote $ show e
msend conn msg !> "MSEND"
copyData def = do
r <- local getSData <|> return def
onAll $ setData r
return r
putMailbox :: Typeable val => val -> TransIO ()
putMailbox = putMailbox' (0::Int)
putMailbox' :: (Typeable key, Ord key, Typeable val) => key -> val -> TransIO ()
putMailbox' idbox dat= do
let name= MailboxId idbox $ typeOf dat
Connection{comEvent= mv} <- getData `onNothing` errorMailBox
mbs <- liftIO $ readIORef mv
let mev = M.lookup name mbs
case mev of
Nothing ->newMailbox name >> putMailbox' idbox dat
Just ev -> writeEVar ev $ unsafeCoerce dat
newMailbox :: MailboxId -> TransIO ()
newMailbox name= do
Connection{comEvent= mv} <- getData `onNothing` errorMailBox
ev <- newEVar
liftIO $ atomicModifyIORef mv $ \mailboxes -> (M.insert name ev mailboxes,())
errorMailBox= error "MailBox: No connection open. Use wormhole"
getMailbox :: Typeable val => TransIO val
getMailbox = getMailbox' (0 :: Int)
getMailbox' :: (Typeable key, Ord key, Typeable val) => key -> TransIO val
getMailbox' mboxid = x where
x = do
let name= MailboxId mboxid $ typeOf $ typeOf1 x
Connection{comEvent= mv} <- getData `onNothing` errorMailBox
mbs <- liftIO $ readIORef mv
let mev = M.lookup name mbs
case mev of
Nothing ->newMailbox name >> getMailbox' mboxid
Just ev ->unsafeCoerce $ readEVar ev
typeOf1 :: TransIO a -> a
typeOf1 = undefined
cleanMailbox :: Typeable a => a -> TransIO ()
cleanMailbox = cleanMailbox' 0
cleanMailbox' :: Typeable a => Int -> a -> TransIO ()
cleanMailbox' mboxid witness= do
let name= MailboxId mboxid $ typeOf witness
Connection{comEvent= mv} <- getData `onNothing` error "getMailBox: accessing network events out of listen"
mbs <- liftIO $ readIORef mv
let mev = M.lookup name mbs
case mev of
Nothing -> return()
Just ev -> do cleanEVar ev
liftIO $ atomicModifyIORef mv $ \mbs -> (M.delete name mbs,())
clustered :: Loggable a => Cloud a -> Cloud a
clustered proc= callNodes (<|>) empty proc
mclustered :: (Monoid a, Loggable a) => Cloud a -> Cloud a
mclustered proc= callNodes (<>) mempty proc
callNodes op init proc= loggedc' $ do
nodes <- local getEqualNodes
callNodes' nodes op init proc
callNodes' nodes op init proc= loggedc' $ foldr op init $ map (\node -> runAt node proc) nodes
#ifndef ghcjs_HOST_OS
sendRaw (Connection _ _ _ (Just (Node2Web sconn )) _ _ _ _ _ _) r=
liftIO $ WS.sendTextData sconn r
sendRaw (Connection _ _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _) r=
liftIO $ withMVar blocked $ const $ SBS.sendMany sock
(BL.toChunks r )
sendRaw (Connection _ _ _(Just (TLSNode2Node ctx )) _ _ blocked _ _ _) r=
liftIO $ withMVar blocked $ const $ sendTLSData ctx r
#else
sendRaw (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _ _ _) r= liftIO $
withMVar blocked $ const $ JavaScript.Web.WebSocket.send r sconn
#endif
sendRaw _ _= error "No connection stablished"
type LengthFulLog= Int
data NodeMSG= ClosureData IdClosure IdClosure CurrentPointer
| RelayMSG Node Node (StreamData NodeMSG)
deriving (Typeable, Read, Show)
msend :: Connection -> StreamData NodeMSG -> TransIO ()
msend (Connection _ _ _ (Just Self) _ _ _ _ _ _) r= return ()
#ifndef ghcjs_HOST_OS
msend (Connection _ _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _) r=do
liftIO $ withMVar blocked $ const $ SBS.sendAll sock $ BC.pack (show r) !> ("N2N SEND", r)
msend (Connection _ _ _ (Just (TLSNode2Node ctx)) _ _ _ _ _ _) r=
liftIO $ sendTLSData ctx $ BS.pack (show r) !> "TLS SEND"
msend (Connection _ _ _ (Just (Node2Web sconn)) _ _ _ _ _ _) r=liftIO $
WS.sendTextData sconn $ BS.pack (show r) !> "websockets send"
msend(Connection _ myNode _ (Just (Relay conn remote )) _ _ _ _ _ _) r= do
origin <- liftIO $ readIORef myNode
msend conn $ SMore $ RelayMSG origin remote r
#else
msend (Connection _ _ remoten (Just (Web2Node sconn)) _ _ blocked _ _ _) r= liftIO $ do
withMVar blocked $ const $ JavaScript.Web.WebSocket.send (JS.pack $ show r) sconn !> "MSEND SOCKET"
#endif
msend (Connection _ _ _ Nothing _ _ _ _ _ _) _= error "msend out of connection context: use wormhole to connect"
mread :: Loggable a => Connection -> TransIO (StreamData a)
#ifdef ghcjs_HOST_OS
mread (Connection _ _ _ (Just (Web2Node sconn)) _ _ _ _ _ _)= wsRead sconn
wsRead :: Loggable a => WebSocket -> TransIO a
wsRead ws= do
dat <- react (hsonmessage ws) (return ())
case JM.getData dat of
JM.StringData str -> return (read' $ JS.unpack str)
!> ("Browser webSocket read", str) !> "<------<----<----<------"
JM.BlobData blob -> error " blob"
JM.ArrayBufferData arrBuffer -> error "arrBuffer"
wsOpen :: JS.JSString -> TransIO WebSocket
wsOpen url= do
ws <- liftIO $ js_createDefault url
react (hsopen ws) (return ())
return ws
foreign import javascript safe
"window.location.hostname"
js_hostname :: JSVal
foreign import javascript safe
"window.location.pathname"
js_pathname :: JSVal
foreign import javascript safe
"window.location.protocol"
js_protocol :: JSVal
foreign import javascript safe
"(function(){var res=window.location.href.split(':')[2];if (res === undefined){return 80} else return res.split('/')[0];})()"
js_port :: JSVal
foreign import javascript safe
"$1.onmessage =$2;"
js_onmessage :: WebSocket -> JSVal -> IO ()
getWebServerNode :: TransIO Node
getWebServerNode = liftIO $ do
h <- fromJSValUnchecked js_hostname
p <- fromIntegral <$> (fromJSValUnchecked js_port :: IO Int)
createNode h p
hsonmessage ::WebSocket -> (MessageEvent ->IO()) -> IO ()
hsonmessage ws hscb= do
cb <- makeCallback MessageEvent hscb
js_onmessage ws cb
foreign import javascript safe
"$1.onopen =$2;"
js_open :: WebSocket -> JSVal -> IO ()
foreign import javascript safe
"$1.readyState"
js_readystate :: WebSocket -> Int
newtype OpenEvent = OpenEvent JSVal deriving Typeable
hsopen :: WebSocket -> (OpenEvent ->IO()) -> IO ()
hsopen ws hscb= do
cb <- makeCallback OpenEvent hscb
js_open ws cb
makeCallback :: (JSVal -> a) -> (a -> IO ()) -> IO JSVal
makeCallback f g = do
Callback cb <- CB.syncCallback1 CB.ContinueAsync (g . f)
return cb
foreign import javascript safe
"new WebSocket($1)" js_createDefault :: JS.JSString -> IO WebSocket
#else
mread (Connection _ _ _ (Just (Node2Node _ _ _)) _ _ _ _ _ _) = parallelReadHandler
mread (Connection _ _ _ (Just (TLSNode2Node _ )) _ _ _ _ _ _) = parallelReadHandler
mread (Connection _ _ _ (Just (Node2Web sconn )) _ _ _ _ _ _)=
parallel $ do
s <- WS.receiveData sconn
return . read' $ BS.unpack s
!> ("WS MREAD RECEIVED ----<----<------<--------", s)
mread (Connection _ _ _ (Just (Relay conn _ )) _ _ _ _ _ _)=
mread conn
parallelReadHandler :: Loggable a => TransIO (StreamData a)
parallelReadHandler= do
str <- giveData :: TransIO BS.ByteString
r <- choose $ readStream str
return r
!> ("parallel read handler read", r)
!> "<-------<----------<--------<----------"
where
readStream :: (Typeable a, Read a) => BS.ByteString -> [StreamData a]
readStream s= readStream1 $ BS.unpack s
where
readStream1 s=
let [(x,r)] = reads s
in x : readStream1 r
getWebServerNode :: TransIO Node
getWebServerNode = getNodes >>= return . head
#endif
mclose :: Connection -> IO ()
#ifndef ghcjs_HOST_OS
mclose (Connection _ _ _
(Just (Node2Node _ sock _ )) _ _ _ _ _ _)= NS.close sock
mclose (Connection _ _ _
(Just (Node2Web sconn ))
_ _ _ _ _ _)=
WS.sendClose sconn ("closemsg" :: BS.ByteString)
#else
mclose (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _ _ _)=
JavaScript.Web.WebSocket.close Nothing Nothing sconn
#endif
mconnect :: Node -> TransIO Connection
mconnect node'= do
node <- fixNode node'
nodes <- getNodes
return () !> ("mconnnect", nodePort node)
let fnode = filter (==node) nodes
case fnode of
[] -> mconnect1 node
[node'@(Node _ _ pool _)] -> do
plist <- liftIO $ readMVar $ fromJust pool
case plist of
(handle:_) -> do
delData $ Closure undefined
return handle
!> ("REUSED!", node)
_ -> mconnect1 node'
where
#ifndef ghcjs_HOST_OS
mconnect1 (node@(Node host port _ _))= do
return () !> ("MCONNECT1",host,port,nodeServices node)
(conn,parseContext) <- checkSelf node <|>
timeout 1000000 (connectNode2Node host port) <|>
timeout 1000000 (connectWebSockets host port) <|>
checkRelay <|>
(throwt $ ConnectionError "" node)
setState conn
setState parseContext
liftIO $ writeIORef (remoteNode conn) $ Just node
liftIO $ modifyMVar_ (fromJust $ connection node) . const $ return [conn]
addNodes [node]
case connData conn of
Just Self -> return()
_ -> watchConnection
delData $ Closure undefined
return conn
where
checkSelf node= do
node' <- getMyNode
if node /= node'
then empty
else do
conn<- case connection node of
Nothing -> error "checkSelf error"
Just ref -> do
cnn <- getSData <|> error "chechself: no connection"
rnode <- liftIO $ newIORef node
conn <- defConnection >>= \c -> return c{myNode= rnode, comEvent=comEvent cnn,connData= Just Self} !> "DEFF1"
liftIO $ withMVar ref $ const $ return [conn]
return conn
return (conn,(ParseContext (error "checkSelf parse error") (error "checkSelf parse error")
:: ParseContext BS.ByteString))
timeout t proc=do
r <- collect' 1 t proc
case r of
[] -> empty
r:_ -> return r
checkRelay= do
return () !> "RELAY"
myNode <- getMyNode
if nodeHost node== nodeHost myNode
then
case lookup "localNode" $ nodeServices node of
Just snode -> do
con <- mconnect $ read snode
cont <- getSData <|> return noParseContext
return (con,cont)
Nothing -> empty
else do
case lookup "relay" $ nodeServices node of
Nothing -> empty
Just relayInfo -> do
let relay= read relayInfo
conn <- mconnect relay
rem <- liftIO $ newIORef $ Just node
let conn'= conn{connData= Just $ Relay conn node,remoteNode=rem}
parseContext <- getState <|> return noParseContext
return (conn', parseContext)
noParseContext= (ParseContext (error "relay error") (error "relay error")
:: ParseContext BS.ByteString)
connectSockTLS host port= do
return () !> "connectSockTLS"
let size=8192
Connection{myNode=my,comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
sock <- liftIO $ connectTo' size host $ PortNumber $ fromIntegral port
conn' <- defConnection >>= \c ->
return c{myNode=my, comEvent= ev,connData=
Just $ (Node2Node u sock (error $ "addr: outgoing connection"))}
setData conn'
input <- liftIO $ SBSL.getContents sock
setData $ ParseContext (error "parse context: Parse error") input
maybeClientTLSHandshake host sock input
`catcht` \(_ :: SomeException) -> empty
connectNode2Node host port= do
return () !> "NODE 2 NODE"
connectSockTLS host port
conn <- getSData <|> error "mconnect: no connection data"
sendRaw conn "CLOS a b\r\n\r\n"
r <- liftIO $ readFrom conn
case r of
"OK" -> do
parseContext <- getState
return (conn,parseContext)
_ -> do
let Connection{connData=cdata}= conn
case cdata of
Just(Node2Node _ s _) -> liftIO $ NS.close s
empty
connectWebSockets host port = do
return () !> "WEBSOCKETS"
connectSockTLS host port
never <- liftIO $ newEmptyMVar :: TransIO (MVar ())
conn <- getSData <|> error "connectWebSockets: no connection"
stream <- liftIO $ makeWSStreamFromConn conn
wscon <- react (NWS.runClientWithStream stream (host++(':': show port)) "/"
WS.defaultConnectionOptions []) (takeMVar never)
return (conn{connData= Just $ (Node2Web wscon)}, noParseContext)
watchConnection= do
conn <- getSData
parseContext <- getSData <|> error "NO PARSE CONTEXT"
:: TransIO (ParseContext BS.ByteString)
chs <- liftIO $ newIORef M.empty
let conn'= conn{closChildren= chs}
putMailbox ((conn',parseContext,node)
:: (Connection,ParseContext BS.ByteString,Node))
liftIO $ threadDelay 100000
#else
mconnect1 (node@(Node host port (Just pool) _))= do
conn' <- getSData <|> error "connect: listen not set for this node"
if nodeHost node== "webnode" then return conn'{connData= Just Self} else do
ws <- connectToWS host $ PortNumber $ fromIntegral port
let conn= conn'{connData= Just (Web2Node ws)}
let parseContext =
ParseContext (error "parsecontext not available in the browser")
("" :: JSString)
chs <- liftIO $ newIORef M.empty
let conn'= conn{closChildren= chs}
liftIO $ modifyMVar_ pool $ \plist -> return $ conn':plist
putMailbox (conn',parseContext,node)
delData $ Closure undefined
return conn
#endif
u= undefined
data ConnectionError= ConnectionError String Node deriving Show
instance Exception ConnectionError
#ifndef ghcjs_HOST_OS
connectTo' bufSize hostname (PortNumber port) = do
proto <- BSD.getProtocolNumber "tcp"
bracketOnError
(NS.socket NS.AF_INET NS.Stream proto)
(sClose)
(\sock -> do
NS.setSocketOption sock NS.RecvBuffer bufSize
NS.setSocketOption sock NS.SendBuffer bufSize
he <- BSD.getHostByName hostname
NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he))
return sock)
#else
connectToWS h (PortNumber p) = do
protocol <- liftIO $ fromJSValUnchecked js_protocol
pathname <- liftIO $ fromJSValUnchecked js_pathname
return () !> ("PAHT",pathname)
let ps = case (protocol :: JSString)of "http:" -> "ws://"; "https:" -> "wss://"
wsOpen $ JS.pack $ ps++ h++ ":"++ show p ++ pathname
#endif
type Blocked= MVar ()
type BuffSize = Int
data ConnectionData=
#ifndef ghcjs_HOST_OS
Node2Node{port :: PortID
,socket ::Socket
,sockAddr :: NS.SockAddr
}
| TLSNode2Node{tlscontext :: SData}
| Node2Web{webSocket :: WS.Connection}
| Self
| Relay Connection Node
#else
Self
| Web2Node{webSocket :: WebSocket}
#endif
data MailboxId = forall a .(Typeable a, Ord a) => MailboxId a TypeRep
instance Eq MailboxId where
id1 == id2 = id1 `compare` id2== EQ
instance Ord MailboxId where
MailboxId n t `compare` MailboxId n' t'=
case typeOf n `compare` typeOf n' of
EQ -> case n `compare` unsafeCoerce n' of
EQ -> t `compare` t'
LT -> LT
GT -> GT
other -> other
data Connection= Connection{idConn :: Int
,myNode :: IORef Node
,remoteNode :: IORef (Maybe Node)
,connData :: Maybe ConnectionData
,bufferSize :: BuffSize
,comEvent :: IORef (M.Map MailboxId (EVar SData))
,blocked :: Blocked
,calling :: Bool
,localClosures :: MVar (M.Map IdClosure EventF)
,closChildren :: IORef (M.Map Int EventF)}
deriving Typeable
defConnection :: (MonadIO m, MonadState EventF m) => m Connection
defConnection = do
idc <- genGlobalId
liftIO $ do
my <- newIORef (error "node in default connection")
x <- newMVar ()
y <- newMVar M.empty
noremote <- newIORef Nothing
z <- return $ error "closchildren: newIORef M.empty"
return $ Connection idc my noremote Nothing 8192
(error "defConnection: accessing network events out of listen")
x False y z
#ifndef ghcjs_HOST_OS
setBuffSize :: Int -> TransIO ()
setBuffSize size= Transient $ do
conn<- getData `onNothing` (defConnection !> "DEFF3")
setData $ conn{bufferSize= size}
return $ Just ()
getBuffSize=
(do getSData >>= return . bufferSize) <|> return 8192
listen :: Node -> Cloud ()
listen (node@(Node _ port _ _ )) = onAll $ do
addThreads 1
setData $ Log False [] [] 0
conn' <- getSData <|> defConnection
ev <- liftIO $ newIORef M.empty
chs <- liftIO $ newIORef M.empty
let conn= conn'{connData=Just Self, comEvent=ev,closChildren=chs}
pool <- liftIO $ newMVar [conn]
let node'= node{connection=Just pool}
liftIO $ writeIORef (myNode conn) node'
setData conn
liftIO $ modifyMVar_ (fromJust $ connection node') $ const $ return [conn]
addNodes [node']
mlog <- listenNew (fromIntegral port) conn <|> listenResponses :: TransIO (StreamData NodeMSG)
return () !> mlog
case mlog of
SMore (RelayMSG _ _ _) -> relay mlog
_ -> execLog mlog
`catcht` (\(e ::SomeException) -> liftIO $ print e)
relay (SMore (RelayMSG origin destiny streamdata)) = do
nodes <- getNodes
my <- getMyNode
if destiny== my
then do
case filter (==origin) nodes of
[node] -> do
(conn: _) <- liftIO $ readMVar $ fromJust $ connection node
setData conn
[] -> do
conn@Connection{remoteNode= rorigin} <- getState
let conn'= conn{connData= Just $ Relay conn origin}
pool <- liftIO $ newMVar [conn']
addNodes [origin{connection= Just pool}]
setData conn'
execLog streamdata
else do
let (origin',destiny')= nat origin destiny my nodes
con <- mconnect destiny'
msend con . SMore $ RelayMSG origin' destiny' streamdata
return () !> ("SEND RELAY DATA",streamdata)
fullStop
relay _= empty
nat origin destiny my nodes=
let destiny' = if nodeHost destiny== nodeHost my
then
case filter (==destiny) nodes of
[node] -> case lookup "localNode" $ nodeServices node of
Just snode -> read snode
Nothing -> destiny
_ -> destiny
else destiny
origin'= if nodeHost origin == "localhost"
then case filter (==origin) nodes of
[node] ->case lookup "externalNode" $ nodeServices node of
Just snode -> read snode
Nothing -> origin
_ -> origin
else origin
in (origin',destiny')
listenNew port conn'= do
sock <- liftIO . listenOn $ PortNumber port
let bufSize= bufferSize conn'
liftIO $ do NS.setSocketOption sock NS.RecvBuffer bufSize
NS.setSocketOption sock NS.SendBuffer bufSize
(sock,addr) <- waitEvents $ NS.accept sock
chs <- liftIO $ newIORef M.empty
noNode <- liftIO $ newIORef Nothing
id1 <- genId
let conn= conn'{idConn=id1,closChildren=chs, remoteNode= noNode}
input <- liftIO $ SBSL.getContents sock
cutExceptions
onException $ \(e :: IOException) ->
when (ioeGetLocation e=="Network.Socket.recvBuf") $ do
liftIO $ putStr "listen: " >> print e
let Connection{remoteNode=rnode,localClosures=localClosures,closChildren= rmap} = conn
mnode <- liftIO $ readIORef rnode
case mnode of
Nothing -> return ()
Just node -> do
liftIO $ putStr "removing1 node: " >> print node
nodes <- getNodes
setNodes $ nodes \\ [node]
liftIO $ do
modifyMVar_ localClosures $ const $ return M.empty
writeIORef rmap M.empty
killBranch
setData $ (ParseContext (NS.close sock >> error "Communication error" ) input
::ParseContext BS.ByteString)
setState conn{connData=Just (Node2Node (PortNumber port) sock addr)}
maybeTLSServerHandshake sock input
(method, uri, vers) <- getFirstLine
case method of
"CLOS" ->
do
conn <- getSData
sendRaw conn "OK"
mread conn
_ -> do
let uri'= BC.tail $ uriPath uri !> uriPath uri
if "api/" `BC.isPrefixOf` uri'
then do
log <- return $ Exec: (Var $ IDyns $ BS.unpack method):(map (Var . IDyns ) $ split $ BC.unpack $ BC.drop 4 uri')
str <- giveData <|> error "no api data"
headers <- getHeaders
maybeSetHost headers
log' <- case (method,lookup "Content-Type" headers) of
("POST",Just "application/x-www-form-urlencoded") -> do
len <- read <$> BC.unpack
<$> (Transient $ return (lookup "Content-Length" headers))
setData $ ParseContext (return mempty) $ BS.take len str
postParams <- parsePostUrlEncoded <|> return []
return $ log ++ [(Var . IDynamic $ postParams)]
_ -> return $ log
return $ SMore $ ClosureData 0 0 log'
else if "relay/" `BC.isPrefixOf` uri' then proxy sock method vers uri'
else do
headers <- getHeaders
return () !> (method,uri')
servePages (method, uri', headers)
conn <- getSData
sconn <- makeWebsocketConnection conn uri headers
let conn'= conn{connData= Just (Node2Web sconn)
, closChildren=chs}
setState conn' !> "WEBSOCKETS-----------------------------------------------"
onException $ \(e :: SomeException) -> do
cutExceptions
liftIO $ putStr "listen websocket:" >> print e
killBranch
empty
do
r <- parallel $ do
msg <- WS.receiveData sconn
return () !> ("Server WebSocket msg read",msg)
!> "<-------<---------<--------------"
case reads $ BS.unpack msg of
[] -> do
let log =Exec: [Var $ IDynamic (msg :: BS.ByteString)]
return $ SMore (ClosureData 0 0 log)
((x ,_):_) -> return (x :: StreamData NodeMSG)
case r of
SError e -> do
back e
_ -> return r
where
uriPath = BC.dropWhile (/= '/')
split []= []
split ('/':r)= split r
split s=
let (h,t) = span (/= '/') s
in h: split t
proxy sclient method vers uri' = do
let (host:port:_)= split $ BC.unpack $ BC.drop 6 uri'
sserver <- liftIO $ connectTo' 4096 host $ PortNumber $ fromIntegral $ read port
rawHeaders <- getRawHeaders
let uri= BS.fromStrict $ let d x= BC.tail $ BC.dropWhile (/= '/') x in d . d $ d uri'
let sent= method <> BS.pack " /"
<> uri
<> BS.cons ' ' vers
<> BS.pack "\r\n"
<> rawHeaders <> BS.pack "\r\n\r\n"
liftIO $ SBSL.send sserver sent
cutExceptions
onException $ \(e:: SomeException ) -> liftIO $ do
putStr "Proxy: " >> print e
sClose sserver
sClose sclient
send sclient sserver <|> send sserver sclient
empty
where
send f t= async $ mapData f t
mapData from to = do
content <- recv from 4096
if not $ BC.null content
then sendAll to content >> mapData from to
else finish
where
finish= sClose from >> sClose to
maybeSetHost headers= do
setHost <- liftIO $ readIORef rsetHost
when setHost $ do
mnode <- liftIO $ do
let mhost= lookup "Host" headers
case mhost of
Nothing -> return Nothing
Just host -> atomically $ do
nodes <- readTVar nodeList
let (host1,port)= BC.span (/= ':') host
hostnode= (head nodes){nodeHost= BC.unpack host1
,nodePort= if BC.null port then 80
else read $ BC.unpack $ BC.tail port}
writeTVar nodeList $ hostnode : tail nodes
return $ Just hostnode
when (isJust mnode) $ do
conn <- getState
liftIO $ writeIORef (myNode conn) $fromJust mnode
liftIO $ writeIORef rsetHost False
{-#NOINLINE rsetHost #-}
rsetHost= unsafePerformIO $ newIORef True
#endif
listenResponses :: Loggable a => TransIO (StreamData a)
listenResponses= do
(conn, parsecontext, node) <- getMailbox
labelState $ "listen from: "++ show node
setData conn
#ifndef ghcjs_HOST_OS
setData (parsecontext :: ParseContext BS.ByteString)
#else
setData (parsecontext :: ParseContext JSString)
#endif
cutExceptions
onException (\(e:: SomeException) -> do
liftIO $ putStr "ListenResponses: " >> print e
liftIO $ putStr "removing node: " >> print node
nodes <- getNodes
setNodes $ nodes \\ [node]
killChilds
let Connection{localClosures=localClosures}= conn
liftIO $ modifyMVar_ localClosures $ const $ return M.empty)
mread conn
type IdClosure= Int
newtype Closure= Closure IdClosure
type RemoteClosure= (Node, IdClosure)
newtype JobGroup= JobGroup (M.Map String RemoteClosure) deriving Typeable
stopRemoteJob :: String -> Cloud ()
stopRemoteJob ident = do
local $ do
JobGroup map <- getRState <|> return (JobGroup M.empty)
let mj= M.lookup ident map
when (isJust mj) $ putMailbox $ fromJust mj
fixClosure
local $ do
JobGroup map <- getRState <|> return (JobGroup M.empty)
Closure closr <- getData `onNothing` error "resetRemote: Closure not set, use wormhole"
conn <- getData `onNothing` error "resetRemote: no connection set"
remote <- liftIO $ readIORef $ remoteNode conn
when (isJust remote) $ do
setRState $ JobGroup $ M.insert ident (fromJust remote,closr) map
putMailbox (fromJust remote, closr)
resetRemote :: Cloud ()
resetRemote= local $ do
Closure clos <- getState `onNothing` return (Closure 0)
conn <- getData `onNothing` error "resetRemote: no connection set"
remote <- liftIO $ readIORef $ remoteNode conn
when (isJust remote) $ putMailbox (fromJust remote, clos)
manageClosures = do
(remote, clos) <- local getMailbox
localIO $ print ("MANAGECLOSURESSSSSSSSSSSSSS", clos)
when (clos /= 0) $ runAt remote $ local $ do
conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"
mcont <- liftIO $ modifyMVar localClosures $ \map -> return ( M.delete clos map, M.lookup clos map)
case mcont of
Nothing -> error $ "closure not found: " ++ show clos
Just cont -> do
showThreads $ fromJust $ parent cont
liftIO $ killBranch' cont
return ()
execLog :: StreamData NodeMSG -> TransIO ()
execLog mlog = Transient $ do
return () !> "EXECLOG"
case mlog of
SError e -> do
case fromException e of
Just (ErrorCall str) -> do
case read str of
(e@(CloudException _ closl err)) -> do
process closl (error "closr: should not be used") (Left e) True
SDone -> runTrans(back $ ErrorCall "SDone") >> return Nothing
SMore (ClosureData closl closr log) -> process closl closr (Right log) False
SLast (ClosureData closl closr log) -> process closl closr (Right log) True
where
process :: IdClosure -> IdClosure -> (Either CloudException CurrentPointer) -> Bool -> StateIO (Maybe ())
process closl closr mlog deleteClosure= do
conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"
if closl== 0 then case mlog of
Left except -> do
setData $ Log True [] []
return () !> "THROWWWW1"
runTrans $ throwt except
empty
Right log -> do
setData $ Log True log (reverse log) 0
setData $ Closure closr
return $ Just ()
else do
mcont <- liftIO $ modifyMVar localClosures
$ \map -> return (if deleteClosure then
M.delete closl map
else map, M.lookup closl map)
case mcont of
Nothing -> do
runTrans $ msend conn $ SLast (ClosureData closr closl [])
runTrans $ liftIO $ error ("request received for non existent closure: "
++ show closl)
Just cont -> do
liftIO $ runStateT (case mlog of
Right log -> do
Log _ _ fulLog hash <- getData `onNothing` return (Log True [] [] 0)
let nlog= reverse log ++ fulLog
setData $ Log True log nlog hash
setData $ Closure closr
runContinuation cont ()
Left except -> do
setData $ Log True [] []
return () !> "THROWWWW2"
runTrans $ throwt except) cont
return Nothing
#ifdef ghcjs_HOST_OS
listen node = onAll $ do
addNodes [node]
events <- liftIO $ newIORef M.empty
rnode <- liftIO $ newIORef node
conn <- defConnection >>= \c -> return c{myNode=rnode,comEvent=events}
setData conn
r <- listenResponses
execLog r
#endif
type Pool= [Connection]
type Package= String
type Program= String
type Service= [(Package, Program)]
#ifndef ghcjs_HOST_OS
readFrom Connection{connData= Just(TLSNode2Node ctx)}= recvTLSData ctx
readFrom Connection{connData= Just(Node2Node _ sock _)} = toStrict <$> loop
where
bufSize= 4098
loop :: IO BL.ByteString
loop = unsafeInterleaveIO $ do
s <- SBS.recv sock bufSize
if BC.length s < bufSize
then return $ BLC.Chunk s mempty
else BLC.Chunk s `liftM` loop
readFrom _ = error "readFrom error"
toStrict= B.concat . BS.toChunks
makeWSStreamFromConn conn= do
let rec= readFrom conn
send= sendRaw conn
makeStream
(do
bs <- rec
return $ if BC.null bs then Nothing else Just bs)
(\mbBl -> case mbBl of
Nothing -> return ()
Just bl -> send bl)
makeWebsocketConnection conn uri headers= liftIO $ do
stream <- makeWSStreamFromConn conn
let
pc = WS.PendingConnection
{ WS.pendingOptions = WS.defaultConnectionOptions
, WS.pendingRequest = NWS.RequestHead uri headers False
, WS.pendingOnAccept = \_ -> return ()
, WS.pendingStream = stream
}
sconn <- WS.acceptRequest pc
WS.forkPingThread sconn 30
return sconn
servePages (method,uri, headers) = do
conn <- getSData <|> error " servePageMode: no connection"
if isWebSocketsReq headers
then return ()
else do
let file= if BC.null uri then "index.html" else uri
mcontent <- liftIO $ (Just <$> BL.readFile ( "./static/out.jsexe/"++ BC.unpack file) )
`catch` (\(e:: SomeException) -> return Nothing)
case mcontent of
Just content -> liftIO $ sendRaw conn $
"HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\nContent-Length: "
<> BS.pack (show $ BL.length content) <>"\r\n\r\n" <> content
Nothing ->liftIO $ sendRaw conn $ BS.pack $
"HTTP/1.0 404 Not Found\nContent-Length: 13\nConnection: close\n\nNot Found 404"
empty
api :: TransIO BS.ByteString -> Cloud ()
api w= Cloud $ do
conn <- getSData <|> error "api: Need a connection opened with initNode, listen, simpleWebApp"
let send= sendRaw conn
r <- w
send r
isWebSocketsReq = not . null
. filter ( (== mk "Sec-WebSocket-Key") . fst)
data HTTPMethod= GET | POST deriving (Read,Show,Typeable)
getFirstLine= (,,) <$> getMethod <*> (toStrict <$> getUri) <*> getVers
where
getMethod= parseString
getUri= parseString
getVers= parseString
getRawHeaders= dropSpaces >> parse (scan mempty)
where
scan res str
| "\r\n\r\n" `BS.isPrefixOf` str= (res, BS.drop 4 str)
| otherwise= scan ( BS.snoc res $ BS.head str) $ BS.tail str
type PostParams = [(BS.ByteString, String)]
parsePostUrlEncoded :: TransIO PostParams
parsePostUrlEncoded= do
dropSpaces
many $ (,) <$> param <*> value
where
param= tTakeWhile' ( /= '=')
value= unEscapeString <$> BS.unpack <$> tTakeWhile' ( /= '&')
getHeaders = manyTill paramPair (string "\r\n\r\n")
where
paramPair= (,) <$> (mk <$> getParam) <*> getParamValue
getParam= do
dropSpaces
r <- tTakeWhile (\x -> x /= ':' && not (endline x))
if BS.null r || r=="\r" then empty else dropChar >> return (toStrict r)
getParamValue= toStrict <$> ( dropSpaces >> tTakeWhile (\x -> not (endline x)))
#endif
#ifdef ghcjs_HOST_OS
isBrowserInstance= True
api _= empty
#else
isBrowserInstance= False
#endif
{-# NOINLINE emptyPool #-}
emptyPool :: MonadIO m => m (MVar Pool)
emptyPool= liftIO $ newMVar []
createNodeServ :: HostName -> Int -> Service -> IO Node
createNodeServ h p svs= return $ Node h p Nothing svs
createNode :: HostName -> Int -> IO Node
createNode h p= createNodeServ h p []
createWebNode :: IO Node
createWebNode= do
pool <- emptyPool
return $ Node "webnode" 0 (Just pool) [("webnode","")]
instance Eq Node where
Node h p _ _ ==Node h' p' _ _= h==h' && p==p'
instance Show Node where
show (Node h p _ servs )= show (h,p, servs)
instance Read Node where
readsPrec n s=
let r= readsPrec n s
in case r of
[] -> []
[((h,p,ss),s')] -> [(Node h p Nothing ss ,s')]
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []
deriving instance Ord PortID
errorMyNode f= error $ f ++ ": Node not set. initialize it with connect, listen, initNode..."
getMyNode :: TransIO Node
getMyNode = do
Connection{myNode= node} <- getSData <|> errorMyNode "getMyNode" :: TransIO Connection
liftIO $ readIORef node
getNodes :: MonadIO m => m [Node]
getNodes = liftIO $ atomically $ readTVar nodeList
getEqualNodes = do
nodes <- getNodes
let srv= nodeServices $ head nodes
case srv of
[] -> return $ filter (null . nodeServices) nodes
(srv:_) -> return $ filter (\n -> head (nodeServices n) == srv ) nodes
matchNodes f = do
nodes <- getNodes
return $ map (\n -> filter f $ nodeServices n) nodes
addNodes :: [Node] -> TransIO ()
addNodes nodes= do
nodes' <- mapM fixNode nodes
liftIO . atomically $ do
prevnodes <- readTVar nodeList
writeTVar nodeList $ nub $ prevnodes ++ nodes'
fixNode n= case connection n of
Nothing -> do
pool <- emptyPool
return n{connection= Just pool}
Just _ -> return n
setNodes nodes= liftIO $ atomically $ writeTVar nodeList $ nodes
shuffleNodes :: MonadIO m => m [Node]
shuffleNodes= liftIO . atomically $ do
nodes <- readTVar nodeList
let nodes'= tail nodes ++ [head nodes]
writeTVar nodeList nodes'
return nodes'
connect :: Node -> Node -> Cloud ()
#ifndef ghcjs_HOST_OS
connect node remotenode = do
listen node <|> return ()
connect' remotenode
connect' :: Node -> Cloud ()
connect' remotenode= loggedc $ do
nodes <- local getNodes
localIO $ putStr "connecting to: " >> print remotenode
newNodes <- runAt remotenode $ interchange nodes
local $ return () !> "interchange finish"
let toAdd=remotenode:tail newNodes
callNodes' nodes (<>) mempty $ local $ do
liftIO $ putStr "New nodes: " >> print toAdd !> "NEWNODES"
addNodes toAdd
where
interchange nodes=
do
newNodes <- local $ do
conn@Connection{remoteNode=rnode, connData=Just cdata} <- getSData <|>
error ("connect': need to be connected to a node: use wormhole/connect/listen")
let newNodes= map (\n -> n{nodeServices= nodeServices n ++ [("relay",show (remotenode,n))]}) nodes
callingNode<- fixNode $ head newNodes
liftIO $ writeIORef rnode $ Just callingNode
liftIO $ modifyMVar_ (fromJust $ connection callingNode) $ const $ return [conn]
return newNodes
oldNodes <- local $ getNodes
mclustered . local $ do
liftIO $ putStrLn "New nodes: " >> print newNodes
addNodes newNodes
localIO $ atomically $ do
nodes <- readTVar nodeList
let nodes'= (head nodes){nodeHost=nodeHost remotenode
,nodePort=nodePort remotenode}:tail nodes
writeTVar nodeList nodes'
return oldNodes
#else
connect _ _= empty
connect' _ = empty
#endif