-----------------------------------------------------------------------------
--
-- Module      :  Transient.Move.Internals
-- Copyright   :
-- License     :  MIT
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
--
-----------------------------------------------------------------------------
{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification, OverloadedStrings,FlexibleInstances, UndecidableInstances
    ,ScopedTypeVariables, StandaloneDeriving, RecordWildCards, FlexibleContexts, CPP
    ,GeneralizedNewtypeDeriving #-}
module Transient.Move.Internals where

import Prelude hiding(drop,length)
import Transient.Internals
import Transient.Parse
import Transient.Logged
import Transient.Indeterminism
import Transient.Mailboxes
import Data.Typeable
import Control.Applicative
import System.Random
import Data.String
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BS
import System.Time
import Data.ByteString.Builder

#ifndef ghcjs_HOST_OS
import Network
--- import Network.Info
import Network.URI
--import qualified Data.IP as IP
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Network.WebSockets as NWS -- S(RequestHead(..))
import qualified Network.WebSockets.Connection as WS
import Network.WebSockets.Stream hiding(parse)
import qualified Data.ByteString as B(ByteString)
import qualified Data.ByteString.Lazy.Internal as BLC
import qualified Data.ByteString.Lazy as BL
import Network.Socket.ByteString as SBS(sendMany,sendAll,recv)
import qualified Network.Socket.ByteString.Lazy as SBSL
import Data.CaseInsensitive(mk,CI)
import Data.Char
import Data.Aeson
import qualified Data.ByteString.Base64.Lazy as B64
-- import System.Random
#else
import JavaScript.Web.WebSocket
import qualified JavaScript.Web.MessageEvent as JM
import GHCJS.Prim (JSVal)
import GHCJS.Marshal(fromJSValUnchecked)
import qualified Data.JSString as JS
-- import Data.Text.Encoding
import JavaScript.Web.MessageEvent.Internal
import GHCJS.Foreign.Callback.Internal (Callback(..))
import qualified GHCJS.Foreign.Callback as CB
--import Data.JSString (JSString(..), pack,drop,length)
#endif

import Control.Monad.State
import Control.Monad.Fail
import Control.Exception hiding (onException,try)
import Data.Maybe
--import Data.Hashable
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar
import Data.Monoid
import qualified Data.Map as M
import Data.List (partition,union,(\\),length, nubBy,isPrefixOf) -- (nub,(\\),intersperse, find, union, length, partition)
--import qualified Data.List(length)
import Data.IORef
import Control.Concurrent
import System.Mem.StableName
import Unsafe.Coerce
import System.Environment

{- TODO
timeout for closures: little smaller in sender than in receiver
-}

--import System.Random

-- | Shorthand for packing a 'String' into a lazy Char8 'BS.ByteString'.
pk= BS.pack

-- | Shorthand for unpacking a lazy Char8 'BS.ByteString' into a 'String'.
up= BS.unpack

#ifdef ghcjs_HOST_OS
-- Under GHCJS these two types are declared locally (the native build gets
-- them from the network imports above).
type HostName  = String
newtype PortID = PortNumber Int deriving (Read, Show, Eq, Typeable)
#endif

-- | Description of a node of the cloud.
data Node= Node{ nodeHost     :: HostName            -- ^ host name of the node
               , nodePort     :: Int                 -- ^ port of the node
               , connection   :: Maybe (MVar Pool)   -- ^ connection pool, if any (presumably the open connections to that node)
               , nodeServices :: [Service]           -- ^ services attached to the node
               }
         deriving (Typeable)

instance Loggable Node

-- | Nodes are ordered by (host, port) only; the other fields are ignored.
instance Ord Node where
   compare node1 node2= compare (nodeHost node1,nodePort node1)(nodeHost node2,nodePort node2)

-- The cloud monad is a thin layer over Transient in order to make sure that the type system
-- forces the logging of intermediate results
--
-- NOTE: the CPP guard below must test the *version* of base. The previous
-- `#ifdef MIN_VERSION_base(4,11,0)` only tested whether the macro was defined
-- (the argument list is ignored by `#ifdef`), so `Semigroup` was requested for
-- every base version. The outer `#ifdef` keeps the file preprocessable when
-- the macro is not defined at all.
newtype Cloud a= Cloud {runCloud' ::TransIO a} deriving (AdditionalOperators,Functor,
#ifdef MIN_VERSION_base
#if MIN_VERSION_base(4,11,0)
                   Semigroup,
#endif
#endif
                   Monoid ,Applicative, Alternative,MonadFail, Monad, Num, Fractional, MonadState EventF)

{- instance Applicative Cloud where
  pure a = Cloud $ return a
  Cloud mf <*> Cloud mx = do
    -- bp <- getData `onNothing` error "no backpoint"
    -- local $ onExceptionPoint bp $ \(CloudException _ _ _) -> continue
    r1 <- onAll . liftIO $ newIORef Nothing
    r2 <- onAll .
liftIO $ newIORef Nothing
    onAll $ fparallel r1 r2 <|> xparallel r1 r2
    where

    fparallel r1 r2= do
      f <- mf
      liftIO $ (writeIORef r1 $ Just f)
      mr <- liftIO (readIORef r2)
      case mr of
            Nothing -> empty
            Just x  -> return $ f x

    xparallel r1 r2 = do
      mr <- liftIO (readIORef r1)
      case mr of
            Nothing -> do
              p <- gets execMode
              if p== Serial then empty else do
                       x <- mx
                       liftIO $ (writeIORef r2 $ Just x)
                       mr <- liftIO (readIORef r1)
                       case mr of
                         Nothing -> empty
                         Just f  -> return $ f x
            Just f -> do
              x <- mx
              liftIO $ (writeIORef r2 $ Just x)
              return $ f x
-}

-- | User/password part of a proxy URL (whatever precedes the @\@@), possibly empty.
type UPassword= BS.ByteString

-- | Host part of a proxy URL.
type Host= BS.ByteString

-- | (credentials, host, port) of a HTTP proxy.
type ProxyData= (UPassword,Host,Int)

-- | Process-wide cache of the proxy settings: @Nothing@ until the environment
-- has been inspected, then @Just (httpProxy, httpsProxy)@.
--
-- NOINLINE is required for a global created with 'unsafePerformIO': without
-- it GHC may inline the definition and create more than one 'IORef'.
{-# NOINLINE rHTTPProxy #-}
rHTTPProxy= unsafePerformIO $ newIORef (Nothing :: Maybe (Maybe ProxyData, Maybe ProxyData))

-- | Return the proxy parameters taken from the @http_proxy@ / @https_proxy@
-- environment variables. The argument selects the https settings when 'True'
-- and the http settings when 'False'. The environment is read only the first
-- time; the result is cached in 'rHTTPProxy'.
getHTTProxyParams t= do
    mp <- liftIO $ readIORef rHTTPProxy
    case mp of
      Just (p1,p2) -> return $ if t then p2 else p1
      Nothing -> do
         -- first call: parse both variables, cache them and retry
         ps <- (,) <$> getp "http" <*> getp "https"
         liftIO $ writeIORef rHTTPProxy $ Just ps
         getHTTProxyParams t
    where
    -- look up "<scheme>_proxy" and parse it as [scheme//][user:pass@]host:port
    getp t= do
       let var= t ++ "_proxy"
       p<- liftIO $ lookupEnv var
       tr ("proxy",p)
       case p of
          Nothing -> return Nothing
          Just hp -> do
              pr<- withParseString (BS.pack hp) $ do
                    tDropUntilToken (BS.pack "//") <|> return ()
                    (,,) <$> getUPass <*> tTakeWhile' (/=':') <*> int
              return $ Just pr

    -- credentials are optional: everything before "@", or empty when absent
    getUPass= tTakeUntilToken "@" <|> return ""

-- | Execute a distributed computation inside a TransIO computation.
-- All the computations in the TransIO monad that enclose the cloud computation must be `logged`
--
-- The 'Closure' found in the state before running (or @Closure 0@ when there
-- is none) is set again after the cloud computation by means of '<***'.
runCloud :: Cloud a -> TransIO a
runCloud x= do
       closRemote  <- getState <|> return (Closure 0)
       runCloud' x <*** setState closRemote

--instance Monoid a => Monoid (Cloud a) where
--   f mappend x y = mappend <$> x <*> y
--   mempty= return mempty

#ifndef ghcjs_HOST_OS

--- empty Hooks for TLS

-- | Global table of TLS hooks. The default value installed here means
-- "TLS not included": the send/receive hooks raise an error if called, the
-- server-side handshake hook only rejects TLS attempts and the remaining hooks
-- do nothing.
{-# NOINLINE tlsHooks #-}
tlsHooks ::IORef (Bool
                 ,SData -> BS.ByteString -> IO ()
                 ,SData -> IO B.ByteString
                 ,NS.Socket -> BS.ByteString -> TransIO ()
                 ,String -> NS.Socket -> BS.ByteString -> TransIO ()
                 ,SData -> IO ())
tlsHooks= unsafePerformIO $ newIORef
                 ( False
                 , notneeded
                 , notneeded
                 , \_ i ->  tlsNotSupported i
                 , \_ _ _->  return()
                 , \_ -> return())

  where
  notneeded= error "TLS hook function called"

  -- If the first byte received is 0x16 (a TLS handshake record) answer with a
  -- plain HTTP 525 response. Every header line is terminated by CRLF; the
  -- previous text ended the Content-Length line with a bare "\n".
  tlsNotSupported input = do
     if ((not $ BL.null input) && BL.head input  == 0x16)
       then  do
         conn <- getSData
         sendRaw conn $ BS.pack $ "HTTP/1.0 525 SSL Handshake Failed\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
       else return ()

-- The hooks are read from 'tlsHooks' once, when any of these names is first
-- evaluated.
(isTLSIncluded,sendTLSData,recvTLSData,maybeTLSServerHandshake,maybeClientTLSHandshake,tlsClose)= unsafePerformIO $ readIORef tlsHooks

#endif

-- | Means that this computation will be executed in the current node. the result will be logged
-- so the closure will be recovered if the computation is translated to other node by means of
-- primitives like `beamTo`, `forkTo`, `runAt`, `teleport`, `clustered`, `mclustered` etc
local :: Loggable a => TransIO a -> Cloud a
local =  Cloud . logged

--stream :: Loggable a => TransIO a -> Cloud (StreamVar a)
--stream= Cloud . transport

-- #ifndef ghcjs_HOST_OS
-- | Run a distributed computation inside the IO monad. Enables asynchronous
-- console input (see 'keep').
runCloudIO :: Typeable a =>  Cloud a -> IO (Maybe a)
runCloudIO (Cloud mx)= keep mx

-- | Run a distributed computation inside the IO monad with no console input.
runCloudIO' :: Typeable a =>  Cloud a -> IO (Maybe a)
runCloudIO' cloud = keep' (runCloud' cloud)

-- #endif

-- | Alternative to `local`. If the computation is translated to another node,
-- the argument is executed again there, unless it was executed inside a
-- `local` computation.
--
-- > onAll foo
-- > local foo'
-- > local $ do
-- >       bar
-- >       runCloud $ do
-- >               onAll baz
-- >               runAt node ....
-- > runAt node' .....
--
-- foo, bar and baz will be executed locally.
-- But foo will also be executed remotely in node', while foo', bar and baz will not.
--
--

onAll ::  TransIO a -> Cloud a
onAll mx = Cloud mx

-- | Only executes if the result is demanded. It is useful when the computation
-- result is only used in the remote node but is not serializable. All the
-- state changes made by the argument with `setData`, `setState` etc. are lost,
-- since only the first component of the 'runStateT' result is kept.
lazy :: TransIO a -> Cloud a
lazy mx = onAll $ do
    st <- get
    -- the action is deferred behind unsafePerformIO: it runs when (and if)
    -- `result` is forced
    let result = unsafePerformIO (fst <$> runStateT (runTrans mx) st)
    return (fromJust result)

-- | Executes a non-serializable action in the remote node, whose result can be
-- used by subsequent remote invocations.
fixRemote mx = lazy mx >>= \result -> fixClosure >> return result

-- | Subsequent remote invocations will send logs to this closure. Therefore
-- logs will be shorter.
--
-- Also, non-serializable statements before it will not be re-executed.
fixClosure = atRemote (local (async (return ())))

-- Log the result of a cloud computation. Like `logged`, this erases all the log produced by the
-- computations inside and substitutes it with that single result when the computation is completed.
-- In addition to logging, the 'Closure' found before running (default
-- @Closure 0@) and the 'LocalFixData', if one was set, are put back in the
-- state afterwards (the actions attached with '<***').
loggedc :: Loggable a => Cloud a -> Cloud a
loggedc (Cloud mx)= Cloud $ do
     closRemote  <- getState <|> return (Closure 0 )
     (fixRemote :: Maybe LocalFixData) <- getData
     logged mx <*** do setData  closRemote
                       when (isJust fixRemote) $ setState (fromJust fixRemote)

-- | Like 'loggedc', but only the 'LocalFixData' (if any) is restored
-- afterwards; the 'Closure' in the state is left as the computation set it.
loggedc' :: Loggable a => Cloud a -> Cloud a
loggedc' (Cloud mx)= Cloud $ do
      fixRemote :: Maybe LocalFixData <- getData
      logged mx <*** (when (isJust fixRemote) $ setState (fromJust fixRemote))

-- | the `Cloud` monad has no `MonadIO` instance. `lliftIO= local . liftIO`
lliftIO :: Loggable a => IO a -> Cloud a
lliftIO= local . liftIO

-- | `localIO = lliftIO`
localIO :: Loggable a => IO a -> Cloud a
localIO= lliftIO

-- | continue the execution in a new node
beamTo :: Node -> Cloud ()
beamTo node =  wormhole node teleport

-- | execute in the remote node a process with the same execution state
-- (the local computation also continues, through the @\<|\> return ()@ branch)
forkTo  :: Node -> Cloud ()
forkTo node= beamTo node <|> return()

-- | open a wormhole to another node and executes an action on it.
-- currently by default it keep open the connection to receive additional requests
-- and responses (streaming)
callTo :: Loggable a => Node -> Cloud a -> Cloud a
callTo node  remoteProc=  wormhole' node $ atRemote remoteProc

#ifndef ghcjs_HOST_OS
-- | A connectionless version of callTo for long running remote calls
--
-- It beams to the target node, runs the procedure there and beams back to the
-- first node returned by 'getNodes'.
callTo' :: (Show a, Read a,Typeable a) => Node -> Cloud a -> Cloud a
callTo' node remoteProc=  do
    mynode <-  local $ getNodes >>= return . Prelude.head
    beamTo node
    r <-  remoteProc
    beamTo mynode
    return r
#endif

-- | Within a connection to a node opened by `wormhole`, it run the computation in the remote node and return
-- the result back to the original node.
--
-- If `atRemote` is executed in the remote node, then the computation is executed in the original node
--
-- > wormhole node2 $ do
-- >     t <- atRemote $ do
-- >           r <- foo              -- executed in node2
-- >           s <- atRemote bar r   -- executed in the original node
-- >           baz s                 -- in node2
-- >     bat t                      -- in the original node

atRemote :: Loggable a => Cloud a -> Cloud a
atRemote proc=  loggedc' $ do
     --modify $ \s -> s{execMode=Parallel}
     -- first teleport: move the computation to the other side of the wormhole
     teleport                                              -- !> "teleport 1111"

     -- keep Parallel if it was already set, otherwise run the procedure in Serial mode
     modify $ \s -> s{execMode= if execMode s== Parallel then Parallel else Serial}  -- modifyData' f1 Serial

     {-
     local $ noTrans $ do
        cont <- get
        let loop=do
             chs <- liftIO $ readMVar $ children $ fromJust $ parent cont
             tr ("THREADS ***************", length chs)
             threadDelay 1000000
             loop
        liftIO $ forkIO loop
        return()
    -}
     -- run the procedure, then mark the execution mode as Remote
     r <- loggedc $ proc  <** modify (\s -> s{execMode= Remote})      -- setData Remote

     -- second teleport: bring the result back
     teleport                                              -- !> "teleport 2222"

     return r

-- | Execute a computation in the node that initiated the connection.
--
-- if the sequence of connections is  n1 -> n2 -> n3 then  `atCallingNode $ atCallingNode foo` in n3
-- would execute `foo` in n1, -- while `atRemote $ atRemote foo` would execute it in n3
-- atCallingNode :: Loggable a => Cloud a -> Cloud a
-- atCallingNode proc=  connectCaller $ atRemote proc

-- | synonymous of `callTo`
runAt :: Loggable a => Node -> Cloud a -> Cloud a
runAt= callTo

-- | run a single thread with that action for each connection created.
-- When the same action is re-executed within that connection, all the threads generated by the previous execution
-- are killed
--
-- >   box <-  foo
-- >   r <- runAt node . local . single $ getMailbox box
-- >   localIO $ print r
--
-- if foo returns different mailbox identifiers, the above code would print the
-- messages of  the last one.
-- Without single, it would print the messages of all of them since each call would install a new `getMailBox` for each one of them
single :: TransIO a -> TransIO a
single f= do
   cutExceptions
   Connection{closChildren=rmap} <- getSData <|> error "single: only works within a connection"
   mapth <- liftIO $ readIORef rmap
   -- the action is identified by the hash of its StableName
   id <- liftIO $ f `seq` makeStableName f >>= return .  hashStableName

   -- kill the branch registered by a previous execution of the same action
   case  M.lookup id mapth of
          Just tv -> liftIO $ killBranch'  tv
          Nothing ->  return ()

   tv <- get
   -- run the action and register the current state under that identifier
   f <** do
          id <- liftIO $ makeStableName f >>= return . hashStableName
          liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth

-- | run an unique continuation for each connection. The first thread that execute `unique` is
-- executed for that connection. The rest are ignored.
unique :: TransIO a -> TransIO a
unique f= do
   Connection{closChildren=rmap} <- getSData <|> error "unique: only works within a connection. Use wormhole"
   mapth <- liftIO $ readIORef rmap
   id <- liftIO $ f `seq` makeStableName f >>= return . hashStableName

   let mx = M.lookup id mapth
   case mx of
          -- already registered for this connection: this invocation is dropped
          Just _ -> empty
          Nothing -> do
             tv <- get
             liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth
             f

-- | A wormhole opens a connection with another node anywhere in a computation.
-- `teleport` uses this connection to translate the computation back and forth between the two nodes connected.
-- If the connection fails, it search the network for suitable relay nodes to reach the destination node.
wormhole node comp=  do
    -- on a "no connection" error for this very node: look for a relay and retry
    onAll $ onException $ \(e@(ConnectionError "no connection" nodeerr)) ->
                    if nodeerr== node then do runCloud' $ findRelay node ; continue else return ()
    wormhole' node comp
  where
  -- explore the network until a node is found whose node list contains the
  -- target with a connection set; register the target again with a "relay"
  -- service pointing to the host/port of that node
  findRelay node = do
       relaynode <- exploreNetUntil $ do
                  nodes <- local getNodes
                  let thenode= filter (== node) nodes
                  if not (null thenode) && isJust(connection $ Prelude.head thenode ) then return $ Prelude.head nodes else empty
       local $ addNodes [node{nodeServices= {-nodeServices node ++ -} [[("relay",  show (nodeHost (relaynode :: Node),nodePort relaynode ))]]}]

-- when the first teleport has been sent within a wormhole, the
-- log sent should be the segment not send in the previous teleport
newtype DialogInWormholeInitiated= DialogInWormholeInitiated Bool

-- | wormhole without searching for relay nodes.
wormhole' :: Loggable a => Node -> Cloud a -> Cloud a
wormhole' node (Cloud comp) = local $ Transient $ do
   -- remember the state that is overwritten below, to restore it afterwards
   moldconn <- getData :: StateIO (Maybe Connection)
   mclosure <- getData :: StateIO (Maybe Closure)
   mdialog <- getData :: StateIO (Maybe ( Ref DialogInWormholeInitiated))
   -- when (isJust moldconn) . setState $ ParentConnection (fromJust moldconn) mclosure

   labelState $ "wormhole" <> BC.pack (show node)
   log <- getLog

   if not $ recover log
      -- not recovering: this is the calling side. Open the connection and
      -- start a new dialog from closure 0
      then runTrans $ (do
                conn <- mconnect node
                liftIO $ writeIORef (remoteNode conn) $ Just node
                setData  conn{synchronous= maybe False id $ fmap synchronous moldconn, calling= True}

                setState  $ Closure 0
                newRState $ DialogInWormholeInitiated False
                --lhls <- liftIO $ atomicModifyIORef (wormholes conn) $ \hls -> ((ref:hls),length hls)
                --tr ("LENGTH HLS",lhls)
                comp )
             <*** do
                when (isJust moldconn) . setData $ fromJust moldconn
                when (isJust mclosure) . setData $ fromJust mclosure
                when (isJust mdialog) . setData $ fromJust mdialog
                -- <** is not enough since comp may be reactive
      -- recovering: this is the remote side, the connection must already be in the state
      else do
             -- tr "YES REC"
             let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
             setData $ conn{calling= False}
             runTrans $ comp
                     <*** do when (isJust mclosure) . setData $ fromJust mclosure

-- #ifndef ghcjs_HOST_OS
-- type JSString= String
-- pack= id
-- #endif

-- | Exception raised in a node (node, closure identifier, description),
-- as built by `reportBack`.
data CloudException = CloudException Node IdClosure String deriving (Typeable, Show, Read)

instance Exception CloudException

-- | set remote invocations synchronous
-- this is necessary when data is transfered very fast from node to node in a stream non-deterministically
-- in order to keep the continuation of the calling node unchanged until the arrival of the response
-- since all the calls share a single continuation in the calling node.
--
-- If there is no response from the remote node, the streaming is interrupted
--
-- > main= keep $ initNode $  onBrowser $  do
-- >  local $ setSynchronous True
-- >  line  <- local $  threads 0 $ choose[1..10::Int]
-- >  localIO $ print ("1",line)
-- >  atRemote $ localIO $ print line
-- >  localIO $ print ("2", line)
setSynchronous :: Bool -> TransIO ()
setSynchronous sync= do
   modifyData'(\con -> con{synchronous=sync}) (error "setSynchronous: no communication data")
   return ()

-- set synchronous mode for remote calls within a cloud computation and also avoid unnecessary
-- thread creation
syncStream :: Cloud a -> Cloud a
syncStream proc=  do
    -- NOTE(review): `sync` is the `synchronous` field of the Connection returned by
    -- modifyData'; whether that is the value before or after the update depends on
    -- modifyData', which is defined elsewhere — confirm
    sync <- local $ do
          Connection{synchronous= synchronous} <- modifyData'(\con -> con{synchronous=True}) err
          return synchronous
    Cloud $ threads 0 $ runCloud' proc <***  modifyData'(\con -> con{synchronous=sync}) err
    where err= error "syncStream: no communication data"

-- | Send the log of the computation to the other end of the connection stored
-- in the state. In the sending side the local computation stops (the inner
-- `Transient` action returns Nothing); when the log is being recovered the
-- computation simply continues.
teleport :: Cloud ()
teleport  =  do
  modify $ \s -> s{execMode=if execMode s == Remote then Remote else Parallel}
  local $ do
    conn@Connection{connData=contype, synchronous=synchronous, localClosures= localClosures} <- getData
                             `onNothing` error "teleport: No connection defined: use wormhole"
    -- onException $ \(e :: IOException ) -> do -- to retry the connection in case of failure
    --  tr ("teleport:", e) -- should be three tries at most
    --  liftIO $ writeIORef contype Nothing
    --  mclose conn
    --  msend will open a new connection. move that open here?
    --  continue
    Transient $ do
      labelState  "teleport"
      cont <- get
      log <- getLog
      if not $ recover log   -- !> ("teleport rec,loc fulLog=",rec,log,fulLog)
                             -- if is not recovering in the remote node then it is active
        then  do
          -- when a node call itself, there is no need of socket communications
          ty <- liftIO $ readIORef contype
          case ty of
            Just Self -> runTrans $ do
                modify $ \s -> s{execMode= Parallel}  -- setData  Parallel
                abduce    -- !> "SELF" -- call himself
                liftIO $ do
                    remote <- readIORef $ remoteNode conn
                    writeIORef (myNode conn) $ fromMaybe (error "teleport: no connection?") remote

            _ -> do
                --read this Closure
                DialogInWormholeInitiated initiated <- getRData `onNothing` return(DialogInWormholeInitiated True)
                -- detects whether something has already been sent within the same wormhole
                -- how to detect that without using Closure?
                -- an Rflag in the execution state
                --tr("INITIATED",initiated,closRemote/=0)
                (closRemote',tosend) <- if initiated
                    -- for localFix
                    then do
                        -- dialog already started: address the closure stored in the state
                        -- and send the log built by `buildLog`
                        Closure closRemote <- getData `onNothing`  return (Closure 0 )
                        tr ("REMOTE CLOSURE",closRemote)
                        return (closRemote, buildLog log)
                    else do
                        mfix <-  getData -- look at globalFix
                        tr ("mfix", mfix)
                        -- walk the chain of localFix records: the first one already sent
                        -- through this connection determines the remote closure and how
                        -- much of the full log can be dropped
                        let droplog Nothing= return (0, fulLog log)
                            droplog (Just localfix)= do
                              -- atomically register this connection; `sent` tells whether it was already there
                              sent <- liftIO $ atomicModifyIORef' (fixedConnections localfix) $ \list -> do
                                        let n= idConn conn
                                        if n `Prelude.elem` list then (list, True)
                                          else (n:list,False)
                              tr ("LOCALFIXXXXXXXXXX",localfix)
                              let dropped= lazyByteString $ BS.drop (fromIntegral $ lengthFix localfix) $ toLazyByteString $ fulLog log
                              if sent then return (closure localfix, dropped)
                                else if isService localfix then return (0, dropped)
                                else droplog $ prevFix localfix -- look for other previous closure sent
                        droplog mfix

                let closLocal= hashClosure log
                map <- liftIO $ readMVar localClosures
                let mr = M.lookup closLocal map
                pair <- case mr of
                        -- for synchronous streaming
                        Just (chs,clos,mvar,_) -> do
                            when synchronous $ liftIO $ takeMVar mvar
                            -- tr ("TELEPORT removing", (Data.List.length $unsafePerformIO $ readMVar chs)-1)
                            --ths <- liftIO $ readMVar (children cont)
                            --liftIO $ when (length ths > 1)$ mapM_ (killChildren . children) $ tail ths
                            --runTrans $ msend conn $ SLast (ClosureData closRemote' closLocal mempty)
                            -- not called here; it is done asynchronously in the loopclosures loop
                            return (children ${- fromJust $ parent -} cont,clos,mvar,cont)

                        _ -> liftIO $ do mv <- newEmptyMVar; return ( children $ fromJust $ parent cont,closRemote',mv,cont)

                -- register the continuation under the local closure identifier
                liftIO $ modifyMVar_ localClosures $ \map ->  return $ M.insert closLocal pair map

                -- The log sent is in the order of execution. log is in reverse order
                -- send log with closure ids at head
                --tr ("MSEND --------->------>", SMore (unsafePerformIO $ readIORef $ remoteNode conn,closRemote',closLocal,toLazyByteString tosend))
                runTrans $ msend conn $ SMore $ ClosureData closRemote' closLocal tosend

                return Nothing

        else return $ Just ()

{- |
One problem of forwarding closures for streaming is that it could transport not only the data but extra information that reconstruct the closure in the destination node. In a single in-single out interaction It may not be a problem, but think, for example, when I have to synchronize N editors by forwarding small modifications, or worst of all, when transmitting packets of audio or video. But the size of the closure, that is, the amount of variables that I have to transport increases when the code is more complex. But transient build closures upon closures, so It has to send only what has changed since the last interaction.

In one-to-one interactions within a wormhole, this is automatic, but when there are different wormholes involved, it is necessary to tell explicitly what is the closure that will continue the execution. this is what `localFix` does. otherwise it will use the closure 0.

> main= do
>      filename <- local input
>      source <- atServer $ local $ readFile filename
>      local $ render source inEditor
>      --  send upto here one single time please, so I only stream the deltas
>      localFix
>      delta <- react  onEachChange
>      forallNodes $ update delta

if forwardChanges send to all the nodes editing the document, the data necessary to reconstruct the closure would include even the source code of the file on EACH change. Fortunately it is possible to fix a closure that will not change in all the remote nodes so after that, I only have to send the only necessary variable, the delta. This is as efficient as an hand-made socket write/forkThread/readSocket loop for each node.
-}
localFix=  localFixServ False False

-- | Identifier of a connection (the type of the @idConn@ field).
type ConnectionId= Int

-- | Flag paired with each entry of `globalFix`.
type HasClosed= Bool

-- for each connection, the list of closures fixed and the list of connections which created that closure in the remote node
-- unify for all the connections
-- but how do we know whether a global closure applies to a send after a disconnection?
-- the program has to pass through that globalClosure:
-- if only the connection was lost, it has state and can use it
-- if it has restarted, it has executed up to that point and has to rebuild its localFix state
--
-- NOINLINE is required for a global created with 'unsafePerformIO': without
-- it GHC may inline the definition and create more than one 'IORef'.
{-# NOINLINE globalFix #-}
globalFix = unsafePerformIO $ newIORef (M.empty :: M.Map ConnectionId (HasClosed,[(IdClosure, IORef [ConnectionId ])]))

-- how to signal that was closed?
-- | Record of a fixed closure, set in the state by `localFixServ`.
data LocalFixData= LocalFixData{ isService :: Bool
                               , lengthFix :: Int         -- length in bytes of the full log at the fix point
                               , closure   :: Int         -- closure identifier (hash of the log) at the fix point
                               , fixedConnections :: IORef [ConnectionId]  -- List of connections that created
                                                                           -- that closure in the remote node
                               , prevFix :: Maybe LocalFixData} deriving Show

-- | Shows the current content of the reference (it is read with
-- 'unsafePerformIO'); needed for the derived 'Show' of 'LocalFixData'.
instance Show a => Show (IORef a) where
    show r= show $ unsafePerformIO $ readIORef r

-- data LocalFixData= LocalFixData Bool Int Int (IORef (M.Map Int Int))

-- first flag=True assumes that the localFix closure has been created otherwise
-- the first request invoke closure 0 and create the localFix closure
-- further request will invoke this closure
--
-- the second flag creates a closure that is invoked ever, even if localfix is re-executed.
-- If this second flag is false,
-- a reexecution of localFix will recreate the remote closure, perhaps with different variables.
localFixServ isService isGlobal= Cloud $ noTrans $ do
   log <- getLog
   -- the message names this function (it used to say "teleport", a copy-paste leftover)
   Connection{..} <- getData `onNothing` error "localFixServ: No connection set: use initNode"
   if recover log
     then do
         -- recovering (receiving side): register the current continuation
         -- under the closure identifier of the log
         cont <- get
         mv <- liftIO  newEmptyMVar
         liftIO $ modifyMVar_ localClosures $ \map ->  return $ M.insert (hashClosure log) ( children $ fromJust $ parent cont,0,mv,cont) map
     else do
         mprevFix <- getData
         -- list of connections that already created this closure remotely:
         -- a fresh one, or the one shared through `globalFix` when isGlobal.
         -- Layout matters here: `onNothing` applies to the whole preceding
         -- `do` (an IO (Maybe ref)), not only to the `return`.
         ref <- liftIO $ if not $ isGlobal then newIORef [] else do
                 map <- readIORef globalFix
                 return $ do
                       (_,l) <- M.lookup idConn map
                       lookup (hashClosure log) l
             `onNothing` do
                 ref <- newIORef []
                 modifyIORef globalFix $ \map ->
                      let (closed,l)= fromMaybe (False,[]) $ M.lookup idConn map
                      in M.insert idConn (closed,(hashClosure log, ref):l) map
                 return ref
         -- the previous fix is only chained when this one has already been sent somewhere
         mmprevFix <- liftIO $ readIORef ref >>= \l -> return $ if Prelude.null l then Nothing else mprevFix
         let newfix =LocalFixData{ isService = isService
                                 , lengthFix = fromIntegral $ BS.length $ toLazyByteString $ fulLog log
                                 , closure = hashClosure log
                                 , fixedConnections = ref
                                 , prevFix = mmprevFix}
         setState newfix
             !> ("SET LOCALFIX", newfix )

-- | forward exceptions back to the calling node
--
-- The exception is wrapped in a 'CloudException' carrying this node and the
-- current remote closure, and sent as an 'SError' message.
reportBack :: TransIO ()
reportBack= onException $ \(e :: SomeException) -> do
    conn <- getData `onNothing` error "reportBack: No connection defined: use wormhole"
    -- the message names this function (it used to say "teleport", a copy-paste leftover)
    Closure closRemote <- getData `onNothing` error "reportBack: no closRemote"
    node <- getMyNode
    let msg= SError $ toException $ ErrorCall $  show $ show $ CloudException node closRemote $ show e
    msend conn msg  !> "MSEND"

-- | copy a session data variable from the local to the remote node.
-- If there is none set in the local node, The parameter is the default value.
-- In this case, the default value is also set in the local node.
copyData def = do
  r <- local getSData <|> return def
  onAll $ setData r
  return r

-- | execute a Transient action in each of the nodes connected.
--
-- The response of each node is received by the invoking node and processed by the rest of the procedure.
-- By default, each response is processed in a new thread. To restrict the number of threads
-- use the thread control primitives.
--
-- this snippet receive a message from each of the simulated nodes:
--
-- > main = keep $ do
-- >    let nodes= map createLocalNode [2000..2005]
-- >    addNodes nodes
-- >    (foldl (<|>) empty $ map listen nodes) <|> return ()
-- >
-- >    r <- clustered $ do
-- >               Connection (Just(PortNumber port, _, _, _)) _ <- getSData
-- >               return $ "hi from " ++ show port++ "\n"
-- >    liftIO $ putStrLn r
-- >    where
-- >    createLocalNode n= createNode "localhost" (PortNumber n)
clustered :: Loggable a  => Cloud a -> Cloud a
clustered proc= callNodes (<|>) empty proc

-- A variant of `clustered` that wait for all the responses and `mappend` them
mclustered :: (Monoid a, Loggable a)  => Cloud a -> Cloud a
mclustered proc= callNodes (<>) mempty proc

-- | Run the procedure with `runAt` in every node returned by `getEqualNodes`
-- and combine the results with the given operator and initial value.
callNodes op init proc= loggedc' $ do
    nodes <-  local getEqualNodes
    callNodes' nodes op init proc

-- | Same as `callNodes` for an explicit list of nodes (right fold over the list).
callNodes' nodes op init proc= loggedc' $ Prelude.foldr op init $ Prelude.map (\node -> runAt node proc) nodes
-----
#ifndef ghcjs_HOST_OS
-- | Like `sendRaw`, but if the connection has no connection data (it was
-- closed) it first reconnects to the remote node recorded in the connection.
-- If the send raises an exception, the connection data is reset to Nothing.
sendRawRecover con r= do
   c <- liftIO $ readIORef $ connData con
   con' <- case c of
        Nothing -> do
            tr "CLOSED CON"
            n <- liftIO $ readIORef $ remoteNode con
            case n of
              Nothing -> error "connection closed by caller"
              Just node -> do
                r <- mconnect' node
                return r
        Just _ -> return con
   sendRaw con' r
    `whileException` \(SomeException _)-> liftIO$ writeIORef (connData con) Nothing

-- | Send a lazy ByteString as is (no length prefix) through the connection.
-- The send is performed inside `modifyMVar_` on the `isBlocked` MVar of the
-- connection, whose content is replaced by the time of the send.
sendRaw con r= do
   let blocked= isBlocked con
   c <- liftIO $ readIORef $ connData con
   liftIO $ modifyMVar_ blocked $ const $ do
    tr ("sendRaw",r)
    case c of
      Just (Node2Web  sconn )   -> liftIO $ WS.sendTextData sconn  r
      Just (Node2Node _ sock _) -> SBS.sendMany sock (BL.toChunks r )
      Just (TLSNode2Node  ctx ) -> sendTLSData ctx  r
      _ -> error "No connection stablished"
    TOD time _ <- getClockTime
    return $ Just time

{-
sendRaw (Connection _ _ _ (Just (Node2Web  sconn )) _ _ _ _ _ _ _) r=
      liftIO $   WS.sendTextData sconn  r   -- !> ("NOde2Web",r)
sendRaw (Connection _ _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _ _) r=
      liftIO $  withMVar blocked $ const $  SBS.sendMany sock (BL.toChunks r )  -- !> ("NOde2Node",r)

sendRaw (Connection _ _ _(Just (TLSNode2Node  ctx )) _ _ blocked _ _ _ _) r=
      liftIO $ withMVar blocked $ const $ sendTLSData ctx  r     !> ("TLNode2Web",r)
-}

#else

-- | Browser version: send the data as is through the websocket of the connection.
sendRaw con r= do
   c <- liftIO $ readIORef $ connData con
   case c of
      Just (Web2Node sconn) ->
              JavaScript.Web.WebSocket.send   r sconn
      _ -> error "No connection stablished"

{-
sendRaw (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _  _ _ _) r= liftIO $
   withMVar blocked $ const $ JavaScript.Web.WebSocket.send   r sconn -- !> "MSEND SOCKET"
-}

#endif

-- | Message exchanged between nodes: two closure identifiers followed by a
-- log. `teleport` builds it as @ClosureData remoteClosure localClosure log@.
data NodeMSG= ClosureData IdClosure IdClosure  Builder    deriving (Read, Show)

-- | Wire format: @\<int\>/\<int\>/\<rest of the input\>@.
instance Loggable NodeMSG where
   serialize (ClosureData clos clos' build)= intDec clos <> "/" <> intDec clos' <> "/" <> build
   deserialize= ClosureData <$> (int <* tChar '/') <*> (int <* tChar '/') <*> restOfIt
     where
     -- everything that remains in the parse string is the log
     restOfIt= lazyByteString <$> giveParseString

-- | Shows the raw bytes produced by the builder.
instance Show Builder where
   show b= BS.unpack $ toLazyByteString b

-- | NOTE(review): reads the whole input as a 'String' literal with 'read' and
-- reports no remaining input; this is not the inverse of the 'Show' instance
-- above, which does not quote its output.
instance Read Builder where
   readsPrec _ str= [(lazyByteString $ BS.pack $ read str,"")]

-- | Wire format: the constructor name followed by @/@ and the serialized
-- payload (just @SDone@ for 'SDone').
instance Loggable a => Loggable (StreamData a) where
    serialize (SMore x)= byteString "SMore/" <> serialize x
    serialize (SLast x)= byteString "SLast/" <> serialize x
    serialize SDone= byteString "SDone"
    serialize (SError e)= byteString "SError/" <> serialize e

    deserialize = smore <|> slast <|> sdone <|> serror
     where
     smore = symbol "SMore/" >> (SMore <$> deserialize)
     slast = symbol "SLast/" >> (SLast <$> deserialize)
     sdone = symbol "SDone"  >> return SDone
     serror= symbol "SError/" >> (SError <$> deserialize)

{-
in msend write the length of the packet and the packet
in mread take the length and the message

data Packet= Packet Int BS.ByteString deriving (Read,Show)
instance Loggable Packet where
   serialize (Packet len msg) = intDec len <> lazyByteString msg
   deserialize = do
      len <- int
      Packet len <$> tTake (fromIntegral len)
-}

-- | Send a message to the node at the other end of the connection.
--
-- If the connection has no connection data (it was closed), a new connection
-- to the remote node recorded in it is opened first. The framing depends on
-- the kind of connection: node-to-node sockets (plain or TLS) send the decimal
-- length of the payload followed by the payload; HTTP(S) connections send the
-- payload followed by CRLF; websockets send the payload as a text message.
-- Socket and TLS sends run inside `modifyMVar_` on the `isBlocked` MVar, whose
-- content is replaced by the time of the send.
msend ::  Connection -> StreamData NodeMSG -> TransIO ()
-- msend (Connection _ _ _ (Just Self) _ _ _ _ _ _ _) r= return ()

#ifndef ghcjs_HOST_OS

msend con r= do
  tr   ("MSEND --------->------>", r)
  c <- liftIO $ readIORef $ connData con
  con' <- case c of
     Nothing -> do
        tr "CLOSED CON"
        n <- liftIO $ readIORef $ remoteNode con
        case n of
          Nothing -> error "connection closed by caller"
          Just node -> do
            r <- mconnect node
            return r
            --case r of
            --  Nothing -> error $ "can not reconnect with " ++ show n
            --  Just c -> return c
     Just _ -> return con
  let blocked= isBlocked con'
  c <- liftIO $ readIORef $ connData con'
  let bs = toLazyByteString $ serialize r
  do
   --liftIO $ do
    case c of
      Just (TLSNode2Node ctx) -> liftIO $ modifyMVar_ blocked $ const $ do
              tr "TLSSSSSSSSSSS SEND"
              sendTLSData ctx $ toLazyByteString $ int64Dec $ BS.length bs
              sendTLSData ctx bs
              TOD time _ <- getClockTime
              return $ Just time
      Just (Node2Node _ sock _) -> liftIO $ modifyMVar_ blocked $ const $ do
              tr "NODE2NODE SEND"
              -- sendAll instead of send: `send` may transmit only part of the
              -- buffer (it returns the number of bytes sent, which was being
              -- discarded), which would corrupt the length prefix
              SBSL.sendAll sock $ toLazyByteString $ int64Dec $ BS.length bs
              SBSL.sendAll sock bs
              TOD time _ <- getClockTime
              return $ Just time
      Just (HTTP2Node _ sock _) -> liftIO $ modifyMVar_ blocked $ const $ do
              tr "HTTP2NODE SEND"
              SBSL.sendAll sock $ bs <> "\r\n"
              TOD time _ <- getClockTime
              return $ Just time
      Just (HTTPS2Node ctx) -> liftIO $ modifyMVar_ blocked $ const $ do
              tr "HTTPS2NODE SEND"
              sendTLSData ctx $ bs <> "\r\n"
              TOD time _ <- getClockTime
              return $ Just time
      Just (Node2Web sconn) -> do
         tr "NODE2WEB"
         -- {-withMVar blocked $ const $ -} WS.sendTextData sconn $ serialize r -- BS.pack (show r)    !> "websockets send"
         liftIO $   do
          let bs = toLazyByteString $ serialize r
          -- WS.sendTextData sconn $ toLazyByteString $ int64Dec $ BS.length bs
          tr "ANTES SEND"
          WS.sendTextData sconn bs   -- !> ("N2N SEND", bd)
          tr "AFTER SEND"
      Just Self -> error "connection to the same node shouldn't happen, file a bug please"
      _ -> error "msend out of connection context: use wormhole to connect"
  -- return()

{-
msend (Connection _ _ _
(Just (Node2Node _ sock _)) _ _ blocked _ _ _ _) r=do
   liftIO $ withMVar blocked $ const $ do
      let bs = toLazyByteString $ serialize r
      SBSL.send sock $ toLazyByteString $ int64Dec $ BS.length bs
      SBSL.sendAll sock bs      -- !> ("N2N SEND", bd)

msend (Connection _ _ _ (Just (HTTP2Node _ sock _)) _ _ blocked _ _ _ _) r=do
   liftIO $ withMVar blocked $ const $ do
      let bs = toLazyByteString $ serialize r
      let len= BS.length bs
          lenstr= toLazyByteString $ int64Dec $ len
      SBSL.send sock $ "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nContent-Length: " <> lenstr
                 -- <>"\r\n" <> "Set-Cookie:" <> "cookie=" <> cook -- <> "\r\n"
                 <>"\r\n\r\n"
      SBSL.sendAll sock bs      -- !> ("N2N SEND", bd)

msend (Connection _ _ _ (Just (TLSNode2Node ctx)) _ _ _ _ _ _ _) r= liftIO $ do
      let bs = toLazyByteString $ serialize r
      sendTLSData ctx $ toLazyByteString $ int64Dec $ BS.length bs
      sendTLSData ctx bs      -- !> "TLS SEND"

msend (Connection _ _ _ (Just (Node2Web sconn)) _ _ _ _ _ _ _) r=
  -- {-withMVar blocked $ const $ -} WS.sendTextData sconn $ serialize r -- BS.pack (show r)    !> "websockets send"
   liftIO $ do
      let bs = toLazyByteString $ serialize r
      WS.sendTextData sconn $ toLazyByteString $ int64Dec $ BS.length bs
      WS.sendTextData sconn bs   -- !> ("N2N SEND", bd)
-}

#else

-- Browser (GHCJS) variant: the only usable connection is a websocket to the
-- server node; the serialized message is sent as a single text frame.
msend con r= do
  tr ("MSEND --------->------>", r)
  c <- liftIO $ readIORef $ connData con
  case c of
    Just (Web2Node sconn) -> liftIO $ do
      tr "MSEND BROWSER"
      --modifyMVar_ (isBlocked con) $ const $ do
      let bs = toLazyByteString $ serialize r
      JavaScript.Web.WebSocket.send (JS.pack $ BS.unpack bs) sconn   -- TODO OPTIMIZE THAT!
      tr "AFTER MSEND"
      --TOD time _ <- getClockTime
      --return $ Just time

    _ -> error "msend out of connection context: use wormhole to connect"

{-
msend (Connection _ _ remoten (Just (Web2Node sconn)) _ _ blocked _ _ _ _) r= liftIO $ do
  withMVar blocked $ const $ do -- JavaScript.Web.WebSocket.send (serialize r) sconn -- (JS.pack $ show r) sconn !> "MSEND SOCKET"
    let bs = toLazyByteString $ serialize r
    JavaScript.Web.WebSocket.send (toLazyByteString $ int64Dec $ BS.length bs) sconn
    JavaScript.Web.WebSocket.send bs sconn
-}

#endif

#ifdef ghcjs_HOST_OS

-- | Read messages from the websocket of the connection (browser variant).
-- Fails with an error when the connection is not an open websocket.
mread con= do
  labelState "mread"
  cdata <- liftIO $ readIORef $ connData con
  case cdata of
    Just (Web2Node sconn) -> wsRead sconn
    Nothing               -> error "connection not opened"
    -- previously unhandled: ended in an anonymous pattern match failure
    Just Self             -> error "mread: connection to the same node has no websocket to read from"

--mread (Connection _ _ _ (Just (Web2Node sconn)) _ _ _ _ _ _ _)= wsRead sconn

-- | Wait for messages of the websocket (through its @onmessage@ callback) and
-- deserialize each text message. Blob and ArrayBuffer messages are errors.
wsRead :: Loggable a => WebSocket -> TransIO a
wsRead ws= do
  dat <- react (hsonmessage ws) (return ())
  tr "received"
  case JM.getData dat of
    JM.StringData ( text) -> do
      setParseString $ BS.pack . JS.unpack $ text    -- TODO OPTIMIZE THAT
      --len <- integer
      tr ("Browser webSocket read", text) !> "<------<----<----<------"
      deserialize     -- return (read' $ JS.unpack str)
    JM.BlobData blob -> error " blob"
    JM.ArrayBufferData arrBuffer -> error "arrBuffer"

-- | Create a websocket for the URL and continue when its @onopen@ callback
-- is invoked.
wsOpen :: JS.JSString -> TransIO WebSocket
wsOpen url= do
  ws <- liftIO $ js_createDefault url      -- !> ("wsopen",url)
  react (hsopen ws) (return ())            -- !!> "react"
  return ws                                -- !!> "AFTER ReACT"

foreign import javascript safe
    "window.location.hostname"
   js_hostname :: JSVal

foreign import javascript safe
    "window.location.pathname"
   js_pathname :: JSVal

foreign import javascript safe
    "window.location.protocol"
   js_protocol :: JSVal

foreign import javascript safe
   "(function(){var res=window.location.href.split(':')[2];if (res === undefined){return 80} else return res.split('/')[0];})()"
   js_port :: JSVal

foreign import javascript safe
    "$1.onmessage =$2;"
   js_onmessage :: WebSocket -> JSVal -> IO ()

-- | Node built from the host name and port of the page location.
getWebServerNode :: TransIO Node
getWebServerNode = liftIO $ do
  h <- fromJSValUnchecked js_hostname
  p <- fromIntegral <$> (fromJSValUnchecked js_port :: IO Int)
  createNode h p

-- | Set the Haskell handler as the @onmessage@ callback of the websocket.
hsonmessage ::WebSocket -> (MessageEvent ->IO()) -> IO ()
hsonmessage ws hscb= do
  cb <- makeCallback1 MessageEvent hscb
  js_onmessage ws cb

foreign import javascript safe
    "$1.onopen =$2;"
   js_open :: WebSocket -> JSVal -> IO ()

foreign import javascript safe
    "$1.readyState"
   js_readystate :: WebSocket -> Int

newtype OpenEvent = OpenEvent JSVal deriving Typeable

-- | Set the Haskell handler as the @onopen@ callback of the websocket.
hsopen :: WebSocket -> (OpenEvent ->IO()) -> IO ()
hsopen ws hscb= do
  cb <- makeCallback1 OpenEvent hscb
  js_open ws cb

-- | Wrap a one-argument Haskell handler as a JavaScript callback value; the
-- argument is converted with the first function before calling the handler.
makeCallback1 :: (JSVal -> a) -> (a -> IO ()) -> IO JSVal
makeCallback1 f g = do
  Callback cb <- CB.syncCallback1 CB.ContinueAsync (g . f)
  return cb

-- | Wrap an argument-less Haskell action as a JavaScript callback value.
makeCallback :: IO () -> IO JSVal
makeCallback f = do
  Callback cb <- CB.syncCallback CB.ContinueAsync f
  return cb

foreign import javascript safe
   "new WebSocket($1)" js_createDefault :: JS.JSString -> IO WebSocket

#else

-- | Read messages from a node-to-node connection (plain socket or TLS) with
-- 'parallelReadHandler'. Only these two kinds of connection are matched here.
mread conn= do
 cc <- liftIO $ readIORef $ connData conn
 case cc of
    Just (Node2Node _ _ _) -> parallelReadHandler conn
    Just (TLSNode2Node _ ) -> parallelReadHandler conn
    -- the rest of the cases are managed by listenNew
    -- Just (Node2Web sconn ) -> do
    --         ss <- parallel $ receiveData' conn sconn
    --         case ss of
    --           SDone -> empty
    --           SMore s -> do
    --             tr ("WEBSOCKET RECEIVED", s)
    --             setParseString s
    --             --integer
    --             TOD t _ <- liftIO getClockTime
    --             liftIO $ modifyMVar_ (isBlocked conn) $ const $ Just <$> return t
    --             deserialize
    --      where
    -- perform timeouts and cleanup of the server when connections

-- | Receive one data message from the websocket connection given as second
-- argument. The first argument is not used.
receiveData' a b= NWS.receiveData b
-- receiveData' :: Connection -> NWS.Connection -> IO BS.ByteString
-- receiveData' c conn = do
--     msg <- WS.receive conn
--     tr ("RECEIVED",msg)
--     case msg of
--         NWS.DataMessage _ _ _ am -> return $ NWS.fromDataMessage am
--         NWS.ControlMessage cm -> case cm of
--             NWS.Close i closeMsg -> do
--                 hasSentClose <- readIORef $ WS.connectionSentClose conn
--                 unless hasSentClose $ WS.send conn msg
--                 writeIORef (connData c) Nothing
--                 cleanConnectionData c
--                 empty
--             NWS.Pong _ -> do
--                 TOD t _ <- liftIO getClockTime
--                 liftIO $ modifyMVar_ (isBlocked c) $ const $ Just <$> return t
--                 receiveData' c conn
--                 --NWS.connectionOnPong (WS.connectionOptions conn)
--                 --NWS.receiveDataMessage conn
--             NWS.Ping pl -> do
--                 TOD t _ <- liftIO getClockTime
--                 liftIO $ modifyMVar_ (isBlocked c) $ const $ Just <$> return t
--                 WS.send conn (NWS.ControlMessage (NWS.Pong pl))
--                 receiveData' c conn
--                 --WS.receiveDataMessage conn

{-
mread (Connection _ _ _ (Just (Node2Node _ _ _)) _ _ _ _ _ _ _) = parallelReadHandler -- !> "mread"

mread (Connection _ _ _ (Just (TLSNode2Node _ )) _ _ _ _ _ _ _) = parallelReadHandler
--        parallel $ do
--            s <- recvTLSData ctx
--            return . read' $ BC.unpack s

mread (Connection _ _ _ (Just (Node2Web sconn )) _ _ _ _ _ _ _)= do
    s <- waitEvents $ WS.receiveData sconn
    setParseString s
    integer
    deserialize
{-
    parallel $ do
        s <- WS.receiveData sconn
        return . read' $ BS.unpack s
                !> ("WS MREAD RECEIVED ----<----<------<--------", s)
-}
-}

-- | @many' p@ is @p@ combined with itself through '<|>' indefinitely:
-- @p \<|\> p \<|\> ...@
many' p= again
  where
  again= p <|> again

-- | Read length-prefixed packets from the parse buffer of the connection and
-- deserialize each one.
--
-- A packet is a number (its length) followed by that many bytes. For every
-- packet the 'isBlocked' MVar of the connection is set to the current time.
-- An 'IOError' raised below this point is answered with 'empty'.
parallelReadHandler :: Loggable a => Connection -> TransIO (StreamData a)
parallelReadHandler conn= do
    onException $ \(_ :: IOError) -> empty
    many' packet
  where
  -- one packet: length, payload, then deserialization of the payload alone
  packet= do
    size    <- integer <|> malformed
    payload <- tTake (fromIntegral size)
    tr ("MREAD <-------<-------",payload)
    TOD now _ <- liftIO getClockTime
    liftIO $ modifyMVar_ (isBlocked conn) $ \_ -> return (Just now)
    abduce
    setParseString payload
    deserialize

  -- no length could be parsed: an exhausted buffer is just 'empty', anything
  -- else is reported with the first bytes that were found
  malformed= do
    rest <- getParseBuffer
    if BS.null rest
      then empty
      else error $ show ("malformed data received: expected Int, received: ", BS.take 10 rest)

{-
parallelReadHandler= do
      str <- giveParseString :: TransIO BS.ByteString
      r <- choose $ readStream str
      return r
                   !> ("parallel read handler read", r)
                   !> "<-------<----------<--------<----------"
    where
    readStream :: (Typeable a, Read a) => BS.ByteString -> [StreamData a]
    readStream s= readStream1 $ BS.unpack s
     where
     readStream1 s=
       let [(x,r)] = reads s
       in x :
readStream1 r -} getWebServerNode :: TransIO Node getWebServerNode = getNodes >>= return . Prelude.head #endif mclose :: MonadIO m => Connection -> m () #ifndef ghcjs_HOST_OS mclose con= do --c <- liftIO $ readIORef $ connData con c <- liftIO $ atomicModifyIORef (connData con) $ \c -> (Nothing,c) case c of Just (TLSNode2Node ctx) -> liftIO $ withMVar (isBlocked con) $ const $ liftIO $ tlsClose ctx Just (Node2Node _ sock _ ) -> liftIO $ withMVar (isBlocked con) $ const $ liftIO $ NS.close sock !> "SOCKET CLOSE" Just (Node2Web sconn ) -> liftIO $ WS.sendClose sconn ("closemsg" :: BS.ByteString) !> "WEBSOCkET CLOSE" _ -> return() cleanConnectionData con {- mclose (Connection _ _ _ (Just (Node2Node _ sock _ )) _ _ _ _ _ _ _)= liftIO $ NS.close sock mclose (Connection _ _ _ (Just (Node2Web sconn )) _ _ _ _ _ _ _)= liftIO $ WS.sendClose sconn ("closemsg" :: BS.ByteString) -} #else mclose con= do --c <- liftIO $ readIORef $ connData con c <- liftIO $ atomicModifyIORef (connData con) $ \c -> (Nothing,c) case c of Just (Web2Node sconn)-> liftIO $ JavaScript.Web.WebSocket.close Nothing Nothing sconn {- mclose (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _ _ _ _)= liftIO $ JavaScript.Web.WebSocket.close Nothing Nothing sconn -} #endif #ifndef ghcjs_HOST_OS -- connection cookie rcookie= unsafePerformIO $ newIORef $ BS.pack "cookie1" #endif conSection= unsafePerformIO $ newMVar () exclusiveCon mx= do liftIO $ takeMVar conSection r <- mx liftIO $ putMVar conSection () return r -- check for cached connection and return it, otherwise tries to connect with connect1 without cookie check mconnect' :: Node -> TransIO Connection mconnect' node'= exclusiveCon $ do conn <- do node <- fixNode node' nodes <- getNodes let fnode = filter (==node) nodes case fnode of [] -> mconnect1 node -- !> "NO NODE" (node'@(Node _ _ pool _):_) -> do plist <- liftIO $ readMVar $ fromJust pool case plist of -- !> ("length", length plist,nodePort node) of (handle:_) -> do c <- liftIO $ readIORef $ 
connData handle if isNothing c -- was closed by timeout then mconnect1 node else return handle !> ("REUSED!", nodeHost node, nodePort node) _ -> do delNodes [node] r <- mconnect1 node tr "after mconnect1" return r -- ctx <- liftIO $ readIORef $ istream conn -- modify $ \s -> s{parseContext= ctx} -- liftIO $ print "SET PARSECONTEXT" setState conn return conn #ifndef ghcjs_HOST_OS -- effective connect trough different methods mconnect1 (node@(Node host port _ services ))= do return () !> ("MCONNECT1",host,port,isTLSIncluded) {- onException $ \(ConnectionError msg node) -> do liftIO $ do putStr msg putStr " connecting " print node continue empty -} let types=mapMaybe (lookup "type") services -- need to look in all services needTLS <- if "HTTP" `elem` types then return False else if "HTTPS" `elem` types then if not isTLSIncluded then error "no 'initTLS'. This is necessary for https connections. Please include it: main= do{ initTLS; keep ...." else return True else return isTLSIncluded -- case lookup "type" services of -- Just "HTTP" -> return False; -- Just "HTTPS" -> -- if not isTLSIncluded then error "no 'initTLS'. This is necessary for https connections. Please include it: main= do{ initTLS; keep ...." -- else return True -- _ -> return isTLSIncluded tr ("NEED TLS",needTLS) (conn,parseContext) <- checkSelf node <|> timeout 10000000 (connectNode2Node host port needTLS) <|> timeout 1000000 (connectWebSockets host port needTLS) <|> timeout 1000000 (checkRelay needTLS) <|> (throw $ ConnectionError "no connection" node) setState conn modify $ \s -> s{execMode=Serial,parseContext= parseContext} -- "write node connected in the connection" liftIO $ writeIORef (remoteNode conn) $ Just node -- "write connection in the node" liftIO $ modifyMVar_ (fromJust $ connection node) . 
const $ return [conn] addNodes [node] return conn where checkSelf node= do tr "CHECKSELF" node' <- getMyNodeMaybe guard $ isJust (connection node') v <- liftIO $ readMVar (fromJust $ connection node') -- to force connection in case of calling a service of itself tr "IN CHECKSELF" if node /= node' || null v then empty else do conn<- case connection node of Nothing -> error "checkSelf error" Just ref -> do rnode <- liftIO $ newIORef node' cdata <- liftIO $ newIORef $ Just Self conn <- defConnection >>= \c -> return c{myNode= rnode, connData= cdata} liftIO $ withMVar ref $ const $ return [conn] return conn return (conn, noParseContext) timeout t proc= do r <- collect' 1 t proc case r of [] -> empty !> "TIMEOUT EMPTY" mr:_ -> case mr of Nothing -> throw $ ConnectionError "Bad cookie" node Just r -> return r checkRelay needTLS= do case lookup "relay" $ map head (nodeServices node) of Nothing -> empty -- !> "NO RELAY" Just relayinfo -> do let (h,p)= read relayinfo connectWebSockets1 h p ("/relay/" ++ h ++ "/" ++ show p ++ "/") needTLS connectSockTLS host port needTLS= do return () !> "connectSockTLS" let size=8192 c@Connection{myNode=my,connData=rcdata} <- getSData <|> defConnection tr "BEFORE HANDSHAKE" sock <- liftIO $ connectTo' size host $ PortNumber $ fromIntegral port let cdata= (Node2Node u sock (error $ "addr: outgoing connection")) cdata' <- liftIO $ readIORef rcdata --input <- liftIO $ SBSL.getContents sock -- let pcontext= ParseContext (do mclose c; return SDone) input (unsafePerformIO $ newIORef False) pcontext <- makeParseContext $ SBSL.recv sock 4096 conn' <- if isNothing cdata' -- lost connection, reconnect then do liftIO $ writeIORef rcdata $ Just cdata liftIO $ writeIORef (istream c) pcontext return c !> "RECONNECT" else do c <- defConnection rcdata' <- liftIO $ newIORef $ Just cdata liftIO $ writeIORef (istream c) pcontext return c{myNode=my,connData= rcdata'} !> "CONNECT" setData conn' --modify $ \s ->s{parseContext=ParseContext (do NS.close sock ; 
return SDone) input} --throw $ ConnectionError "connection closed" node) input} modify $ \s ->s{execMode=Serial,parseContext=pcontext} --modify $ \s ->s{execMode=Serial,parseContext=ParseContext (SMore . BL.fromStrict <$> recv sock 1000) mempty} when (isTLSIncluded && needTLS) $ maybeClientTLSHandshake host sock mempty connectNode2Node host port needTLS= do -- onException $ \(e :: SomeException) -> empty tr "NODE 2 NODE" mproxy <- getHTTProxyParams needTLS let (upass,h',p) = case (mproxy) of Just p -> p _ -> ("",BS.pack host,port) h= BS.unpack h' if (isLocal host || h == host && p == port) then connectSockTLS h p needTLS else do let connect = "CONNECT "<> pk host <> ":" <> pk (show port) <> " HTTP/1.1\r\n" <> "Host: "<> pk host <> ":" <> BS.pack (show port) <> "\r\n" <> "User-Agent: transient\r\n" <> (if BS.null upass then "" else "Proxy-Authorization: Basic " <> (B64.encode upass)<> "\r\n") <> "Proxy-Connection: Keep-Alive\r\n" <> "\r\n" tr connect connectSockTLS h p False conn <- getSData <|> error "mconnect: no connection data" sendRaw conn $ connect first@(vers,code,_) <- getFirstLineResp -- tTakeUntilToken (BS.pack "\r\n\r\n") tr ("PROXY RESPONSE=",first) guard (BC.head code== '2') <|> do headers <- getHeaders Raw body <- parseBody headers error $ show (headers,body) -- decode the body and print when (isTLSIncluded && needTLS) $ do Just(Node2Node{socket=sock}) <- liftIO $ readIORef $ connData conn maybeClientTLSHandshake h sock mempty conn <- getSData <|> error "mconnect: no connection data" --mynode <- getMyNode parseContext <- gets parseContext return $ Just(conn,parseContext) connectWebSockets host port needTLS= connectWebSockets1 host port "/" needTLS connectWebSockets1 host port verb needTLS= do -- onException $ \(e :: SomeException) -> empty tr "WEBSOCKETS" connectSockTLS host port needTLS -- a new connection never <- liftIO $ newEmptyMVar :: TransIO (MVar ()) conn <- getSData <|> error "connectWebSockets: no connection" stream <- liftIO $ 
makeWSStreamFromConn conn co <- liftIO $ readIORef rcookie let hostport= host++(':': show port) headers= [("cookie", "cookie=" <> BS.toStrict co)] -- if verb =="/" then [("Host",fromString hostport)] else [] onException $ \(NWS.CloseRequest code msg) -> do conn <- getSData cleanConnectionData conn -- throw $ ConnectionError (BS.unpack msg) node empty wscon <- react (NWS.runClientWithStream stream hostport verb WS.defaultConnectionOptions headers) (takeMVar never) msg <- liftIO $ WS.receiveData wscon tr "WS RECEIVED" case msg of ("OK" :: BS.ByteString) -> do tr "return connectWebSockets" cdata <- liftIO $ newIORef $ Just $ (Node2Web wscon) return $ Just (conn{connData= cdata}, noParseContext) _ -> do tr "RECEIVED CLOSE"; liftIO $ WS.sendClose wscon ("" ::BS.ByteString); return Nothing isLocal:: String ->Bool isLocal host= host=="localhost" || (or $ map (flip isPrefixOf host) ["0.0","10.","100", "127", "169", "172", "192", "198", "203"]) || isAlphaNum (head host) && not ('.' `elem` host) -- is not a host address with dot inside: www.host.com -- >>> isLocal "titan" -- True -- makeParseContext rec= liftIO $ do done <- newIORef False let receive= liftIO $ do d <- readIORef done if d then return SDone else (do r<- rec if BS.null r then liftIO $ do writeIORef done True; return SDone else return $ SMore r) `catch` \(SomeException e) -> do liftIO $ writeIORef done True putStr "Parse: " print e return SDone return $ ParseContext receive mempty done #else mconnect1 (node@(Node host port (Just pool) _))= do conn <- getSData <|> error "connect: listen not set for this node" if nodeHost node== "webnode" then do liftIO $ writeIORef (connData conn) $ Just Self return conn else do ws <- connectToWS host $ PortNumber $ fromIntegral port -- !> "CONNECTWS" liftIO $ writeIORef (connData conn) $ Just (Web2Node ws) -- !> ("websocker CONNECION") let parseContext = ParseContext (error "parsecontext not available in the browser") "" (unsafePerformIO $ newIORef False) chs <- liftIO $ 
newIORef M.empty let conn'= conn{closChildren= chs} liftIO $ modifyMVar_ pool $ \plist -> return $ conn':plist return conn' #endif u= undefined data ConnectionError= ConnectionError String Node deriving (Show , Read) instance Exception ConnectionError -- check for cached connect, if not, it connects and check cookie with mconnect2 mconnect node'= do node <- fixNode node' nodes <- getNodes let fnode = filter (==node) nodes case fnode of [] -> mconnect2 node -- !> "NO NODE" [node'@(Node _ _ pool _)] -> do plist <- liftIO $ readMVar $ fromJust pool case plist of -- !> ("length", length plist,nodePort node) of (handle:_) -> do c <- liftIO $ readIORef $ connData handle if isNothing c -- was closed by timeout then mconnect2 node else return handle -- !> ("REUSED!", node) _ -> do delNodes [node] mconnect2 node where -- connect and check for connection cookie among nodes mconnect2 node= do conn <- mconnect1 node -- `catcht` \(e :: SomeException) -> empty cd <- liftIO $ readIORef $ connData conn case cd of #ifndef ghcjs_HOST_OS Just Self -> return() Just (TLSNode2Node _ ) -> do checkCookie conn watchConnection conn node Just (Node2Node _ _ _) -> do checkCookie conn watchConnection conn node #endif _ -> watchConnection conn node return conn #ifndef ghcjs_HOST_OS checkCookie conn= do cookie <- liftIO $ readIORef rcookie mynode <- getMyNode sendRaw conn $ "CLOS " <> cookie <> --" b \r\nField: value\r\n\r\n" -- TODO put it standard: Set-Cookie:... 
" b \r\nHost: " <> BS.pack (nodeHost mynode) <> "\r\nPort: " <> BS.pack (show $ nodePort mynode) <> "\r\n\r\n" r <- liftIO $ readFrom conn case r of "OK" -> return () _ -> do let Connection{connData=rcdata}= conn cdata <- liftIO $ readIORef rcdata case cdata of Just(Node2Node _ s _) -> liftIO $ NS.close s -- since the HTTP firewall closes the connection Just(TLSNode2Node c) -> liftIO $ tlsClose c empty #endif watchConnection conn node= do liftIO $ atomicModifyIORef connectionList $ \m -> (conn:m,()) parseContext <- gets parseContext -- getSData <|> error "NO PARSE CONTEXT" :: TransIO ParseContext chs <- liftIO $ newIORef M.empty --whls <- liftIO $ newIORef [] let conn'= conn{closChildren= chs} --, wormholes= whls} -- liftIO $ modifyMVar_ (fromJust pool) $ \plist -> do -- if not (null plist) then print "DUPLICATE" else return () -- return $ conn':plist -- !> (node,"ADDED TO POOL") -- tell listenResponses to watch incoming responses putMailbox ((conn',parseContext,node) :: (Connection,ParseContext,Node)) liftIO $ threadDelay 100000 -- give time to initialize listenResponses #ifndef ghcjs_HOST_OS close1 sock= do NS.setSocketOption sock NS.Linger 0 NS.close sock connectTo' bufSize hostname (PortNumber port) = do proto <- BSD.getProtocolNumber "tcp" bracketOnError (NS.socket NS.AF_INET NS.Stream proto) (NS.close) -- only done if there's an error (\sock -> do NS.setSocketOption sock NS.RecvBuffer bufSize NS.setSocketOption sock NS.SendBuffer bufSize -- NS.setSocketOption sock NS.SendTimeOut 1000000 !> ("CONNECT",port) he <- BSD.getHostByName hostname NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he)) return sock) #else connectToWS h (PortNumber p) = do protocol <- liftIO $ fromJSValUnchecked js_protocol pathname <- liftIO $ fromJSValUnchecked js_pathname tr ("PAHT",pathname) let ps = case (protocol :: JS.JSString)of "http:" -> "ws://"; "https:" -> "wss://" wsOpen $ JS.pack $ ps++ h++ ":"++ show p ++ pathname #endif -- last usage+ blocking semantics for sending 
type Blocked= MVar (Maybe Integer)

type BuffSize = Int

-- | The transport behind a 'Connection'. The set of constructors depends on
-- whether the program is compiled for the browser (GHCJS) or not.
data ConnectionData=
#ifndef ghcjs_HOST_OS
                   Node2Node{port :: PortID
                            ,socket ::Socket
                            ,sockAddr :: NS.SockAddr
                            }
                   | TLSNode2Node{tlscontext :: SData}
                   | HTTPS2Node{tlscontext :: SData}
                   | Node2Web{webSocket :: WS.Connection}
                   | HTTP2Node{port :: PortID
                              ,socket ::Socket
                              ,sockAddr :: NS.SockAddr}
                   | Self
#else
                   Self
                   | Web2Node{webSocket :: WebSocket}
#endif
   -- deriving (Eq,Ord)

-- | State of a connection between two nodes.
data Connection= Connection{idConn        :: Int
                           ,myNode        :: IORef Node
                           ,remoteNode    :: IORef (Maybe Node)
                           -- 'Nothing' while the connection is not established or after it is closed
                           ,connData      :: IORef (Maybe ConnectionData)
                           ,istream       :: IORef ParseContext
                           ,bufferSize    :: BuffSize
                           -- multiple wormhole/teleport use the same connection concurrently
                           ,isBlocked     :: Blocked
                           ,calling       :: Bool
                           ,synchronous   :: Bool
                           -- local localClosures with his continuation and a blocking MVar
                           -- another MVar with the children created by the closure
                           -- also has the id of the remote closure connected with
                           ,localClosures :: MVar (M.Map IdClosure (MVar[EventF],IdClosure, MVar (),EventF))

                           -- for each remote closure that points to local closure 0,
                           -- a new container of child processes
                           -- in order to treat them separately
                           -- so that 'killChilds' do not kill unrelated processes
                           -- used by `single` and `unique`
                           ,closChildren  :: IORef (M.Map Int EventF)}

                  deriving Typeable

-- | Global list of connections. NOINLINE is required: without it the
-- 'unsafePerformIO' expression could be inlined at its use sites and each of
-- them would get a different IORef.
{-# NOINLINE connectionList #-}
connectionList :: IORef [Connection]
connectionList= unsafePerformIO $ newIORef []

-- | Placeholder parse context: every field is an error that is raised if it is
-- ever evaluated.
noParseContext :: ParseContext
noParseContext= let err= error "parseContext not set"
                in ParseContext err err err

-- #ifndef ghcjs_HOST_OS
-- | A fresh, unconnected connection: new global identifier, "localhost" port 0
-- as own node, no remote node, no connection data, 'noParseContext' as input
-- stream and a buffer size of 8192.
defConnection :: TransIO Connection
defConnection = do
  idc <- genGlobalId
  liftIO $ do
    my     <- createNode "localhost" 0 >>= newIORef
    x      <- newMVar Nothing
    y      <- newMVar M.empty
    ref    <- newIORef Nothing
    z      <- newIORef M.empty
    noconn <- newIORef Nothing
    np     <- newIORef noParseContext
    -- whls <- newIORef []
    return Connection{idConn        = idc
                     ,myNode        = my
                     ,remoteNode    = ref
                     ,connData      = noconn
                     ,istream       = np
                     ,bufferSize    = 8192
                     --(error "defConnection: accessing network events out of listen")
                     ,isBlocked     = x
                     ,calling       = False
                     ,synchronous   = False
                     ,localClosures = y
                     ,closChildren  = z}   -- whls

#ifndef ghcjs_HOST_OS
setBuffSize :: Int -> TransIO ()
setBuffSize size= do
conn<- getData `onNothing` (defConnection !> "DEFF3") setData $ conn{bufferSize= size} getBuffSize= (do getSData >>= return . bufferSize) <|> return 8192 -- | Setup the node to start listening for incoming connections. -- listen :: Node -> Cloud () listen (node@(Node _ port _ _ )) = onAll $ do labelState "listen" {- st <- get onException $ \(e :: SomeException) -> do case fromException e of Just (CloudException _ _ _) -> return() _ -> do cutExceptions liftIO $ print "EXCEPTION: KILLING" topState >>= showThreads -- liftIO $ killBranch' st -- Closure closRemote <- getData `onNothing` error "teleport: no closRemote" -- conn <- getData `onNothing` error "reportBack: No connection defined: use wormhole" -- liftIO $ putStrLn "Closing connection" -- mclose conn -- msend conn $ SError $ toException $ ErrorCall $ show $ show $ CloudException node closRemote $ show e empty -} -- ex <- exceptionPoint :: TransIO (BackPoint SomeException) -- setData ex onException $ \(ConnectionError msg node) -> empty --addThreads 2 fork connectionTimeouts fork loopClosures setData $ Log{recover=False, buildLog= mempty, fulLog= mempty, lengthFull= 0, hashClosure= 0} conn' <- getSData <|> defConnection chs <- liftIO $ newIORef M.empty cdata <- liftIO $ newIORef $ Just Self let conn= conn'{connData=cdata,closChildren=chs} pool <- liftIO $ newMVar [conn] let node'= node{connection=Just pool} liftIO $ writeIORef (myNode conn) node' setData conn liftIO $ modifyMVar_ (fromJust $ connection node') $ const $ return [conn] addNodes [node'] setRState(JobGroup M.empty) --used by resetRemote ex <- exceptionPoint :: TransIO (BackPoint SomeException) setData ex mlog <- listenNew (fromIntegral port) conn <|> listenResponses :: TransIO (StreamData NodeMSG) execLog mlog --showNext "after listen" 10 tr "END LISTEN" -- listen incoming requests listenNew port conn'= do sock <- liftIO $ listenOn $ PortNumber port liftIO $ do let bufSize= bufferSize conn' NS.setSocketOption sock NS.RecvBuffer bufSize 
NS.setSocketOption sock NS.SendBuffer bufSize -- wait for connections. One thread per connection liftIO $ do putStr "Connected to port: "; print port (sock,addr) <- waitEvents $ NS.accept sock chs <- liftIO $ newIORef M.empty -- case addr of -- NS.SockAddrInet port host -> liftIO $ print("connection from", port, host) -- NS.SockAddrInet6 a b c d -> liftIO $ print("connection from", a, b,c,d) noNode <- liftIO $ newIORef Nothing id1 <- genGlobalId let conn= conn'{idConn=id1,closChildren=chs, remoteNode= noNode} --liftIO $ atomicModifyIORef connectionList $ \m -> (conn: m,()) -- TODO input <- liftIO $ SBSL.getContents sock --tr "SOME INPUT" -- cutExceptions -- onException $ \(e :: IOException) -> -- when (ioeGetLocation e=="Network.Socket.recvBuf") $ do -- liftIO $ putStr "listen: " >> print e -- let Connection{remoteNode=rnode,localClosures=localClosures,closChildren= rmap} = conn -- mnode <- liftIO $ readIORef rnode -- case mnode of -- Nothing -> return () -- Just node -> do -- liftIO $ putStr "removing1 node: " >> print node -- nodes <- getNodes -- setNodes $ nodes \\ [node] -- liftIO $ do -- modifyMVar_ localClosures $ const $ return M.empty -- writeIORef rmap M.empty -- -- topState >>= showThreads -- killBranch let nod = unsafePerformIO $ liftIO $ createNode "incoming connection" 0 in modify $ \s -> s{execMode=Serial,parseContext= (ParseContext (liftIO $ NS.close sock >> throw (ConnectionError "connection closed" nod)) input (unsafePerformIO $ newIORef False) ::ParseContext )} cdata <- liftIO $ newIORef $ Just (Node2Node (PortNumber port) sock addr) let conn'= conn{connData=cdata} setState conn' liftIO $ atomicModifyIORef connectionList $ \m -> (conn': m,()) -- TODO maybeTLSServerHandshake sock input -- tr "AFTER HANDSHAKE" firstLine@(method, uri, vers) <- getFirstLine headers <- getHeaders setState $ HTTPHeaders firstLine headers -- tr ("HEADERS", headers) -- string "\r\n\r\n" -- tr (method, uri,vers) case (method, uri) of ("CLOS", hisCookie) -> do conn <- 
getSData tr "CONNECTING" let host = BC.unpack $ fromMaybe (error "no host in header")$ lookup "Host" headers port = read $ BC.unpack $ fromMaybe (error "no port in header")$ lookup "Port" headers remNode' <- liftIO $ createNode host port rc <- liftIO $ newMVar [conn] let remNode= remNode'{connection= Just rc} liftIO $ writeIORef (remoteNode conn) $ Just remNode tr ("ADDED NODE", remNode) addNodes [remNode] myCookie <- liftIO $ readIORef rcookie if BS.toStrict myCookie /= hisCookie then do sendRaw conn "NOK" mclose conn error "connection attempt with bad cookie" else do sendRaw conn "OK" -- !> "CLOS detected" --async (return (SMore $ ClosureData 0 0[Exec])) <|> mread conn mread conn _ -> do -- it is a HTTP request -- process the current request in his own thread and then (<|>) any other request that arrive in the same connection cutBody method headers <|> many' cutHTTPRequest HTTPHeaders (method,uri,vers) headers <- getState <|> error "HTTP: no headers?" let uri'= BC.tail $ uriPath uri !> uriPath uri tr ("uri'", uri') case BC.span (/= '/') uri' of ("api",_) -> do -- if "api" `BC.isPrefixOf` uri' --then do --log <- return $ Exec:Exec: (Var $ IDyns $ up method):(map (Var . 
IDyns ) $ split $ BC.unpack $ BC.drop 4 uri') let log= exec <> lazyByteString method <> byteString "/" <> byteString (BC.drop 4 uri') maybeSetHost headers tr ("HEADERS", headers) str <- giveParseString <|> error "no api data" if lookup "Transfer-Encoding" headers == Just "chunked" then error $ "chunked not supported" else do len <- (read <$> BC.unpack <$> (Transient $ return (lookup "Content-Length" headers))) <|> return 0 log' <- case lookup "Content-Type" headers of Just "application/json" -> do let toDecode= BS.take len str -- tr ("TO DECODE", log <> lazyByteString toDecode) setParseString $ BS.take len str return $ log <> "/" <> lazyByteString toDecode -- [(Var $ IDynamic json)] -- TODO hande this serialization Just "application/x-www-form-urlencoded" -> do tr ("POST HEADERS=", BS.take len str) setParseString $ BS.take len str --postParams <- parsePostUrlEncoded <|> return [] return $ log <> lazyByteString ( BS.take len str) -- [(Var . IDynamic $ postParams)] TODO: solve deserialization Just x -> do tr ("POST HEADERS=", BS.take len str) let str= BS.take len str return $ log <> lazyByteString str -- ++ [Var $ IDynamic str] _ -> return $ log setParseString $ toLazyByteString log' return $ SMore $ ClosureData 0 0 log' !> ("APIIIII", log') --else if "relay" `BC.isPrefixOf` uri' then proxy sock method vers uri' ("relay",_) -> proxy sock method vers uri' (h,rest) -> do if BC.null rest || h== "file" then do --headers <- getHeaders maybeSetHost headers let uri= if BC.null h || BC.null rest then uri' else BC.tail rest tr (method,uri) -- stay serving pages until a websocket request is received servePages (method, uri, headers) -- when servePages finish, is because a websocket request has arrived conn <- getSData sconn <- makeWebsocketConnection conn uri headers -- websockets mode -- para qué reiniciarlo todo???? 
rem <- liftIO $ newIORef Nothing chs <- liftIO $ newIORef M.empty cls <- liftIO $ newMVar M.empty cme <- liftIO $ newIORef M.empty cdata <- liftIO $ newIORef $ Just (Node2Web sconn) let conn'= conn{connData= cdata , closChildren=chs,localClosures=cls, remoteNode=rem} --,comEvent=cme} setState conn' !> "WEBSOCKETS CONNECTION" co <- liftIO $ readIORef rcookie let receivedCookie= lookup "cookie" headers tr ("cookie", receivedCookie) rcookie <- case receivedCookie of Just str-> Just <$> do withParseString (BS.fromStrict str) $ do tDropUntilToken "cookie=" tTakeWhile (not . isSpace) Nothing -> return Nothing tr ("RCOOKIE", rcookie) if rcookie /= Nothing && rcookie /= Just co then do node <- getMyNode --let msg= SError $ toException $ ErrorCall $ show $ show $ CloudException node 0 $ show $ ConnectionError "bad cookie" node tr "SENDINg" liftIO $ WS.sendClose sconn ("Bad Cookie" :: BS.ByteString) !> "SendClose Bad cookie" empty else do liftIO $ WS.sendTextData sconn ("OK" :: BS.ByteString) -- a void message is sent to the application signaling the beginning of a connection -- async (return (SMore $ ClosureData 0 0[Exec])) <|> do tr "WEBSOCKET" -- onException $ \(e :: SomeException) -> do -- liftIO $ putStr "listen websocket:" >> print e -- -- liftIO $ mclose conn' -- -- killBranch -- -- empty s <- waitEvents $ receiveData' conn' sconn :: TransIO BS.ByteString setParseString s tr ("WEBSOCKET RECEIVED <-----------",s) -- integer deserialize -- a void message is sent to the application signaling the beginning of a connection <|> (return $ SMore (ClosureData 0 0 (exec <> lazyByteString s))) else do let uriparsed= BS.pack $ unEscapeString $ BC.unpack uri' setParseString uriparsed !> ("uriparsed",uriparsed) remoteClosure <- deserialize :: TransIO Int tChar '/' thisClosure <- deserialize :: TransIO Int tChar '/' --cdata <- liftIO $ newIORef $ Just (HTTP2Node (PortNumber port) sock addr) conn <- getSData liftIO $ atomicModifyIORef' (connData conn) $ \cdata -> case cdata of 
Just(Node2Node port sock addr) -> (Just $ HTTP2Node port sock addr,()) Just(TLSNode2Node ctx) -> (Just $ HTTPS2Node ctx,()) --setState conn{connData=cdata} s <- giveParseString cook <- liftIO $ readIORef rcookie liftIO $ SBSL.sendAll sock $ "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\n" return $ SMore $ ClosureData remoteClosure thisClosure $ lazyByteString s where cutHTTPRequest = do first@(method,_,_) <- getFirstLine -- tr ("after getfirstLine", method, uri, vers) headers <- getHeaders setState $ HTTPHeaders first headers cutBody method headers cutBody method headers= do if method == "POST" then case fmap (read . BC.unpack) $ lookup "Content-Length" headers of Nothing -> return () -- most likely chunked encoding Just len -> do str <- tTake (fromIntegral len) abduce setParseString str else abduce uriPath = BC.dropWhile (/= '/') split []= [] split ('/':r)= split r split s= let (h,t) = Prelude.span (/= '/') s in h: split t -- reverse proxy for urls that look like http://host:port/relay/otherhost/otherport/ proxy sclient method vers uri' = do let (host:port:_)= split $ BC.unpack $ BC.drop 6 uri' tr ("RELAY TO",host, port) --liftIO $ threadDelay 1000000 sserver <- liftIO $ connectTo' 4096 host $ PortNumber $ fromIntegral $ read port tr "CONNECTED" rawHeaders <- getRawHeaders tr ("RAWHEADERS",rawHeaders) let uri= BS.fromStrict $ let d x= BC.tail $ BC.dropWhile (/= '/') x in d . 
d $ d uri' let sent= method <> BS.pack " /" <> uri <> BS.cons ' ' vers <> BS.pack "\r\n" <> rawHeaders <> BS.pack "\r\n\r\n" tr ("SENT",sent) liftIO $ SBSL.send sserver sent -- Connection{connData=Just (Node2Node _ sclient _)} <- getState <|> error "proxy: no connection" (send sclient sserver <|> send sserver sclient) `catcht` \(e:: SomeException ) -> liftIO $ do putStr "Proxy: " >> print e NS.close sserver NS.close sclient empty empty where send f t= async $ mapData f t mapData from to = do content <- recv from 4096 tr (" proxy received ", content) if not $ BC.null content then sendAll to content >> mapData from to else finish where finish= NS.close from >> NS.close to -- throw $ Finish "finish" maybeSetHost headers= do setHost <- liftIO $ readIORef rsetHost when setHost $ do mnode <- liftIO $ do let mhost= lookup "Host" headers case mhost of Nothing -> return Nothing Just host -> atomically $ do -- set the first node (local node) as is called from outside nodes <- readTVar nodeList let (host1,port)= BC.span (/= ':') host hostnode= (Prelude.head nodes){nodeHost= BC.unpack host1 ,nodePort= if BC.null port then 80 else read $ BC.unpack $ BC.tail port} writeTVar nodeList $ hostnode : Prelude.tail nodes return $ Just hostnode -- !> (host1,port) when (isJust mnode) $ do conn <- getState liftIO $ writeIORef (myNode conn) $fromJust mnode liftIO $ writeIORef rsetHost False -- !> "HOSt SET" {-#NOINLINE rsetHost #-} rsetHost= unsafePerformIO $ newIORef True --instance Read PortNumber where -- readsPrec n str= let [(n,s)]= readsPrec n str in [(fromIntegral n,s)] --deriving instance Read PortID --deriving instance Typeable PortID -- | filter out HTTP requests noHTTP= onAll $ do conn <- getState cdata <- liftIO $ readIORef $ connData conn case cdata of Just (HTTPS2Node ctx) -> do liftIO $ sendTLSData ctx $ "HTTP/1.1 403 Forbidden\r\nConnection: close\r\nContent-Length: 11\r\n\r\nForbidden\r\n" liftIO $ tlsClose ctx Just (HTTP2Node _ sock _) -> do liftIO $ SBSL.sendAll sock $ 
"HTTP/1.1 403 Forbidden\r\nConnection: close\r\nContent-Length: 11\r\n\r\nForbidden\r\n" liftIO $ NS.close sock empty _ -> return () {- -- filter out WebSockets connections(usually coming from a web node) noWebSockets= onAll $ do conn <- getState cdata <- liftIO $ readIORef $ connData conn case cdata of Just (Web2Node _) -> empty _ -> return() -} #endif listenResponses :: Loggable a => TransIO (StreamData a) listenResponses= do labelState "listen responses" (conn, parsecontext, node) <- getMailbox :: TransIO (Connection,ParseContext,Node) labelState . fromString $ "listen from: "++ show node setData conn tr ("CONNECTION RECEIVED","listen from: "++ show node) modify $ \s-> s{execMode=Serial,parseContext = parsecontext} -- cutExceptions -- onException $ \(e:: SomeException) -> do -- liftIO $ putStr "ListenResponses: " >> print e -- liftIO $ putStr "removing node: " >> print node -- nodes <- getNodes -- setNodes $ nodes \\ [node] -- -- topState >>= showThreads -- killChilds -- let Connection{localClosures=localClosures}= conn -- liftIO $ modifyMVar_ localClosures $ const $ return M.empty -- empty mread conn type IdClosure= Int -- The remote closure ids for each node connection newtype Closure= Closure IdClosure deriving (Read,Show,Typeable) type RemoteClosure= (Node, IdClosure) newtype JobGroup= JobGroup (M.Map BC.ByteString RemoteClosure) deriving Typeable -- | if there is a remote job identified by th string identifier, it stop that job, and set the -- current remote operation (if any) as the current remote job for this identifier. -- The purpose is to have a single remote job. 
-- To identify the remote job, it should be used after the `wormhole` and before the remote call:
--
-- > r <- wormhole node $ do
-- >      stopRemoteJob "streamlog"
-- >      atRemote myRemotejob
--
-- So:
--
-- > runAtUnique ident node job= wormhole node $ do stopRemoteJob ident; atRemote job
--
-- This program receives a stream of "hello" from a second node when the option "hello" is entered on the keyboard.
-- If you enter "world", the "hello" stream from the second node
-- will stop and it will print a stream of "world" from a third node.
-- Entering "hello" again will stream "hello" again from the second node, and so on:
--
-- > main= keep $ initNode $ inputNodes <|> do
-- >
-- >     local $ option "init" "init"
-- >     nodes <- local getNodes
-- >     r <- proc (nodes !! 1) "hello" <|> proc (nodes !! 2) "world"
-- >     localIO $ print r
-- >     return ()
-- >
-- > proc node par = do
-- >     v <- local $ option par par
-- >     runAtUnique "name" node $ local $ do
-- >        abduce
-- >        r <- threads 0 $ choose $ repeat v
-- >        liftIO $ threadDelay 1000000
-- >        return r
--
-- The nodes could be started from the command line as follows, in different terminals:
--
-- > program -p start/localhost/8000
-- > program -p start/localhost/8002
-- > program -p start/localhost/8001/add/localhost/8000/n/add/localhost/8002/n/init
--
-- The third is the one which has the other two connected and can execute the two options.
stopRemoteJob :: BC.ByteString -> Cloud () instance Loggable Closure stopRemoteJob ident = do resetRemote ident Closure closr <- local $ getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole" tr ("CLOSRRRRRRRR", closr) fixClosure local $ do Closure closr <- getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole" conn <- getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole" remote <- liftIO $ readIORef $ remoteNode conn return (remote,closr) !> ("REMOTE",remote) JobGroup map <- getRState <|> return (JobGroup M.empty) setRState $ JobGroup $ M.insert ident (fromJust remote,closr) map -- |kill the remote job. Usually, before starting a new one. resetRemote :: BC.ByteString -> Cloud () resetRemote ident = do mj <- local $ do JobGroup map <- getRState <|> return (JobGroup M.empty) return $ M.lookup ident map when (isJust mj) $ do let (remote,closr)= fromJust mj --do -- when (closr /= 0) $ do runAt remote $ local $ do conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set" mcont <- liftIO $ modifyMVar localClosures $ \map -> return ( M.delete closr map, M.lookup closr map) case mcont of Nothing -> error $ "closure not found: " ++ show closr Just (_,_,_,cont) -> do topState >>= showThreads liftIO $ killBranch' cont return () execLog :: StreamData NodeMSG -> TransIO () execLog mlog =Transient $ do tr "EXECLOG" case mlog of SError e -> do return() !> ("SERROR",e) case fromException e of Just (ErrorCall str) -> do case read str of (e@(CloudException _ closl err)) -> do process closl (error "closr: should not be used") (Left e) True SDone -> runTrans(back $ ErrorCall "SDone") >> return Nothing -- TODO remove closure? 
SMore (ClosureData closl closr log) -> process closl closr (Right log) False SLast (ClosureData closl closr log) -> process closl closr (Right log) True where process :: IdClosure -> IdClosure -> (Either CloudException Builder) -> Bool -> StateIO (Maybe ()) process closl closr mlog deleteClosure= do conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set" if closl== 0 then do case mlog of Left except -> do setData emptyLog tr "Exception received from network 1" runTrans $ throwt except empty Right log -> do tr ("CLOSURE 0",log) setData Log{recover= True, buildLog= mempty, fulLog= log, lengthFull= 0, hashClosure= 0} --Log True [] [] setState $ Closure closr setRState $ DialogInWormholeInitiated True -- setParseString $ toLazyByteString log -- not needed it is has the log from the request, still not parsed return $ Just () -- !> "executing top level closure" else do mcont <- liftIO $ modifyMVar localClosures $ \map -> return (if deleteClosure then M.delete closl map else map, M.lookup closl map) -- !> ("localClosures=", M.size map) case mcont of Nothing -> do node <- liftIO $ readIORef (remoteNode conn) `onNothing` error "mconnect: no remote node?" let e = "request received for non existent closure. Perhaps the connection was closed by timeout and reopened" let err= CloudException node closl $ show e throw err -- execute the closure Just (chs,closLocal, mv,cont) -> do when deleteClosure $ do -- liftIO $ killChildren chs empty -- last message received liftIO $ tryPutMVar mv () void $ liftIO $ runStateT (case mlog of Right log -> do -- Log _ _ fulLog hashClosure <- getData `onNothing` return (Log True [] [] 0) Log{fulLog=fulLog, hashClosure=hashClosure} <- getLog -- return() !> ("fullog in execlog", reverse fulLog) let nlog= fulLog <> log -- let nlog= reverse log ++ fulLog setData $ Log{recover= True, buildLog= mempty, fulLog= nlog, lengthFull=error "lengthFull TODO", hashClosure= hashClosure} -- TODO hashClosure must change? 
setState $ Closure closr setRState $ DialogInWormholeInitiated True setParseString $ toLazyByteString log --restrs <- giveParseString --tr ("rs' in execlog =", fmap (BS.take 4) restrs) runContinuation cont () Left except -> do setData emptyLog tr ("Exception received from the network", except) runTrans $ throwt except) cont return Nothing #ifdef ghcjs_HOST_OS listen node = onAll $ do addNodes [node] setRState(JobGroup M.empty) -- ex <- exceptionPoint :: TransIO (BackPoint SomeException) -- setData ex events <- liftIO $ newIORef M.empty rnode <- liftIO $ newIORef node conn <- defConnection >>= \c -> return c{myNode=rnode} -- ,comEvent=events} liftIO $ atomicModifyIORef connectionList $ \m -> (conn: m,()) setData conn r <- listenResponses execLog r #endif type Pool= [Connection] type SKey= String type SValue= String type Service= [(SKey, SValue)] lookup2 key doubleList= let r= mapMaybe(lookup key ) doubleList in if null r then Nothing else Just $ head r filter2 key doubleList= mapMaybe(lookup key ) doubleList -------------------------------------------- #ifndef ghcjs_HOST_OS -- maybeRead line= unsafePerformIO $ do -- let [(v,left)] = reads line ---- print v -- (v `seq` return [(v,left)]) -- `catch` (\(e::SomeException) -> do -- liftIO $ print $ "******readStream ERROR in: "++take 100 line -- maybeRead left) {- readFrom Connection{connData= Just(TLSNode2Node ctx)}= recvTLSData ctx readFrom Connection{connData= Just(Node2Node _ sock _)} = toStrict <$> loop readFrom _ = error "readFrom error" -} readFrom con= do cd <- readIORef $ connData con case cd of Just(TLSNode2Node ctx) -> recvTLSData ctx Just(Node2Node _ sock _) -> BS.toStrict <$> loop sock _ -> error "readFrom error" where bufSize= 4098 loop sock = loop1 where loop1 :: IO BL.ByteString loop1 = unsafeInterleaveIO $ do s <- SBS.recv sock bufSize if BC.length s < bufSize then return $ BLC.Chunk s mempty else BLC.Chunk s `liftM` loop1 -- toStrict= B.concat . 
--            BS.toChunks

-- | Build a websockets stream on top of a connection: input is taken from
-- 'readFrom' and output is written with 'sendRaw'. An empty read is reported
-- to the stream as 'Nothing'; a 'Nothing' on the write side is ignored.
makeWSStreamFromConn conn= do
    tr "WEBSOCKETS request"
    let rec= readFrom conn
        send= sendRaw conn
    makeStream
        (do
            bs <- rec                -- SBS.recv sock 4098
            return $ if BC.null bs then Nothing else Just bs)
        (\mbBl -> case mbBl of
            Nothing -> return ()
            Just bl -> send bl)      -- SBS.sendMany sock (BL.toChunks bl) >> return()) -- !!> show ("SOCK RESP",bl)

-- | Accept a WebSocket upgrade for the given request line and headers over
-- the connection and start a ping thread (argument 30) on the accepted
-- connection.
makeWebsocketConnection conn uri headers= liftIO $ do
    stream <- makeWSStreamFromConn conn
    let pc = WS.PendingConnection
               { WS.pendingOptions  = WS.defaultConnectionOptions -- {connectionOnPong=xxx}
               , WS.pendingRequest  = NWS.RequestHead uri headers False -- RequestHead (BC.pack $ show uri)
                                                                        -- (map parseh headers) False
               , WS.pendingOnAccept = \_ -> return ()
               , WS.pendingStream   = stream
               }
    sconn <- WS.acceptRequest pc     -- !!> "accept request"
    WS.forkPingThread sconn 30
    return sconn

-- | Answer a plain HTTP request with a file taken from @./static/out.jsexe/@.
--
-- Requests carrying a @Sec-WebSocket-Key@ header are let through untouched.
-- For every other request a response (200 with the file, or 404) is written
-- to the connection and the computation then stops with 'empty'.
--
-- The requested path comes from the network, so any path containing a @..@
-- segment is answered with 404 instead of being read from disk.
servePages (method,uri, headers) = do
    -- return () !> ("HTTP request",method,uri, headers)
    conn <- getSData <|> error " servePageMode: no connection"
    if isWebSocketsReq headers
      then return ()
      else do
        let file= if BC.null uri then "index.html" else uri
            -- a ".." segment would let the request escape the static directory
            escapesRoot= ".." `elem` BC.split '/' file

        {- TODO rendering in server
           NEEDED: recodify View to use blaze-html in server. wlink to get path in server
           does file exist?
           if exist, send else do
              store path, execute continuation
              get the rendering
              send through HTTP
           - put this logic as independent alternative programmer options
              serveFile dirs <|> serveApi apis <|> serveNode nodeCode
        -}
        mcontent <- if escapesRoot
            then return Nothing
            else liftIO $ (Just <$> BL.readFile ( "./static/out.jsexe/"++ BC.unpack file) )
                            `catch` (\(e:: SomeException) -> return Nothing)
        -- return "Not found file: index.html
        --         please compile with ghcjs
        --         ghcjs program.hs -o static/out")
        case mcontent of
          Just content -> do
            cook <- liftIO $ readIORef rcookie
            liftIO $ sendRaw conn $
              "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\nContent-Length: "
                <> BS.pack (show $ BL.length content) <>"\r\n"
                <> "Set-Cookie:" <> "cookie=" <> cook -- <> "\r\n"
                <>"\r\n\r\n" <> content

          -- the status line must end in CRLF like every other header line
          Nothing ->liftIO $ sendRaw conn $ BS.pack $
              "HTTP/1.0 404 Not Found\r\nContent-Length: 13\r\nConnection: close\r\n\r\nNot Found 404"
        empty

-- | Forward the result of the Transient computation to the opened connection.
--
-- Runs only when the current log is in recovery mode (otherwise 'empty').
-- After sending, the connection is closed when the request was HTTP/1.0, the
-- response starts with @HTTP/1.0@, the request asked for @Connection: close@,
-- or the response itself contains @Connection:@ followed by @close@.
api :: TransIO BS.ByteString -> Cloud ()
api w= Cloud $ do
    log <- getLog
    if not $ recover log then empty else do
        HTTPHeaders (_,_,vers) hdrs <- getState <|> error "api: no HTTP headers???"
        let closeit= lookup "Connection" hdrs == Just "close"
        conn <- getState <|> error "api: Need a connection opened with initNode, listen, simpleWebApp"
        r <- w
        tr ("response",r)
        sendRaw conn r
        tr (vers, hdrs)
        when (vers == http10 || BS.isPrefixOf http10 r || closeit || closeInResponse r) $
            liftIO $ mclose conn
  where
    -- True when the response text has "Connection:" followed (after spaces) by "close"
    closeInResponse r=
        let rest = findSubstring "Connection:" r
            rest'= BS.dropWhile (==' ') rest
        in BS.isPrefixOf "close" rest'
      where
        -- what follows the first occurrence of sub in str; empty if sub never occurs
        findSubstring sub str
            | BS.null str           = str
            | BS.isPrefixOf sub str = BS.drop (BS.length sub) str
            | otherwise             = findSubstring sub (BS.tail str)

-- | Version token that marks an HTTP/1.0 message.
http10= "HTTP/1.0"

-- | True when the header list contains a @Sec-WebSocket-Key@ entry.
isWebSocketsReq = not . null
    . filter ( (== mk "Sec-WebSocket-Key") . fst)

data HTTPMethod= GET | POST deriving (Read,Show,Typeable,Eq)
instance Loggable HTTPMethod

-- | Parse the three tokens of a request line: method, URI (converted to a
-- strict ByteString) and version.
getFirstLine= (,,) <$> getMethod <*> (BS.toStrict <$> getUri) <*> getVers
    where
    getMethod= parseString
    getUri= parseString
    getVers= parseString

-- | Skip leading spaces and split the input at the first @\\r\\n\\r\\n@: the
-- text before it is returned and the text after it is left as the rest.
getRawHeaders= dropSpaces >> (withGetParseString $ \s -> return $ scan mempty s)
   where
   scan res str
       | "\r\n\r\n" `BS.isPrefixOf` str = (res, BS.drop 4 str)
       | otherwise= scan ( BS.snoc res $ BS.head str) $ BS.tail str

-- line= do
--    dropSpaces
--    tTakeWhile (not .
--                  endline)

-- | Key/value pairs obtained from a URL-encoded POST body.
type PostParams = [(BS.ByteString, String)]

-- | Parse the input as a sequence of pairs: the key is read up to @=@ and the
-- value up to @&@; each value is passed through 'unEscapeString'.
parsePostUrlEncoded :: TransIO PostParams
parsePostUrlEncoded= do
   dropSpaces
   many $ (,) <$> param <*> value
   where
   param= tTakeWhile' ( /= '=') !> "param"
   value= unEscapeString <$> BS.unpack <$> tTakeWhile' (/= '&' )

-- | Parse header lines into (case-insensitive name, value) pairs until the
-- terminating @\\r\\n\\r\\n@ is found.
getHeaders = manyTill paramPair (string "\r\n\r\n")          -- !> (method, uri, vers)
  where
  paramPair= (,) <$> (mk <$> getParam) <*> getParamValue

  -- header name: everything up to ':' (or end of line); fails on an empty name
  getParam= do
      dropSpaces
      r <- tTakeWhile (\x -> x /= ':' && not (endline x))
      if BS.null r || r=="\r" then empty else anyChar >> return (BS.toStrict r)
      where
      endline c= c== '\r' || c =='\n'

  -- header value: the rest of the line after leading spaces
  getParamValue= BS.toStrict <$> ( dropSpaces >> tTakeWhile (\x -> not (endline x)))
      where
      endline c= c== '\r' || c =='\n'
#endif

#ifdef ghcjs_HOST_OS
isBrowserInstance= True
api _= empty
#else
-- | Returns 'True' if we are running in the browser.
isBrowserInstance= False
#endif

{-# NOINLINE emptyPool #-}
-- | Create a new, empty connection pool.
emptyPool :: MonadIO m => m (MVar Pool)
emptyPool= liftIO $ newMVar []

-- | Create a node from a hostname (or IP address), port number and a list of
-- services. The node starts without a connection pool.
createNodeServ :: HostName -> Int -> [Service] -> IO Node
createNodeServ h p svs= return $ Node h p Nothing svs

-- | Create a node with no services.
createNode :: HostName -> Int -> IO Node
createNode h p= createNodeServ h p []

-- | Create a node named @"webnode"@ with a random port, a fresh empty pool
-- and the single service @[("webnode","")]@.
createWebNode :: IO Node
createWebNode= do
    pool <- emptyPool
    port <- randomIO
    return $ Node "webnode" port (Just pool) [[("webnode","")]]

-- | Two nodes are equal when host and port coincide; the pool and the
-- services are ignored.
instance Eq Node where
    Node h p _ _ ==Node h' p' _ _= h==h' && p==p'

instance Show Node where
    show (Node h p _ servs )= show (h,p, servs)

-- | Inverse of the 'Show' instance: reads a @(host, port, services)@ triple.
-- The resulting node has no connection pool.
instance Read Node where
    -- map over every parse instead of matching only the zero/one result cases,
    -- so the function is total
    readsPrec n s= [ (Node h p Nothing ss, s') | ((h,p,ss),s') <- readsPrec n s ]

-- | Process-wide list of known nodes.
--
-- NOINLINE is required: this is a global created with 'unsafePerformIO', and
-- if the definition were inlined each use site could allocate its own 'TVar'.
{-# NOINLINE nodeList #-}
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []

deriving instance Ord PortID

--myNode :: Int -> DBRef MyNode
--myNode= getDBRef $ key $ MyNode undefined

-- | Raise an error saying that the node has not been set; @f@ is the name of
-- the caller, used as prefix of the message.
errorMyNode f= error $ f ++ ": Node not set. initialize it with connect, listen, initNode..."

-- | Return the local node i.e.
--   the node where this computation is running.
--   Calls 'errorMyNode' when 'getSData' yields no 'Connection'.
getMyNode :: TransIO Node
getMyNode = do
    Connection{myNode= node} <- getSData <|> errorMyNode "getMyNode" :: TransIO Connection
    liftIO $ readIORef node

-- | Like 'getMyNode', but 'empty' if the node is not set.
getMyNodeMaybe= do
    Connection{myNode= node} <- getSData
    liftIO $ readIORef node

-- | Return the list of nodes in the cluster.
getNodes :: MonadIO m => m [Node]
getNodes = liftIO $ atomically $ readTVar nodeList

-- | Get the nodes that have the same service definition as the calling node
-- (the first node of the list).
--
-- Only the first service of each node is compared. When the calling node has
-- no services, the nodes without services are returned. An empty node list
-- gives an empty result instead of failing on the head of the list.
getEqualNodes :: MonadIO m => m [Node]
getEqualNodes = do
    nodes <- getNodes
    return $ case nodes of
        []         -> []
        (self : _) -> case nodeServices self of
            []        -> filter (null . nodeServices) nodes
            (srv : _) -> filter (hasFirstService srv) nodes
  where
    -- does the node's first service equal srv? (False for a node without services)
    hasFirstService srv n = case nodeServices n of
        (s : _) -> s == srv
        []      -> False

-- | Return the nodes whose host name is @"webnode"@.
getWebNodes :: MonadIO m => m [Node]
getWebNodes = do
    nodes <- getNodes
    return $ filter ( (==) "webnode" . nodeHost) nodes

-- | For every known node, return the services of that node that satisfy @f@.
matchNodes f = do
    nodes <- getNodes
    return $ Prelude.map (\n -> filter f $ nodeServices n) nodes

-- | Add a list of nodes to the list of existing nodes known locally.
-- If a node is already present, its services are merged with those of the
-- existing entry: services whose first element is equal (usually the "name"
-- field) are substituted by the ones of the node being added.
addNodes :: [Node] -> TransIO ()
addNodes nodes= liftIO $ do
  -- the local node should be the first
  nodes' <- mapM fixNode nodes
  atomically $ mapM_ insert nodes'
  where
  insert node= do
      prevnodes <- readTVar nodeList                -- !> ("ADDNODES", nodes)
      let mn = filter(==node) prevnodes
      case mn of
          [] -> do tr "NUEVO NODO"; writeTVar nodeList $ (prevnodes) ++ [node]

          [n] ->do
              -- the new services go first, so nubBy keeps them on a clash.
              -- listToMaybe instead of head: an empty service no longer crashes
              let sameKey s s'= listToMaybe s == listToMaybe s'
                  nservices= nubBy sameKey $ nodeServices node++ nodeServices n
              -- the merged entry is re-appended at the end of the list
              writeTVar nodeList $ ((prevnodes) \\ [node]) ++ [n{nodeServices=nservices}]

          _ -> error $ "duplicated node: " ++ show node
      --writeTVar nodeList $ (prevnodes \\ nodes') ++ nodes'

-- | Remove the given nodes from the list of known nodes.
delNodes nodes= liftIO $ atomically $ do
  nodes' <- readTVar nodeList
  writeTVar nodeList $ nodes' \\ nodes

-- | Make sure the node has a connection pool, creating an empty one if needed.
fixNode n= case connection n of
  Nothing -> do
      pool <- emptyPool
      return n{connection= Just pool}
  Just _ -> return n

-- | Set the list of nodes, replacing the previous one.
setNodes nodes= liftIO $ do
     nodes' <- mapM fixNode nodes
     atomically $ writeTVar nodeList nodes'

-- | Rotate the list of cluster nodes by one position (the first node becomes
-- the last), store the rotated list and return it. An empty list is left
-- unchanged.
shuffleNodes :: MonadIO m => m [Node]
shuffleNodes= liftIO . atomically $ do
  nodes <- readTVar nodeList
  let nodes'= case nodes of
          []       -> []
          (n : ns) -> ns ++ [n]
  writeTVar nodeList nodes'
  return nodes'

--getInterfaces :: TransIO TransIO HostName
--getInterfaces= do
--   host <- logged $ do
--      ifs <- liftIO $ getNetworkInterfaces
--      liftIO $ mapM_ (\(i,n) ->putStrLn $ show i ++ "\t"++ show (ipv4 n) ++ "\t"++name n)$ zip [0..] ifs
--      liftIO $ putStrLn "Select one: "
--      ind <- input ( < length ifs)
--      return $ show . ipv4 $ ifs !! ind

-- #ifndef ghcjs_HOST_OS
--instance Read NS.SockAddr where
--    readsPrec _ ('[':s)=
--       let (s',r1)= span (/=']') s
--           [(port,r)]= readsPrec 0 $ tail $ tail r1
--       in [(NS.SockAddrInet6 port 0 (IP.toHostAddress6 $ read s') 0, r)]
--    readsPrec _ s=
--       let (s',r1)= span(/= ':') s
--           [(port,r)]= readsPrec 0 $ tail r1
--       in [(NS.SockAddrInet port (IP.toHostAddress $ read s'),r)]
-- #endif

-- | Add this node to the list of known nodes in the remote node connected by a `wormhole`.
-- This is useful when the node is called back by the remote node.
-- In the case of web nodes with webSocket connections, this is the way to add it to the list of
-- known nodes in the server.
addThisNodeToRemote= do
    n <- local getMyNode
    atRemote $ local $ do
      n' <- setConnectionIn n
      addNodes [n']

-- | Return the node with a new pool that contains only the current connection.
setConnectionIn node=do
    conn <- getState <|> error "addThisNodeToRemote: connection not found"
    ref <- liftIO $ newMVar [conn]
    return node{connection=Just ref}

-- | Add a node (first parameter) to the cluster using a node that is already
-- part of the cluster (second parameter). The added node starts listening for
-- incoming connections and the rest of the computation is executed on this
-- newly added node.
connect :: Node -> Node -> Cloud ()
#ifndef ghcjs_HOST_OS
connect node remotenode = do
    listen node <|> return ()
    connect' remotenode

-- | Reconcile the list of nodes in the cluster using a remote node already
-- part of the cluster. Reconciliation end up in each node in the cluster
-- having the same list of nodes.
connect' :: Node -> Cloud () connect' remotenode= loggedc $ do nodes <- local getNodes localIO $ putStr "connecting to: " >> print remotenode newNodes <- runAt remotenode $ interchange nodes --return () !> "interchange finish" -- add the new nodes to the local nodes in all the nodes connected previously let toAdd=remotenode:Prelude.tail newNodes callNodes' nodes (<>) mempty $ local $ do liftIO $ putStr "New nodes: " >> print toAdd !> "NEWNODES" addNodes toAdd where -- receive new nodes and send their own interchange nodes= do newNodes <- local $ do conn@Connection{remoteNode=rnode} <- getSData <|> error ("connect': need to be connected to a node: use wormhole/connect/listen") -- if is a websockets node, add only this node -- let newNodes = case cdata of -- Node2Web _ -> [(head nodes){nodeServices=[("relay",show remotenode)]}] -- _ -> nodes let newNodes= nodes -- map (\n -> n{nodeServices= nodeServices n ++ [("relay",show (remotenode,n))]}) nodes callingNode<- fixNode $ Prelude.head newNodes liftIO $ writeIORef rnode $ Just callingNode liftIO $ modifyMVar_ (fromJust $ connection callingNode) $ const $ return [conn] -- onException $ \(e :: SomeException) -> do -- liftIO $ putStr "connect:" >> print e -- liftIO $ putStrLn "removing node: " >> print callingNode -- -- topState >>= showThreads -- nodes <- getNodes -- setNodes $ nodes \\ [callingNode] return newNodes oldNodes <- local $ getNodes mclustered . local $ do liftIO $ putStrLn "New nodes: " >> print newNodes addNodes newNodes localIO $ atomically $ do -- set the first node (local node) as is called from outside -- tr "HOST2 set" nodes <- readTVar nodeList let nodes'= (Prelude.head nodes){nodeHost=nodeHost remotenode ,nodePort=nodePort remotenode}:Prelude.tail nodes writeTVar nodeList nodes' return oldNodes #else connect _ _= empty connect' _ = empty #endif #ifndef ghcjs_HOST_OS ------------------------------- HTTP client --------------- instance {-# Overlapping #-} Loggable Value where serialize= return . 
lazyByteString =<< encode deserialize = decodeIt where jsElem :: TransIO BS.ByteString -- just delimites the json string, do not parse it jsElem= dropSpaces >> (jsonObject <|> array <|> atom) atom= elemString array= (brackets $ return "[" <> return "{}" <> chainSepBy mappend (return "," <> jsElem) (tChar ',')) <> return "]" jsonObject= (braces $ return "{" <> chainMany mappend jsElem) <> return "}" elemString= do dropSpaces tTakeWhile (\c -> c /= '}' && c /= ']' ) decodeIt= do s <- jsElem tr ("decode",s) case eitherDecode s !> "DECODE" of Right x -> return x Left err -> empty data HTTPHeaders= HTTPHeaders (BS.ByteString, B.ByteString, BS.ByteString) [(CI BC.ByteString,BC.ByteString)] deriving Show rawHTTP :: Loggable a => Node -> String -> TransIO a rawHTTP node restmsg = do abduce -- is a parallel operation tr ("***********************rawHTTP",nodeHost node) --sock <- liftIO $ connectTo' 8192 (nodeHost node) (PortNumber $ fromIntegral $ nodePort node) mcon <- getData :: TransIO (Maybe Connection) c <- do c <- mconnect' node tr "after mconnect'" sendRawRecover c $ BS.pack restmsg c <- getState <|> error "rawHTTP: no connection?" 
let blocked= isBlocked c -- TODO: the same flag is used now for sending and receiving tr "before blocked" liftIO $ takeMVar blocked tr "after blocked" ctx <- liftIO $ readIORef $ istream c liftIO $ writeIORef (done ctx) False modify $ \s -> s{parseContext= ctx} -- actualize the parse context return c `while` \c ->do is <- isTLS c px <- getHTTProxyParams is tr ("PX=", px) (if isJust px then return True else do c <- anyChar ; tPutStr $ BS.singleton c; tr "anyChar"; return True) <|> do TOD t _ <- liftIO $ getClockTime -- ("PUTMVAR",nodeHost node) liftIO $ putMVar (isBlocked c) $ Just t liftIO (writeIORef (connData c) Nothing) mclose c tr "CONNECTION EXHAUSTED,RETRYING WITH A NEW CONNECTION" return False modify $ \s -> s{execMode=Serial} let blocked= isBlocked c -- TODO: the same flag is used now for sending and receiving tr "after send" --showNext "NEXT" 100 --try (do r <-tTake 10;liftIO $ print "NOTPARSED"; liftIO $ print r; empty) <|> return() first@(vers,code,_) <- getFirstLineResp <|> do r <- notParsed error $ "No HTTP header received:\n"++ up r tr ("FIRST line",first) headers <- getHeaders let hdrs= HTTPHeaders first headers setState hdrs --tr ("HEADERS", first, headers) guard (BC.head code== '2') <|> do Raw body <- parseBody headers error $ show (hdrs,body) -- decode the body and print result <- parseBody headers when (vers == http10 || -- BS.isPrefixOf http10 str || lookup "Connection" headers == Just "close" ) $ do TOD t _ <- liftIO $ getClockTime liftIO $ putMVar blocked $ Just t liftIO $ mclose c liftIO $ takeMVar blocked return() --tr ("result", result) --when (not $ null rest) $ error "THERE WERE SOME REST" ctx <- gets parseContext -- "SET PARSECONTEXT PREVIOUS" liftIO $ writeIORef (istream c) ctx TOD t _ <- liftIO $ getClockTime -- ("PUTMVAR",nodeHost node) liftIO $ putMVar blocked $ Just t if (isJust mcon) then setData (fromJust mcon) else delData c return result where isTLS c= liftIO $ do cdata <- readIORef $ connData c case cdata of Just(TLSNode2Node 
_) -> return True _ -> return False while act fix= do r <- act; b <- fix r; if b then return r else act parseBody headers= case lookup "Transfer-Encoding" headers of Just "chunked" -> dechunk |- deserialize _ -> case fmap (read . BC.unpack) $ lookup "Content-Length" headers of Just length -> do msg <- tTake length tr ("GOT", length) withParseString msg deserialize _ -> do str <- notParsed -- TODO: must be strict to avoid premature close BS.length str `seq` withParseString str deserialize getFirstLineResp= do -- showNext "getFirstLineResp" 20 (,,) <$> httpVers <*> (BS.toStrict <$> getCode) <*> getMessage where httpVers= tTakeUntil (BS.isPrefixOf "HTTP" ) >> parseString getCode= parseString getMessage= tTakeUntilToken ("\r\n") --con<- getState <|> error "rawHTTP: no connection?" --mclose con xxx --maybeClose vers headers c str dechunk= do n<- numChars if n== 0 then do string "\r\n"; return SDone else do r <- tTake $ fromIntegral n !> ("numChars",n) --tr ("message", r) trycrlf tr "SMORE1" return $ SMore r <|> return SDone !> "SDone in dechunk" where trycrlf= try (string "\r\n" >> return()) <|> return () numChars= do l <- hex ; tDrop 2 >> return l #endif -- | crawl the nodes executing the same action in each node and accumulate the results using a binary operator foldNet :: Loggable a => (Cloud a -> Cloud a -> Cloud a) -> Cloud a -> Cloud a -> Cloud a foldNet op init action = atServer $ do ref <- onAll $ liftIO $ newIORef Nothing -- eliminate additional results due to unneded parallelism when using (<|>) r <- exploreNetExclude [] v <-localIO $ atomicModifyIORef ref $ \v -> (Just r, v) case v of Nothing -> return r Just _ -> empty where exploreNetExclude nodesexclude = loggedc $ do local $ tr "EXPLORENETTTTTTTTTTTT" action `op` otherNodes where otherNodes= do node <- local getMyNode nodes <- local getNodes' tr ("NODES to explore",nodes) let nodesToExplore= Prelude.tail nodes \\ (node:nodesexclude) callNodes' nodesToExplore op init $ exploreNetExclude (union 
(node:nodesexclude) nodes) getNodes'= getEqualNodes -- if isBrowserInstance then return <$>getMyNode -- getEqualNodes -- else (++) <$> getEqualNodes <*> getWebNodes exploreNet :: (Loggable a,Monoid a) => Cloud a -> Cloud a exploreNet = foldNet mappend mempty exploreNetUntil :: (Loggable a) => Cloud a -> Cloud a exploreNetUntil = foldNet (<|>) empty -- | only execute if the the program is executing in the browser. The code inside can contain calls to the server. -- Otherwise return empty (so it stop the computation and may execute alternative computations). onBrowser :: Cloud a -> Cloud a onBrowser x= do r <- local $ return isBrowserInstance if r then x else empty -- | only executes the computaion if it is in the server, but the computation can call the browser. Otherwise return empty onServer :: Cloud a -> Cloud a onServer x= do r <- local $ return isBrowserInstance if not r then x else empty -- | If the computation is running in the server, translates i to the browser and return back. -- If it is already in the browser, just execute it atBrowser :: Loggable a => Cloud a -> Cloud a atBrowser x= do r <- local $ return isBrowserInstance if r then x else atRemote x -- | If the computation is running in the browser, translates i to the server and return back. -- If it is already in the server, just execute it atServer :: Loggable a => Cloud a -> Cloud a atServer x= do r <- local $ return isBrowserInstance tr ("AT SERVER",r) if not r then x else atRemote x ------------------- timeouts ----------------------- -- delete connections. 
-- delete receiving closures before sending closures

-- | Threshold compared against @time - lastActivity@ in 'connectionTimeouts'.
-- NOTE(review): presumably seconds, since @time@ comes from the first field
-- of 'TOD' - confirm.
delta= 60 -- 3*60

-- | Periodic sweep of the connection list.
--
-- On each round it waits 10000000 microseconds, reads the clock and splits
-- 'connectionList' atomically: connections whose 'isBlocked' holds 'Nothing',
-- or whose stored time is less than 'delta' behind the clock, stay in the
-- list; all the others are removed from it, closed (only when @calling@ is
-- set) and cleaned with 'cleanConnectionData'.
--
-- NOTE(review): the predicate reads the MVar through 'unsafePerformIO' inside
-- the pure function given to 'atomicModifyIORef'; 'readMVar' waits while the
-- MVar is empty - confirm that this cannot stall the sweep.
connectionTimeouts :: TransIO ()
connectionTimeouts= do
    labelState "loop connections"
    threads 0 $ waitEvents $ return ()                   --loop
    liftIO $ threadDelay 10000000
    -- tr "timeouts"
    TOD time _ <- liftIO $ getClockTime
    toClose <- liftIO $ atomicModifyIORef connectionList $ \ cons ->
        -- first component: connections kept; second component: connections to close
        Data.List.partition (\con ->
            let mc= unsafePerformIO $ readMVar $ isBlocked con
            in isNothing mc ||                            -- check that is not doing some IO
               ((time - fromJust mc) < delta) ) cons      -- !> Data.List.length cons
    -- time etc are in a IORef
    forM_ toClose $ \c -> liftIO $ do
        tr "close "
        tr $ idConn c
        when (calling c) $ mclose c
        cleanConnectionData c                             -- websocket connections close everything on timeout

-- | Drop every local closure registered for the connection. The 'globalFix'
-- entry of the connection is set to @(False,[])@ before the closures are
-- removed and to @(True,[])@ afterwards.
cleanConnectionData c= liftIO $ do
    -- reset the remote accessible closures
    modifyIORef globalFix $ \m -> M.insert (idConn c) (False,[]) m
    modifyMVar_ (localClosures c) $ const $ return M.empty
    modifyIORef globalFix $ \m -> M.insert (idConn c) (True,[]) m

-- | Periodic sweep of idle closures: for every connection of every node
-- except the first one of the node list, each local closure whose parent has
-- no child threads is removed, and an 'SLast' message for it is sent over
-- that connection.
loopClosures= do
    labelState "loop closures"
    threads 0 $ do                                              -- in the current thread
        waitEvents $ threadDelay 5000000                        -- every 5 seconds
        nodes <- getNodes                                       -- get the nodes known
        node <- choose $ tail nodes                             -- walk through them, except my own node
        guard (isJust $ connection node)                        -- when a node has connections
        nc <- liftIO $ readMVar $ fromJust (connection node)    -- get them
        conn <- choose nc                                       -- and walk through them
        lcs <- liftIO $ readMVar $ localClosures conn           -- get the local endpoints of this node for that connection
        (closLocal,(mv,clos,_,cont)) <- choose $ M.toList lcs   -- walk through them
        chs <- liftIO $ readMVar $ children $ fromJust $ parent cont -- get the threads spawned by requests to this endpoint
        return()                                                --return ("NUMBER=",length chs)
        guard (null chs)                                        -- when there is no activity
        tr ("REMOVING", closLocal)
        liftIO $ modifyMVar (localClosures conn) $ \lcs -> return $ (M.delete closLocal lcs,()) -- remove the closure
        msend conn $ SLast $ ClosureData clos closLocal mempty  -- notify the remote node
    -- tr ("THREADS ***************", length chs)