module Transient.Move.Internals where
import Transient.Internals
import Transient.Logged
import Transient.Indeterminism(choose)
import Transient.Backtrack
import Transient.EVars
import Data.Typeable
import Control.Applicative
#ifndef ghcjs_HOST_OS
import Network
import Network.Info
import Network.URI
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Network.WebSockets as NWS
import qualified Network.WebSockets.Connection as WS
import Network.WebSockets.Stream hiding(parse)
import qualified Data.ByteString as B(ByteString,concat)
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Internal as BLC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BS
import Network.Socket.ByteString as SBS(send,sendMany,sendAll,recv)
import qualified Network.Socket.ByteString.Lazy as SBSL
import Data.CaseInsensitive(mk)
import Data.Char(isSpace)
#else
import JavaScript.Web.WebSocket
import qualified JavaScript.Web.MessageEvent as JM
import GHCJS.Prim (JSVal)
import GHCJS.Marshal(fromJSValUnchecked)
import qualified Data.JSString as JS
import JavaScript.Web.MessageEvent.Internal
import GHCJS.Foreign.Callback.Internal (Callback(..))
import qualified GHCJS.Foreign.Callback as CB
import Data.JSString (JSString(..), pack)
#endif
import Control.Monad.State
import System.IO
import Control.Exception hiding (onException,try)
import Data.Maybe
import Control.Monad
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar
import Data.Monoid
import qualified Data.Map as M
import Data.List (nub,(\\),find, insert)
import Data.IORef
import System.IO
import Control.Concurrent
import Data.Dynamic
import Data.String
import System.Mem.StableName
import Unsafe.Coerce
#ifdef ghcjs_HOST_OS
-- | Host name of a node. This alias is declared only for the GHCJS build
-- (it sits inside an @ifdef ghcjs_HOST_OS@ section).
type HostName = String
-- | Port wrapper declared only for the GHCJS build; it carries the port as an 'Int'.
newtype PortID = PortNumber Int deriving (Read, Show, Eq, Typeable)
#endif
-- | A node of the cloud. The 'Eq' and 'Ord' instances defined in this module
-- compare only the host and the port.
data Node= Node{ nodeHost :: HostName
-- port of the node
, nodePort :: Int
-- list of connections to this node (see the Pool alias), kept in an MVar
, connection :: MVar Pool
-- services attached to the node, as (package, program) pairs (see Service)
, nodeServices :: [Service]
}
deriving (Typeable)
-- | Nodes are ordered by host name first and by port second; the connection
-- pool and the services do not take part in the comparison.
instance Ord Node where
  compare a b = compare (key a) (key b)
    where
      key n = (nodeHost n, nodePort n)
-- | Computation type used by this module: a newtype around 'TransIO'.
-- Every listed instance is derived from the wrapped 'TransIO'.
-- 'local' and 'onAll' build values of this type.
newtype Cloud a= Cloud {runCloud' ::TransIO a} deriving (Functor,Applicative,Monoid,Alternative, Monad, Num, MonadState EventF)
-- | Unwrap a 'Cloud' computation into the underlying transient computation.
-- The 'Closure' found in the state beforehand (or @Closure 0@ when there is
-- none) is attached with '<***' as the action that sets it back.
runCloud x =
  (getSData <|> return (Closure 0)) >>= \savedClosure ->
    runCloud' x <*** setData savedClosure
#ifndef ghcjs_HOST_OS
-- | Global, mutable set of TLS hooks. The four components are, in order, the
-- functions later bound to @sendTLSData@, @recvTLSData@,
-- @maybeTLSServerHandshake@ and @maybeClientTLSHandshake@.
--
-- Defaults installed here:
--
-- * send / receive: fail with @"TLS hook function called"@;
-- * server handshake: if the first input byte is 0x16, answer with an
--   HTTP 525 response through 'sendRaw'; otherwise do nothing;
-- * client handshake: do nothing.
--
-- The NOINLINE pragma is required for a top-level 'unsafePerformIO' reference:
-- without it the definition may be inlined and the 'IORef' created more than
-- once.
{-# NOINLINE tlsHooks #-}
tlsHooks ::IORef (SData -> BS.ByteString -> IO ()
                 ,SData -> IO B.ByteString
                 ,NS.Socket -> BS.ByteString -> TransIO ()
                 ,String -> NS.Socket -> BS.ByteString -> TransIO ())
tlsHooks= unsafePerformIO $ newIORef
                 ( notneeded
                 , notneeded
                 , \s i -> tlsNotSupported i
                 , \_ _ _-> return())
  where
  notneeded= error "TLS hook function called"
  -- 0x16 is the byte tested against the head of the input before replying
  -- that the handshake failed.
  tlsNotSupported input = do
     if ((not $ BL.null input) && BL.head input == 0x16)
       then do
         conn <- getSData
         sendRaw conn $ BS.pack $ "HTTP/1.0 525 SSL Handshake Failed\nContent-Length: 0\nConnection: close\n\n"
       else return ()
-- | The four TLS hooks, obtained by reading 'tlsHooks' through 'unsafePerformIO'.
-- NOTE(review): being a top-level pattern binding, the read presumably happens
-- at most once, so hooks written to 'tlsHooks' after the first use would not
-- be seen -- confirm against the code that installs the hooks.
(sendTLSData,recvTLSData,maybeTLSServerHandshake,maybeClientTLSHandshake)= unsafePerformIO $ readIORef tlsHooks
#endif
-- | Lift a transient computation into 'Cloud', passing it through 'logged'.
local :: Loggable a => TransIO a -> Cloud a
local action = Cloud (logged action)
-- | Run a 'Cloud' computation in 'IO' by handing the wrapped computation to 'keep'.
runCloudIO :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO = keep . runCloud'
-- | Same as 'runCloudIO' but the wrapped computation is handed to @keep'@.
runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO' = keep' . runCloud'
-- | Wrap a transient computation as a 'Cloud' computation without passing it
-- through 'logged' (compare with 'local').
onAll :: TransIO a -> Cloud a
onAll action = Cloud action
-- | Wrap a transient computation so that it is run through 'unsafePerformIO'
-- against the state captured by 'getCont'; only the result is kept and the
-- final state of that run is discarded ('fst').
-- NOTE(review): the computation presumably runs only when the result is
-- demanded -- confirm before relying on its side effects.
lazy :: TransIO a -> Cloud a
lazy mx= onAll $ getCont >>= \st -> Transient $
return $ unsafePerformIO $ runStateT (runTrans mx) st >>= return .fst
-- | Apply 'logged' to the computation wrapped in a 'Cloud'. The 'Closure'
-- present in the state beforehand (or @Closure 0@) is attached with '<***'
-- as the action that sets it back.
loggedc :: Loggable a => Cloud a -> Cloud a
loggedc cloudAction = Cloud $ do
  savedClosure <- getSData <|> return (Closure 0)
  logged (runCloud' cloudAction) <*** setData savedClosure
-- | Apply 'logged' to the computation wrapped in a 'Cloud', without the
-- closure handling done by 'loggedc'.
loggedc' :: Loggable a => Cloud a -> Cloud a
loggedc' = Cloud . logged . runCloud'
-- | Lift an 'IO' action into 'Cloud' through 'local'.
lliftIO :: Loggable a => IO a -> Cloud a
lliftIO io = local (liftIO io)
-- | Synonym of 'lliftIO'.
localIO :: Loggable a => IO a -> Cloud a
localIO io = lliftIO io
-- | Store 'WasRemote' in the state and then stop the computation.
fullStop :: Cloud stop
fullStop = onAll $ do
  setData WasRemote
  stop
-- | Open a 'wormhole' to the node and 'teleport' through it.
beamTo :: Node -> Cloud ()
beamTo = flip wormhole teleport
-- | Alternative composition of 'beamTo' with a branch that simply returns.
forkTo :: Node -> Cloud ()
forkTo node = remoteBranch <|> localBranch
  where
    remoteBranch = beamTo node
    localBranch  = return ()
-- | Run a computation on the given node: a 'wormhole' to the node around
-- 'atRemote' applied to the computation.
callTo :: Loggable a => Node -> Cloud a -> Cloud a
callTo node = wormhole node . atRemote
#ifndef ghcjs_HOST_OS
-- | Variant of 'callTo' built from two 'beamTo' hops: move to the target
-- node, run the computation, then move back to the node obtained from
-- 'getMyNode' before the first hop.
callTo' :: (Show a, Read a,Typeable a) => Node -> Cloud a -> Cloud a
callTo' node remoteProc = do
  origin <- local getMyNode
  beamTo node
  result <- remoteProc
  beamTo origin
  return result
#endif
-- | Run a computation between two 'teleport' calls. The computation is run
-- with 'runCloud' and has @setData WasRemote@ attached through '<**'.
-- Meant to be used inside a 'wormhole' (see 'callTo').
atRemote proc = loggedc' $ do
  teleport
  result <- Cloud $ runCloud proc <** setData WasRemote
  teleport
  return result
-- | Synonym of 'callTo'.
runAt :: Loggable a => Node -> Cloud a -> Cloud a
runAt node remoteProc = callTo node remoteProc
-- | Run a computation, first killing the branch previously registered for
-- the same computation, if there is one.
--
-- The computation is identified by the hash of its stable name. The registry
-- is the @closChildren@ map of the current 'Connection'. If no connection is
-- in the state this fails with "single: only works within a wormhole".
single :: TransIO a -> TransIO a
single f= do
cutExceptions
con@Connection{closChildren=rmap} <- getSData <|> error "single: only works within a wormhole"
mapth <- liftIO $ readIORef rmap
-- key of the registry: hash of the stable name of the (evaluated) computation
id <- liftIO $ f `seq` makeStableName f >>= return . hashStableName
-- a state registered earlier under the same key is killed
case M.lookup id mapth of
Just tv -> killBranch' tv
Nothing -> return ()
-- current state, stored below under the key
tv <- get
f <** do
id <- liftIO $ makeStableName f >>= return . hashStableName
liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth
-- | Succeed only the first time it is called with a given value within a
-- connection. The value is identified by the hash of its stable name. If
-- that key is already in the @closChildren@ registry of the current
-- 'Connection' the computation is 'empty'; otherwise the current state is
-- stored under the key.
unique :: a -> TransIO ()
unique f = do
  Connection{closChildren = registry} <-
    getSData <|> error "unique: only works within a connection. Use wormhole"
  known <- liftIO $ readIORef registry
  key <- liftIO $ hashStableName <$> (f `seq` makeStableName f)
  if M.member key known
    then empty
    else do
      st <- get
      liftIO $ modifyIORef registry (M.insert key st)
-- | Run a computation with a 'Connection' to the given node in the state.
--
-- The first field of the current 'Log' (bound to @rec@ here) selects the branch:
--
-- * @rec@ False: a connection is obtained with 'mconnect' and stored with
--   @calling= True@ before running the computation;
-- * @rec@ True: the connection already in the state is reused with
--   @calling= False@; if there is none, using it fails with
--   "wormhole: no connection in remote node".
--
-- In both branches the 'Closure' that was in the state on entry is set again
-- by the action attached with '<***'; the first branch also sets the previous
-- 'Connection' again.
wormhole :: Loggable a => Node -> Cloud a -> Cloud a
wormhole node (Cloud comp) = local $ Transient $ do
-- connection and closure present on entry, set again afterwards
moldconn <- getData :: StateIO (Maybe Connection)
mclosure <- getData :: StateIO (Maybe Closure)
labelState $ "wormhole" ++ show node
logdata@(Log rec log fulLog) <- getData `onNothing` return (Log False [][])
mynode <- runTrans getMyNode
if not rec
then runTrans $ (do
conn <- mconnect node
setData conn{calling= True}
comp )
<*** do when (isJust moldconn) . setData $ fromJust moldconn
when (isJust mclosure) . setData $ fromJust mclosure
else do
let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
setData $ conn{calling= False}
runTrans $ comp
<*** do
when (isJust mclosure) . setData $ fromJust mclosure
#ifndef ghcjs_HOST_OS
-- | Stand-ins used when not compiling with GHCJS: 'JSString' is a plain
-- 'String' and 'pack' is the identity.
type JSString= String
pack= id
#endif
-- | Send the current computation to the other end of the connection opened
-- by 'wormhole'.
--
-- When the first field of the current 'Log' (@rec@) is False:
--
-- * a connection must be in the state, otherwise this fails with
--   "teleport: No connection defined: use wormhole";
-- * in the non-GHCJS build, a connection whose data is @Self@ sends nothing:
--   the state marker is set and an empty 'async' is run;
-- * otherwise the current continuation is stored in the @closures@ map of the
--   connection under a locally computed closure id, the log is sent with
--   'msend' as an 'SMore' message, the state marker is set and the local
--   computation stops (result 'Nothing').
--
-- The state marker is 'WasRemote' when @calling@ is False and 'WasParallel'
-- otherwise.
--
-- When @rec@ is True, 'WasRemote' is removed from the state and the
-- computation continues.
teleport :: Cloud ()
teleport = do
local $ Transient $ do
cont <- get
labelState "teleport"
Log rec log fulLog <- getData `onNothing` return (Log False [][])
if not rec
then do
conn@Connection{connData=contype,closures= closures,calling= calling} <- getData
`onNothing` error "teleport: No connection defined: use wormhole"
#ifndef ghcjs_HOST_OS
case contype of
Just Self -> do
setData $ if (not calling) then WasRemote else WasParallel
runTrans $ async $ return ()
_ -> do
#else
do
#endif
-- closure id known for the other end; 0 when none is stored
Closure closRemote <- getData `onNothing` return (Closure 0 )
-- local closure id: weighted sum over the elements of the full log
let closLocal = sum $ map (\x-> case x of Wait -> 100000;
Exec -> 1000
_ -> 1) fulLog
node <- runTrans getMyNode
-- remember the full log and the continuation under the local closure id
liftIO $ modifyMVar_ closures $ \map -> return $ M.insert closLocal (fulLog,cont) map
-- the full log is sent when the remote closure id is 0, otherwise only log
let tosend= reverse $ if closRemote==0 then fulLog else log
runTrans $ msend conn $ SMore (closRemote,closLocal,tosend )
setData $ if (not calling) then WasRemote else WasParallel
return Nothing
else do
delData WasRemote
return (Just ())
-- | Read a value of the type of the default from the state through 'local',
-- falling back to the default, then store that value with 'onAll' and return it.
copyData def = do
  value <- local getSData <|> return def
  onAll $ setData value
  return value
-- | Write a value into mailbox number 0 (see @putMailbox'@).
putMailbox :: Typeable a => a -> TransIO ()
putMailbox dat = putMailbox' 0 dat
-- | Write a value into the mailbox identified by the given number and the
-- type of the value. Mailboxes live in the @comEvent@ map of the current
-- 'Connection'. A missing mailbox is created with 'newMailbox' and the write
-- is retried.
putMailbox' :: Typeable a => Int -> a -> TransIO ()
putMailbox' idbox dat = do
  Connection{comEvent = boxesRef} <- getData `onNothing` errorMailBox
  boxes <- liftIO $ readIORef boxesRef
  maybe createAndRetry deliver (M.lookup name boxes)
  where
    name = MailboxId idbox (typeOf dat)
    createAndRetry = newMailbox name >> putMailbox' idbox dat
    -- the EVar stores SData values, hence the coercion
    deliver ev = writeEVar ev (unsafeCoerce dat)
-- | Create a new 'EVar' and register it under the given mailbox id in the
-- @comEvent@ map of the current 'Connection'.
newMailbox :: MailboxId -> TransIO ()
newMailbox name = do
  Connection{comEvent = boxesRef} <- getData `onNothing` errorMailBox
  box <- newEVar
  liftIO $ atomicModifyIORef boxesRef $ \boxes -> (M.insert name box boxes, ())
-- | Error raised by the mailbox functions when no 'Connection' is in the state.
errorMailBox= error "MailBox: No connection open. Use wormhole"
-- | Read from mailbox number 0 (see @getMailbox'@). The mailbox is selected
-- by the result type.
getMailbox :: Typeable a => TransIO a
getMailbox = getMailbox' 0
-- | Read from the mailbox identified by the given number and by the result
-- type. Mailboxes live in the @comEvent@ map of the current 'Connection'.
-- A missing mailbox is created with 'newMailbox' and the read is retried.
getMailbox' :: Typeable a => Int -> TransIO a
getMailbox' mboxid = x where
x = do
-- typeOf1 is never evaluated: it only carries the result type of x so that
-- typeOf can build the mailbox id
let name= MailboxId mboxid $ typeOf $ typeOf1 x
Connection{comEvent= mv} <- getData `onNothing` errorMailBox
mbs <- liftIO $ readIORef mv
let mev = M.lookup name mbs
case mev of
Nothing ->newMailbox name >> getMailbox' mboxid
-- the EVar holds SData values; the read is coerced to the requested type
Just ev ->unsafeCoerce $ readEVar ev
typeOf1 :: TransIO a -> a
typeOf1 = undefined
-- | Clean mailbox number 0 for the type of the witness (see @cleanMailbox'@).
cleanMailbox :: Typeable a => a -> TransIO ()
cleanMailbox witness = cleanMailbox' 0 witness
-- | Remove the mailbox identified by the given number and the type of the
-- witness. The witness is used only for its type. If the mailbox exists its
-- 'EVar' is cleaned with 'cleanEVar' and the entry is deleted from the
-- @comEvent@ map of the current 'Connection'; otherwise nothing happens.
--
-- When no 'Connection' is in the state this fails with 'errorMailBox', the
-- same error the other mailbox functions use. The previous message named
-- "getMailBox".
cleanMailbox' :: Typeable a => Int -> a -> TransIO ()
cleanMailbox' mboxid witness = do
  Connection{comEvent = mv} <- getData `onNothing` errorMailBox
  mbs <- liftIO $ readIORef mv
  case M.lookup name mbs of
    Nothing -> return ()
    Just ev -> do
      cleanEVar ev
      liftIO $ atomicModifyIORef mv $ \boxes -> (M.delete name boxes, ())
  where
    name = MailboxId mboxid (typeOf witness)
-- | Run the computation on the nodes selected by 'callNodes', combining the
-- per-node computations with '<|>' starting from 'empty'.
clustered :: Loggable a => Cloud a -> Cloud a
clustered = callNodes (<|>) empty
-- | Run the computation on the nodes selected by 'callNodes', combining the
-- per-node computations with '<>' starting from 'mempty'.
mclustered :: (Monoid a, Loggable a) => Cloud a -> Cloud a
mclustered = callNodes (<>) mempty
-- | Run a computation on every known node except web nodes, folding the
-- per-node computations with the given operator and initial value (see
-- @callNodes'@). A web node is one whose services contain @("webnode","")@.
callNodes op init proc = loggedc' $ do
  nodes <- local getNodes
  callNodes' (filter (not . isWebNode) nodes) op init proc
  where
    isWebNode Node{nodeServices = srvs} = ("webnode", "") `elem` srvs
-- | Right fold, with the given operator and initial value, over the
-- computation run with 'runAt' on each node of the list.
callNodes' nodes op init proc =
  foldr (\node rest -> runAt node proc `op` rest) init nodes
-- | Send raw data over a connection, choosing the transport from the
-- connection data. Socket, TLS and browser websocket sends run while holding
-- the @blocked@ MVar of the connection; the server-side websocket send does not.
#ifndef ghcjs_HOST_OS
sendRaw Connection{connData = Just (Node2Web sconn)} r =
  liftIO $ WS.sendTextData sconn r
sendRaw Connection{connData = Just (Node2Node _ sock _), blocked = lock} r =
  liftIO . withMVar lock $ \_ -> SBS.sendMany sock (BL.toChunks r)
sendRaw Connection{connData = Just (TLSNode2Node ctx), blocked = lock} r =
  liftIO . withMVar lock $ \_ -> sendTLSData ctx r
#else
sendRaw Connection{connData = Just (Web2Node sconn), blocked = lock} r =
  liftIO . withMVar lock $ \_ -> JavaScript.Web.WebSocket.send r sconn
#endif
-- | Send a 'StreamData' message, serialized with 'show', over a connection.
-- Only the plain socket and the browser websocket sends hold the @blocked@
-- MVar. A connection without connection data fails with
-- "msend out of wormhole context".
msend :: Loggable a => Connection -> StreamData a -> TransIO ()
#ifndef ghcjs_HOST_OS
msend Connection{connData = Just (Node2Node _ sock _), blocked = lock} r =
  liftIO . withMVar lock $ \_ -> SBS.sendAll sock (BC.pack (show r))
msend Connection{connData = Just (TLSNode2Node ctx)} r =
  liftIO $ sendTLSData ctx (BS.pack (show r))
msend Connection{connData = Just (Node2Web sconn)} r =
  liftIO $ WS.sendTextData sconn (BS.pack (show r))
#else
msend Connection{connData = Just (Web2Node sconn), blocked = lock} r =
  liftIO . withMVar lock $ \_ ->
    JavaScript.Web.WebSocket.send (JS.pack $ show r) sconn
#endif
msend Connection{connData = Nothing} _ = error "msend out of wormhole context"
-- | Read 'StreamData' messages from a connection. In the GHCJS build the
-- only handled transport is the browser websocket, read with 'wsRead'.
mread :: Loggable a => Connection -> TransIO (StreamData a)
#ifdef ghcjs_HOST_OS
mread (Connection _ (Just (Web2Node sconn)) _ _ _ _ _ _)= wsRead sconn
-- | React to the message events of a websocket and parse each string
-- message with @read'@. Blob and array-buffer messages raise an error.
wsRead :: Loggable a => WebSocket -> TransIO a
wsRead ws = do
  event <- react (hsonmessage ws) (return ())
  case JM.getData event of
    JM.StringData str       -> return . read' $ JS.unpack str
    JM.BlobData _           -> error " blob"
    JM.ArrayBufferData _    -> error "arrBuffer"
-- | Create a websocket for the URL, wait through 'react' for its open event
-- and return the socket.
wsOpen :: JS.JSString -> TransIO WebSocket
wsOpen url = do
  socket <- liftIO $ js_createDefault url
  react (hsopen socket) (return ())
  return socket
-- | @window.location.hostname@ of the page.
foreign import javascript safe
"window.location.hostname"
js_hostname :: JSVal
-- | @window.location.protocol@ of the page.
foreign import javascript safe
"window.location.protocol"
js_protocol :: JSVal
-- | Port taken from the third colon-separated piece of the page URL, up to
-- the next slash; 80 when that piece is missing.
foreign import javascript safe
"(function(){var res=window.location.href.split(':')[2];if (res === undefined){return 80} else return res.split('/')[0];})()"
js_port :: JSVal
-- | Assign the @onmessage@ handler of a websocket.
foreign import javascript safe
"$1.onmessage =$2;"
js_onmessage :: WebSocket -> JSVal -> IO ()
-- | Build a 'Node' from the host name and port of the page location
-- (GHCJS build).
getWebServerNode :: TransIO Node
getWebServerNode = liftIO $ do
  host <- fromJSValUnchecked js_hostname
  port <- fromIntegral <$> (fromJSValUnchecked js_port :: IO Int)
  createNode host port
-- | Install a Haskell handler as the @onmessage@ callback of a websocket.
hsonmessage ::WebSocket -> (MessageEvent ->IO()) -> IO ()
hsonmessage ws hscb = makeCallback MessageEvent hscb >>= js_onmessage ws
-- | Assign the @onopen@ handler of a websocket.
foreign import javascript safe
"$1.onopen =$2;"
js_open :: WebSocket -> JSVal -> IO ()
-- | Wrapper for the JavaScript value passed to the @onopen@ handler.
newtype OpenEvent = OpenEvent JSVal deriving Typeable
-- | Install a Haskell handler as the @onopen@ callback of a websocket.
hsopen :: WebSocket -> (OpenEvent ->IO()) -> IO ()
hsopen ws hscb= do
cb <- makeCallback OpenEvent hscb
js_open ws cb
-- | Turn a Haskell handler into a JavaScript callback value. The raw
-- argument is wrapped with the first function before the handler runs. The
-- callback is created with @syncCallback1 ContinueAsync@.
makeCallback :: (JSVal -> a) -> (a -> IO ()) -> IO JSVal
makeCallback wrap handler = do
  Callback cb <- CB.syncCallback1 CB.ContinueAsync (handler . wrap)
  return cb
-- | Create a JavaScript @WebSocket@ for the given URL.
foreign import javascript safe
"new WebSocket($1)" js_createDefault :: JS.JSString -> IO WebSocket
#else
-- Non-GHCJS equations of mread. Socket and TLS connections are read with
-- parallelReadHandler; a server-side websocket is read in a 'parallel' loop
-- that parses each received message with read'.
mread Connection{connData = Just (Node2Node _ _ _)} = parallelReadHandler
mread Connection{connData = Just (TLSNode2Node _)} = parallelReadHandler
mread Connection{connData = Just (Node2Web sconn)} =
  parallel $ read' . BS.unpack <$> WS.receiveData sconn
-- | In the non-GHCJS build the web server node is the node returned by 'getMyNode'.
getWebServerNode :: TransIO Node
getWebServerNode = getMyNode
#endif
-- | Parse a whole string with @readsPrec'@. Anything other than exactly one
-- parse that consumes all the input raises an error that starts with
-- "reading " followed by the input.
read' s
  | [(x, "")] <- parses = x
  | otherwise           = error $ "reading " ++ s
  where
    parses = readsPrec' 0 s
-- | Close the transport of a connection.
--
-- * socket connection: the socket is closed;
-- * server-side websocket: a close message ("closemsg") is sent;
-- * browser websocket (GHCJS build): the websocket is closed;
-- * TLS connection: fails with an 'IOError', because no close hook is
--   available for the opaque TLS context;
-- * any other connection (@Self@ or no connection data): nothing to close.
--
-- The last two cases previously fell through the pattern match and failed
-- with a pattern-match error.
mclose :: Connection -> IO ()
#ifndef ghcjs_HOST_OS
mclose Connection{connData = Just (Node2Node _ sock _)} = NS.close sock
mclose Connection{connData = Just (Node2Web sconn)} =
  WS.sendClose sconn ("closemsg" :: BS.ByteString)
mclose Connection{connData = Just (TLSNode2Node _)} =
  ioError $ userError "mclose: closing a TLS connection is not supported"
#else
mclose Connection{connData = Just (Web2Node sconn)} =
  JavaScript.Web.WebSocket.close Nothing Nothing sconn
#endif
-- Self and connections without connection data own no transport.
mclose _ = return ()
-- | Obtain a 'Connection' to a node.
--
-- The node is looked up in the list returned by 'getNodes'; if it is absent
-- it is added with 'addNodes' and the lookup is retried. If the connection
-- pool of the node is not empty its first connection is returned; otherwise
-- a new connection is opened by @mconnect1@. In both cases the 'Closure' is
-- removed from the state.
--
-- Non-GHCJS build: @mconnect1@ tries a node-to-node connection first and a
-- websocket connection as the alternative. GHCJS build: a browser websocket
-- is opened with 'connectToWS'.
--
-- A new connection is added to the pool and posted, together with its parse
-- context and the node, to a mailbox with 'putMailbox'.
mconnect :: Node -> TransIO Connection
mconnect node@(Node _ _ _ _ )= do
nodes <- getNodes
let fnode = filter (==node) nodes
case fnode of
[] -> addNodes [node] >> mconnect node
[Node host port pool _] -> do
plist <- liftIO $ readMVar pool
case plist of
handle:_ -> do
delData $ Closure undefined
return handle
_ -> mconnect1 host port pool
where
#ifndef ghcjs_HOST_OS
mconnect1 host port pool= do
connectNode2Node host port <|> connectWebSockets host port
watchConnection
where
-- Open the socket, store the new connection and the parse context in the
-- state, then run the client TLS handshake hook.
connectSockTLS host port= do
return ()
my <- getMyNode
let size=8192
Connection{comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
sock <- liftIO $connectTo' size host $ PortNumber $ fromIntegral port
conn' <- liftIO $ defConnection >>= \c ->
return c{myNode=my,comEvent= ev,connData=
Just $ Node2Node u sock (error $ "addr: outgoing connection")}
setData conn'
input <- liftIO $ SBSL.getContents sock
setData $ ParseContext (error "listenResponses: Parse error") input
maybeClientTLSHandshake host sock input
-- Send the "CLOS" request line; any reply other than "OK" closes the
-- socket and makes this branch fail with empty.
connectNode2Node host port= do
connectSockTLS host port
conn <- getSData <|> error "mconnect: no connection data"
sendRaw conn "CLOS a b\r\n\r\n"
r <-
liftIO $ readFrom conn
case r of
"OK" -> return()
_ -> do
let Connection{connData=cdata}= conn
case cdata of
Just(Node2Node _ s _) -> liftIO $ NS.close s
empty
-- Run a websocket client over a stream built from the connection and
-- replace the connection data with the resulting websocket connection.
connectWebSockets host port = do
liftIO $ print "Trying WebSockets"
connectSockTLS host port
never<- liftIO $ newEmptyMVar :: TransIO (MVar ())
conn <- getSData <|> error "connectWebSockets: no connection"
stream <- liftIO $ makeWSStreamFromConn conn
wscon <- react (NWS.runClientWithStream stream host "/"
WS.defaultConnectionOptions []) (takeMVar never)
liftIO $ print "WebSockets connection"
modifyState $ \(Just c) -> Just c{connData= Just $ Node2Web wscon}
-- Give the connection a fresh closChildren map, add it to the pool and
-- post it to the mailbox.
-- NOTE(review): the value returned is conn, not the conn' stored in the
-- pool -- confirm this is intended.
watchConnection= do
conn <- getSData
parseContext <- getSData <|> error "NO PASE CONTEXT"
:: TransIO (ParseContext BS.ByteString)
chs <- liftIO $ newIORef M.empty
let conn'= conn{closChildren= chs}
(liftIO $ modifyMVar_ pool $ \plist -> return $ conn':plist)
putMailbox ((conn',parseContext,node)
:: (Connection,ParseContext BS.ByteString,Node))
delData $ Closure undefined
return conn
#else
mconnect1 host port pool= do
my <- getMyNode
Connection{comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
do
ws <- connectToWS host $ PortNumber $ fromIntegral port
conn <- defConnection >>= \c -> return c{comEvent= ev,connData= Just $ Web2Node ws}
let parseContext =
ParseContext (error "parsecontext not available in the browser")
("" :: JSString)
chs <- liftIO $ newIORef M.empty
let conn'= conn{closChildren= chs}
liftIO $ modifyMVar_ pool $ \plist -> return $ conn':plist
putMailbox (conn',parseContext,node)
delData $ Closure undefined
return conn
#endif
-- placeholder for the port field of an outgoing Node2Node connection
u= undefined
#ifndef ghcjs_HOST_OS
-- | Open a TCP connection to the host and port, setting the receive and
-- send buffer sizes of the socket before connecting. If an exception is
-- raised after the socket is created, the socket is closed.
connectTo' bufSize hostname (PortNumber port) = do
  proto <- BSD.getProtocolNumber "tcp"
  bracketOnError (NS.socket NS.AF_INET NS.Stream proto) sClose configureAndConnect
  where
    configureAndConnect sock = do
      NS.setSocketOption sock NS.RecvBuffer bufSize
      NS.setSocketOption sock NS.SendBuffer bufSize
      he <- BSD.getHostByName hostname
      NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he))
      return sock
#else
-- | Open a browser websocket to the host and port (GHCJS build). The scheme
-- follows the protocol of the current page: @wss://@ for @https:@ and
-- @ws://@ for anything else. The previous code handled only @http:@ and
-- @https:@ and failed with a pattern-match error on any other protocol.
connectToWS h (PortNumber p) = do
  protocol <- liftIO $ fromJSValUnchecked js_protocol
  let ps = case (protocol :: JSString) of
        "https:" -> "wss://"
        _        -> "ws://"
  wsOpen $ JS.pack $ ps ++ h ++ ":" ++ show p
#endif
-- | MVar held around sends (see 'sendRaw' and 'msend').
type Blocked= MVar ()
-- | Buffer size of a connection.
type BuffSize = Int
-- | Transport-specific data of a connection.
--
-- Non-GHCJS build: a socket to another node (@Node2Node@), an opaque TLS
-- context (@TLSNode2Node@), a server-side websocket (@Node2Web@) or @Self@.
-- GHCJS build: @Self@ or a browser websocket (@Web2Node@).
data ConnectionData=
#ifndef ghcjs_HOST_OS
Node2Node{port :: PortID
,socket ::Socket
,remoteNode :: NS.SockAddr
}
| TLSNode2Node{tlscontext :: SData}
| Node2Web{webSocket :: WS.Connection}
| Self
#else
Self
| Web2Node{webSocket :: WebSocket}
#endif
-- | Key of a mailbox: a user-chosen number plus the type of the values it holds.
data MailboxId= MailboxId Int TypeRep deriving (Eq,Ord)
-- | State attached to a connection. See 'defConnection' for the initial values.
data Connection= Connection{myNode :: Node
-- transport data; Nothing in the value built by defConnection
,connData :: Maybe(ConnectionData)
,bufferSize :: BuffSize
-- mailboxes, keyed by MailboxId (see putMailbox' and getMailbox')
,comEvent :: IORef (M.Map MailboxId (EVar SData))
-- MVar held around sends
,blocked :: Blocked
-- set to True by wormhole when it opens the connection, False otherwise
,calling :: Bool
-- continuations stored by teleport, keyed by local closure id
,closures :: MVar (M.Map IdClosure ([LogElem], EventF))
-- states registered by single and unique, keyed by stable-name hash
,closChildren :: IORef (M.Map Int EventF)}
deriving Typeable
-- | Build a default connection: no connection data, buffer size 8192,
-- @calling@ False, a fresh send lock and an empty closure table. The node,
-- the mailbox map and the children map are placeholders that raise an error
-- when used.
defConnection :: MonadIO m => m Connection
defConnection = liftIO $ do
  sendLock <- newMVar ()
  closureTable <- newMVar M.empty
  return Connection
    { myNode       = error "node in default connection"
    , connData     = Nothing
    , bufferSize   = 8192
    , comEvent     = error "defConnection: accessing network events out of listen"
    , blocked      = sendLock
    , calling      = False
    , closures     = closureTable
    , closChildren = error "closchildren newIORef M.empty"
    }
#ifndef ghcjs_HOST_OS
-- | Store the buffer size in the 'Connection' of the state, starting from
-- 'defConnection' when there is no connection yet.
setBuffSize :: Int -> TransIO ()
setBuffSize size = Transient $ do
  current <- getData `onNothing` defConnection
  setData current{bufferSize = size}
  return (Just ())
-- | Buffer size of the 'Connection' in the state, or 8192 when there is none.
getBuffSize =
  (bufferSize <$> getSData) <|> return 8192
-- | Make the program listen as the given node (non-GHCJS build).
--
-- The log is reset, a 'Connection' marked @Self@ with fresh mailbox and
-- children maps is stored in the state and set as the only entry of the
-- connection pool of the node, and the node is added to the node list.
-- Messages then come either from 'listenNew' (incoming connections on the
-- port of the node) or from 'listenResponses', and each one is handed to
-- 'execLog'.
listen :: Node -> Cloud ()
listen (node@(Node _ port _ _ )) = onAll $ do
addThreads 1
setData $ Log False [] []
conn' <- getSData <|> defConnection
ev <- liftIO $ newIORef M.empty
chs <- liftIO $ newIORef M.empty
let conn= conn'{connData=Just Self,myNode=node, comEvent=ev,closChildren=chs}
setData conn
liftIO $ modifyMVar_ (connection node) $ const $ return [conn]
addNodes [node]
mlog <- listenNew (fromIntegral port) conn <|> listenResponses
execLog mlog
-- | Accept incoming connections on the port and produce the messages
-- received on them.
--
-- For each accepted socket a parse context over its lazy contents and a
-- connection with a fresh children map are set up, the server TLS handshake
-- hook is run and the HTTP head is read. Then, by request method:
--
-- * @CLOS@: reply "OK" and read messages with 'parallelReadHandler';
-- * a path starting with @api/@: build a log from the method and the path
--   segments (plus the url-encoded POST parameters when present) and return
--   it as a single 'SMore' message;
-- * anything else: serve a static page with 'servePages' or, for websocket
--   requests, accept the websocket and read messages from it.
--
-- The exception handler installed here prints the exception, empties the
-- closure and children maps of the connection and kills the branch.
listenNew port conn'= do
sock <- liftIO . listenOn $ PortNumber port
let bufSize= bufferSize conn'
liftIO $ do NS.setSocketOption sock NS.RecvBuffer bufSize
NS.setSocketOption sock NS.SendBuffer bufSize
-- from here on, sock is the accepted socket (it shadows the listening one)
(sock,addr) <- waitEvents $ NS.accept sock
chs <- liftIO $ newIORef M.empty
let conn= conn'{closChildren=chs}
input <- liftIO $ SBSL.getContents sock
setData $ (ParseContext (error "parsing request") input
::ParseContext BS.ByteString)
cutExceptions
onException $ \(e :: SomeException) -> do
liftIO $ print e
let Connection{closures=closures,closChildren= rmap}= conn
liftIO $ do
modifyMVar_ closures $ const $ return M.empty
writeIORef rmap M.empty
topState >>= showThreads
killBranch
setState conn{connData=Just (Node2Node (PortNumber port) sock addr)}
maybeTLSServerHandshake sock input
(method,uri, headers) <- receiveHTTPHead
case method of
"CLOS" ->
do
conn <- getSData
sendRaw conn "OK"
parallelReadHandler
_ -> do
-- path without its first character
let uri'= BC.tail $ uriPath uri
if "api/" `BC.isPrefixOf` uri'
then do
log <- return $ Exec: (Var $ IDyns $ BS.unpack method):(map (Var . IDyns ) $ split $ BC.unpack $ BC.drop 4 uri')
str <- giveData <|> error "no api data"
log' <- case (method,lookup "Content-Type" headers) of
("POST",Just "application/x-www-form-urlencoded") -> do
-- NOTE(review): read is partial; a non-numeric Content-Length raises an error
len <- read <$> BC.unpack
<$> (Transient $ return (lookup "Content-Length" headers))
setData $ ParseContext (return mempty) $ BS.take len str
postParams <- parsePostUrlEncoded <|> return []
return $ log ++ [(Var . IDynamic $ postParams)]
_ -> return $ log
return $ SMore (0,0, log' )
else do
-- servePages continues only for websocket requests; otherwise it ends in empty
servePages (method, uri', headers)
conn <- getSData
sconn <- makeWebsocketConnection conn uri headers
setData conn{connData= Just (Node2Web sconn) ,closChildren=chs}
do
return ()
r <- parallel $ do
msg <- WS.receiveData sconn
-- a message that does not parse as StreamData is wrapped as a one-element log
case reads $ BS.unpack msg of
[] -> do
let log =Exec: [Var $ IDynamic (msg :: BS.ByteString)]
return $ SMore (0,0,log)
((x ,_):_) -> return (x :: StreamData (Int,Int,[LogElem]))
case r of
SError e -> do
finish (Just e)
_ -> return r
where
-- drop everything before the first slash
uriPath = BC.dropWhile (/= '/')
-- split a path into its non-empty slash-separated segments
split []= []
split ('/':r)= split r
split s=
let (h,t) = span (/= '/') s
in h: split t
#endif
-- | Produce the messages read from connections opened by this node.
--
-- Each (connection, parse context, node) triple posted to the mailbox (see
-- 'mconnect') is taken with 'getMailbox'; the connection and the parse
-- context are stored in the state and the connection is read with 'mread'.
-- The exception handler installed here prints the exception, removes the
-- node from the node list, kills the child threads and empties the closure
-- map of the connection.
listenResponses :: Loggable a => TransIO (StreamData a)
listenResponses= do
(conn, parsecontext, node) <- getMailbox
labelState $ "listen from: "++ show node
setData conn
#ifndef ghcjs_HOST_OS
setData (parsecontext :: ParseContext BS.ByteString)
#else
setData (parsecontext :: ParseContext JSString)
#endif
cutExceptions
onException (\(e:: SomeException) -> do
liftIO $ print e
liftIO $ putStr "removing2 node: " >> print node
nodes <- getNodes
setNodes $ nodes \\ [node]
killChilds
let Connection{closures=closures}= conn
liftIO $ modifyMVar_ closures $ const $ return M.empty)
mread conn
-- | Identifier of a stored continuation (key of the @closures@ map).
type IdClosure= Int
-- | Closure identifier kept in the state; 0 is the default used when none is stored.
newtype Closure= Closure IdClosure deriving Show
-- | Execute a received message.
--
-- * 'SError': backtrack with the error;
-- * 'SDone': backtrack with @ErrorCall "SDone"@;
-- * 'SMore' / 'SLast': process the (closure, remote closure, log) triple;
--   for 'SLast' the local closure is also deleted from the closure map.
--
-- Processing a triple: when the local closure id is 0 the log is installed
-- in the state and the computation continues from here. Otherwise the
-- continuation stored under that id is resumed with the received log added
-- to its stored log. If no continuation is stored, an 'SLast' reply is sent
-- and an error is raised.
execLog mlog = Transient $
case mlog of
SError e -> do
runTrans $ back e
return Nothing
SDone -> runTrans(back $ ErrorCall "SDone") >> return Nothing
SMore r -> process r False
SLast r -> process r True
where
process (closl,closr,log) deleteClosure= do
conn@Connection {closures=closures} <- getData `onNothing` error "Listen: myNode not set"
if closl== 0 then do
setData $ Log True log $ reverse log
setData $ Closure closr
return $ Just ()
else do
-- look up the continuation, deleting it in the same step when requested
mcont <- liftIO $ modifyMVar closures
$ \map -> return (if deleteClosure then
M.delete closl map
else map, M.lookup closl map)
case mcont of
Nothing -> do
runTrans $ msend conn $ SLast (closr,closl, [] :: [()] )
error ("request received for non existent closure: "
++ show closl)
-- resume the stored continuation in its own state
Just (fulLog,cont) -> liftIO $ runStateT (do
let nlog= reverse log ++ fulLog
setData $ Log True log nlog
setData $ Closure closr
runCont cont) cont
return Nothing
#ifdef ghcjs_HOST_OS
-- | GHCJS version of listen: add the node to the node list, store a default
-- connection carrying the node and a fresh mailbox map, then execute with
-- 'execLog' every message produced by 'listenResponses'.
listen node = onAll $ do
  addNodes [node]
  mailboxes <- liftIO $ newIORef M.empty
  conn <- defConnection >>= \c -> return c{myNode = node, comEvent = mailboxes}
  setData conn
  listenResponses >>= execLog
#endif
-- | Connections kept for a node.
type Pool= [Connection]
type Package= String
type Program= String
-- | A service attached to a node, as a (package, program) pair.
type Service= (Package, Program)
-- | Parser state: an action that reads more input and the input not yet
-- consumed (see 'withData' and 'giveData').
data ParseContext a = IsString a => ParseContext (IO a) a deriving Typeable
#ifndef ghcjs_HOST_OS
-- | Produce, one by one through 'choose', the messages parsed from the
-- input of the current parse context.
parallelReadHandler :: Loggable a => TransIO (StreamData a)
parallelReadHandler= do
str <- giveData :: TransIO BS.ByteString
r <- choose $ readStream str
return r
where
readStream :: (Typeable a, Read a) => BS.ByteString -> [StreamData a]
readStream s= readStream1 $ BS.unpack s
where
-- NOTE(review): the lazy pattern [(x,r)] fails whenever reads does not
-- return exactly one parse, presumably also at end of input -- confirm
-- that callers rely on that exception.
readStream1 s=
let [(x,r)] = reads s
in x : readStream1 r
-- | Read a strict chunk of data from a connection.
--
-- TLS connections use the @recvTLSData@ hook. Socket connections receive
-- buffers of up to 4098 bytes until one comes back shorter than that, and
-- return their concatenation. Any other connection raises "readFrom error".
readFrom Connection{connData= Just(TLSNode2Node ctx)}= recvTLSData ctx
readFrom Connection{connData= Just(Node2Node _ sock _)} = toStrict <$> loop
where
bufSize= 4098
loop :: IO BL.ByteString
loop = unsafeInterleaveIO $ do
s <- SBS.recv sock bufSize
-- a short read ends the chunk list
if BC.length s < bufSize
then return $ BLC.Chunk s mempty
else BLC.Chunk s `liftM` loop
readFrom _ = error "readFrom error"
-- | Convert a lazy 'BS.ByteString' into a strict one by concatenating its chunks.
toStrict lazyBytes = B.concat (BS.toChunks lazyBytes)
-- | Build a websockets stream on top of a connection. Reads use 'readFrom',
-- and an empty read is reported as 'Nothing'. Writes use 'sendRaw', and
-- writing 'Nothing' does nothing.
makeWSStreamFromConn conn = makeStream receive transmit
  where
    receive = do
      bs <- readFrom conn
      return $ if BC.null bs then Nothing else Just bs
    transmit = maybe (return ()) (sendRaw conn)
-- | Accept a websocket request on a connection. A pending connection is
-- built from the default options, the request URI and headers and a stream
-- over the connection; it is accepted, and a ping thread with the argument 30
-- is forked before the websocket connection is returned.
makeWebsocketConnection conn uri headers = liftIO $ do
  stream <- makeWSStreamFromConn conn
  let pending = WS.PendingConnection
        { WS.pendingOptions  = WS.defaultConnectionOptions
        , WS.pendingRequest  = NWS.RequestHead uri headers False
        , WS.pendingOnAccept = \_ -> return ()
        , WS.pendingStream   = stream
        }
  accepted <- WS.acceptRequest pending
  WS.forkPingThread accepted 30
  return accepted
-- | Serve a static file for a plain HTTP request.
--
-- Websocket requests (see 'isWebSocketsReq') are left untouched so that the
-- caller can upgrade them. For any other request the file named by the path
-- (or @index.html@ when the path is empty) is read from
-- @./static/out.jsexe/@ and sent with a 200 response; a file that cannot be
-- read yields a 404 response. After replying, the computation ends with
-- 'empty'.
--
-- The path comes from the request and is untrusted. Paths with a @..@
-- segment are answered with 404 without touching the file system, so a
-- request cannot escape the static directory.
servePages (method, uri, headers) = do
  conn <- getSData <|> error " servePageMode: no connection"
  if isWebSocketsReq headers
    then return ()
    else do
      let file = if BC.null uri then "index.html" else uri
          isSeparator c = c == '/' || c == '\\'
          -- True when some path segment tries to climb out of the directory
          escapesRoot = ".." `elem` BC.splitWith isSeparator file
      mcontent <-
        if escapesRoot
          then return Nothing
          else liftIO $ (Just <$> BL.readFile ("./static/out.jsexe/" ++ BC.unpack file))
                          `catch` (\(e :: SomeException) -> return Nothing)
      case mcontent of
        Just content -> liftIO $ sendRaw conn $
          "HTTP/1.0 200 OK\nContent-Type: text/html\nConnection: close\nContent-Length: "
          <> BS.pack (show $ BL.length content) <> "\n\n" <> content
        Nothing -> liftIO $ sendRaw conn $ BS.pack $ "HTTP/1.0 404 Not Found\nContent-Length: 0\nConnection: close\n\n"
      empty
-- | Run a computation that produces a raw response and send that response
-- over the current connection with 'sendRaw'. The id of the running thread
-- is printed before sending. Without a connection in the state this fails
-- with an error.
api :: TransIO BS.ByteString -> Cloud ()
api w = Cloud $ do
  conn <- getSData <|> error "api: Need a connection opened with initNode, listen, simpleWebApp"
  response <- w
  liftIO $ myThreadId >>= print
  sendRaw conn response
-- | True when the headers contain a @Sec-WebSocket-Key@ entry. Header names
-- are compared case-insensitively because they are built with 'mk'.
isWebSocketsReq headers =
  not (null [() | (name, _) <- headers, name == mk "Sec-WebSocket-Key"])
-- | HTTP methods named by this module.
data HTTPMethod= GET | POST deriving (Read,Show,Typeable)
-- | Parse the head of an HTTP request from the current parse context.
-- Returns the method, the URI (converted to a strict ByteString) and the
-- headers as (case-insensitive name, value) pairs. Headers are read until
-- the sequence CR LF CR LF is found.
receiveHTTPHead = do
(method, uri, vers) <- (,,) <$> getMethod <*> getUri <*> getVers
headers <- manyTill paramPair (string "\r\n\r\n")
return (method, toStrict uri, headers)
where
-- succeed, consuming the prefix, only if the input starts with the given string
string :: BS.ByteString -> TransIO BS.ByteString
string s=withData $ \str -> do
let len= BS.length s
ret@(s',str') = BS.splitAt len str
if s == s'
then return ret
else empty
paramPair= (,) <$> (mk <$> getParam) <*> getParamValue
-- collect results of p until end succeeds
manyTill p end = scan
where
scan = do{ end; return [] }
<|>
do{ x <- p; xs <- scan; return (x:xs) }
getMethod= getString
getUri= getString
getVers= getString
-- header name: text up to a colon or end of line; the following character is dropped
getParam= do
dropSpaces
r <- tTakeWhile (\x -> x /= ':' && not (endline x))
if BS.null r || r=="\r" then empty else dropChar >> return (toStrict r)
-- header value: text up to the end of the line, leading spaces dropped
getParamValue= toStrict <$> ( dropSpaces >> tTakeWhile (\x -> not (endline x)))
-- | Drop the leading white space of the input.
dropSpaces= parse $ \str ->((),BS.dropWhile isSpace str)
-- | Drop the first character of the input.
dropChar= parse $ \r -> ((), BS.tail r)
-- | True for the line feed and carriage return characters.
endline c = case c of
  '\n' -> True
  '\r' -> True
  _    -> False
-- | Parameters of a url-encoded POST body: raw name and unescaped value.
type PostParams = [(BS.ByteString, String)]
-- | Parse a url-encoded body from the current parse context. Names run up
-- to @=@, values up to @&@; values are unescaped with 'unEscapeString'.
parsePostUrlEncoded :: TransIO PostParams
parsePostUrlEncoded = do
  dropSpaces
  many pair
  where
    pair = (,) <$> key <*> val
    key  = tTakeWhile' (/= '=')
    val  = (unEscapeString . BS.unpack) <$> tTakeWhile' (/= '&')
-- | Skip leading white space and take the input up to the next white space.
getString = dropSpaces >> tTakeWhile (not . isSpace)
-- | Take the longest prefix of the input whose characters satisfy the
-- predicate, leaving the rest as remaining input.
tTakeWhile :: (Char -> Bool) -> TransIO BS.ByteString
tTakeWhile = parse . BS.span
-- | Like 'tTakeWhile', but the character that stopped the scan, if there is
-- one, is also removed from the remaining input.
tTakeWhile' :: (Char -> Bool) -> TransIO BS.ByteString
tTakeWhile' cond = parse $ \input ->
  let (taken, rest) = BS.span cond input
  in (taken, BS.drop 1 rest)
-- | Run a splitting function over the input of the current parse context.
-- The function returns a result and the remaining input. The parser is
-- 'empty' when there is no input left.
parse :: (BS.ByteString -> (b, BS.ByteString)) -> TransIO b
parse split = withData step
  where
    step str
      | BS.null str = empty
      | otherwise   = return (split str)
-- | Run a parser over the input of the current parse context. The parser
-- receives the pending input followed by whatever the read-more action
-- yields (obtained through 'unsafeInterleaveIO'). On success the remaining
-- input returned by the parser replaces the pending input; on failure the
-- parse context is left as it was. Fails with "parser: no context" when no
-- parse context is in the state.
withData :: (BS.ByteString -> TransIO (a,BS.ByteString)) -> TransIO a
withData parser= Transient $ do
ParseContext readMore s <- getData `onNothing` error "parser: no context"
-- concatenation of repeated reads, with no terminating case
let loop = unsafeInterleaveIO $ do
r <- readMore
(r <>) `liftM` loop
str <- liftIO $ (s <> ) `liftM` loop
mr <- runTrans $ parser str
case mr of
Nothing -> return Nothing
Just (v,str') -> do
setData $ ParseContext readMore str'
return $ Just v
-- | Return the input of the current parse context: the pending input
-- followed by whatever the read-more action yields (obtained through
-- 'unsafeInterleaveIO'). The parse context is not modified. Fails with
-- "parser: no context" when no parse context is in the state.
giveData =noTrans $ do
ParseContext readMore s <- getData `onNothing` error "parser: no context"
:: StateIO (ParseContext BS.ByteString)
let loop = unsafeInterleaveIO $ do
r <- readMore
(r <>) `liftM` loop
liftIO $ (s <> ) `liftM` loop
#endif
#ifdef ghcjs_HOST_OS
-- | True when compiled with GHCJS, False otherwise.
isBrowserInstance= True
-- | In the GHCJS build api does nothing: it is empty.
api _= empty
#else
-- | True when compiled with GHCJS, False otherwise.
isBrowserInstance= False
#endif
-- | Create a new, empty connection pool.
emptyPool :: MonadIO m => m (MVar Pool)
emptyPool = liftIO (newMVar [])
-- | Create a node with the given host, port and services and a new, empty
-- connection pool.
createNodeServ :: HostName -> Integer -> [Service] -> IO Node
createNodeServ h p svs = do
  pool <- emptyPool
  return Node
    { nodeHost     = h
    , nodePort     = fromInteger p
    , connection   = pool
    , nodeServices = svs
    }
-- | Create a node without services (see 'createNodeServ').
createNode :: HostName -> Integer -> IO Node
createNode h p = createNodeServ h p noServices
  where
    noServices = []
-- | Create the node that stands for a web client: host "webnode", port 0
-- and the single service @("webnode","")@, which 'callNodes' uses to
-- recognise web nodes.
createWebNode :: IO Node
createWebNode = createNodeServ "webnode" 0 [("webnode", "")]
-- | Two nodes are equal when their host and port are equal; the connection
-- pool and the services are ignored.
instance Eq Node where
  a == b = nodeHost a == nodeHost b && nodePort a == nodePort b
-- | A node is shown as the triple (host, port, services); the connection
-- pool is not shown.
instance Show Node where
  show node = show (nodeHost node, nodePort node, nodeServices node)
-- | A node is read from the (host, port, services) triple produced by the
-- 'Show' instance, and receives a connection pool created with 'emptyPool'.
--
-- The first parse returned by @readsPrec'@ is used. The previous @case@
-- handled only zero or exactly one parse and failed with a pattern-match
-- error on any longer result list. The local pool binding is no longer
-- called @empty@, which shadowed the 'Alternative' method of that name.
--
-- NOTE(review): the pool is created with 'unsafePerformIO'; whether separate
-- calls get separate pools depends on the binding not being shared by the
-- compiler -- confirm.
instance Read Node where
  readsPrec _ s =
    case readsPrec' 0 s of
      ((h, p, ss), rest) : _ -> [(Node h p freshPool ss, rest)]
      []                     -> []
    where
      freshPool = unsafePerformIO emptyPool
-- | Global list of known nodes, initially empty.
--
-- The NOINLINE pragma is required for a top-level 'unsafePerformIO'
-- variable: without it the definition may be inlined at its use sites and
-- the 'TVar' created more than once.
{-# NOINLINE nodeList #-}
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []
-- | Standalone-derived ordering for 'PortID'.
deriving instance Ord PortID
-- | Error raised when the node of the program is not set; the argument is
-- the name of the caller, used as a prefix of the message.
errorMyNode f =
  error (f ++ ": Node not set. initialize it with connect, listen, initNode...")
-- | Node stored in the 'Connection' of the state. Fails with 'errorMyNode'
-- when there is no connection.
getMyNode :: TransIO Node
getMyNode =
  myNode <$> (getSData <|> errorMyNode "getMyNode" :: TransIO Connection)
-- | Current content of the global node list.
getNodes :: MonadIO m => m [Node]
getNodes = liftIO (readTVarIO nodeList)
-- | Add nodes to the global node list. The resulting list has no duplicates
-- and always starts with the node returned by 'getMyNode'.
addNodes :: [Node] -> TransIO ()
addNodes nodes = do
  my <- getMyNode
  liftIO . atomically $
    modifyTVar nodeList $ \prevnodes ->
      my : (nub (nodes ++ prevnodes) \\ [my])
-- | Replace the global node list.
setNodes nodes = liftIO (atomically (writeTVar nodeList nodes))
-- | Rotate the global node list by one position, moving the first node to
-- the end, and return the new list. An empty list is left empty. The
-- previous code used 'head' and 'tail' and raised an exception inside the
-- transaction when the list was empty.
shuffleNodes :: MonadIO m => m [Node]
shuffleNodes = liftIO . atomically $ do
  nodes <- readTVar nodeList
  let nodes' = case nodes of
        []         -> []
        n : others -> others ++ [n]
  writeTVar nodeList nodes'
  return nodes'
-- | Start listening as the first node and join the cluster known to the
-- second node. In the GHCJS build both connect and connect' are empty.
connect :: Node -> Node -> Cloud ()
#ifndef ghcjs_HOST_OS
connect node remotenode = do
listen node <|> return ()
connect' remotenode
-- | Exchange node lists with a remote node.
--
-- On the remote node: the connection in use is set as the only entry of the
-- pool of the connecting node, a handler is installed that removes the
-- connecting node from the node list on an exception, the nodes known
-- locally are added on every node of the remote cluster, and the resulting
-- node list of the remote node is returned. Back on the local node: the
-- remote node is added, and the returned nodes are added on each node that
-- was known before the call.
connect' remotenode= do
nodes <- local getNodes
localIO $ putStrLn $ "connecting to: "++ show remotenode
newNodes <- runAt remotenode $ do
local $ do
conn@Connection{} <- getSData <|>
error ("connect': need to be connected to a node: use wormhole/connect/listen")
-- NOTE(review): head fails if the local node list is empty
let nodeConnecting= head nodes
liftIO $ modifyMVar_ (connection nodeConnecting) $ const $ return [conn]
onException $ \(e :: SomeException) -> do
liftIO $ putStrLn "removing node: ">> print nodeConnecting
nodes <- getNodes
setNodes $ nodes \\ [nodeConnecting]
mclustered . local $ addNodes nodes
local $ do
allNodes <- getNodes
liftIO $ putStrLn "Known nodes: " >> print allNodes
return allNodes
local $ addNodes[remotenode]
callNodes' nodes (<>) mempty $ local $ addNodes newNodes
local $ do
nodes <- getNodes
liftIO $ putStrLn "Known nodes: " >> print nodes
#else
connect _ _= empty
connect' _ = empty
#endif