-----------------------------------------------------------------------------

--

-- Module      :  Transient.Move.Internals

-- Copyright   :

-- License     :  MIT

--

-- Maintainer  :  agocorona@gmail.com

-- Stability   :

-- Portability :

--

-- |

--

-----------------------------------------------------------------------------





{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification, OverloadedStrings
    ,ScopedTypeVariables, StandaloneDeriving, RecordWildCards, FlexibleContexts, CPP
    ,GeneralizedNewtypeDeriving #-}

module Transient.Move.Internals where



import Transient.Internals

import Transient.Parse

import Transient.Logged 

import Transient.Indeterminism

--  import Transient.Backtrack

import Transient.EVars





import Data.Typeable

import Control.Applicative

import System.IO.Error



#ifndef ghcjs_HOST_OS

import Network

--- import Network.Info

import Network.URI

--import qualified Data.IP                              as IP

import qualified Network.Socket                         as NS

import qualified Network.BSD                            as BSD

import qualified Network.WebSockets                     as NWS -- S(RequestHead(..))



import qualified Network.WebSockets.Connection          as WS



import           Network.WebSockets.Stream hiding(parse)

import qualified Data.ByteString                        as B(ByteString,concat)

import qualified Data.ByteString.Char8 as BC

import qualified Data.ByteString.Lazy.Internal          as BLC

import qualified Data.ByteString.Lazy                   as BL

import qualified Data.ByteString.Lazy.Char8             as BS

import           Network.Socket.ByteString              as SBS(sendMany,sendAll,recv)

import qualified Network.Socket.ByteString.Lazy         as SBSL

import           Data.CaseInsensitive(mk)

import           Data.Char(isSpace)



-- import System.Random



#else

import           JavaScript.Web.WebSocket

import qualified JavaScript.Web.MessageEvent           as JM

import           GHCJS.Prim (JSVal)

import           GHCJS.Marshal(fromJSValUnchecked)

import qualified Data.JSString                          as JS





import           JavaScript.Web.MessageEvent.Internal

import           GHCJS.Foreign.Callback.Internal (Callback(..))

import qualified GHCJS.Foreign.Callback                 as CB

import           Data.JSString  (JSString(..), pack)



#endif





import Control.Monad.State

-- import System.IO

import Control.Exception hiding (onException,try)

import Data.Maybe

--import Data.Hashable



--import System.Directory

-- import Control.Monad



import System.IO.Unsafe

import Control.Concurrent.STM as STM

import Control.Concurrent.MVar



import Data.Monoid

import qualified Data.Map as M

import Data.List (nub,(\\)) -- ,find, insert)

import Data.IORef







-- import System.IO



import Control.Concurrent







-- import Data.Dynamic

import Data.String



import System.Mem.StableName

import Unsafe.Coerce







--import System.Random



#ifdef ghcjs_HOST_OS

type HostName  = String

newtype PortID = PortNumber Int deriving (Read, Show, Eq, Typeable)

#endif



data Node= Node{ nodeHost   :: HostName

               , nodePort   :: Int

               , connection :: Maybe (MVar Pool)

               , nodeServices   :: Service

               }



         deriving (Typeable)



instance Ord Node where

   compare node1 node2= compare (nodeHost node1,nodePort node1)(nodeHost node2,nodePort node2)





-- The cloud monad is a thin layer over Transient in order to make sure that the type system

-- forces the logging of intermediate results

newtype Cloud a= Cloud {runCloud' ::TransIO a} deriving (Functor,Monoid,Applicative,Alternative, Monad, Num, Fractional, MonadState EventF)







-- | Execute a distributed computation inside a TransIO computation.

-- All the  computations in the TransIO monad that enclose the cloud computation must be `logged`

runCloud :: Cloud a -> TransIO a



runCloud x= do

       closRemote  <- getSData <|> return (Closure  0)

       runCloud' x <*** setData  closRemote





--instance Monoid a => Monoid (Cloud a) where

--   mappend x y = mappend <$> x <*> y

--   mempty= return mempty



#ifndef ghcjs_HOST_OS



--- empty Hooks for TLS



{-# NOINLINE tlsHooks #-}

tlsHooks ::IORef (SData -> BS.ByteString -> IO ()

                 ,SData -> IO B.ByteString

                 ,NS.Socket -> BS.ByteString -> TransIO ()

                 ,String -> NS.Socket -> BS.ByteString -> TransIO ())

tlsHooks= unsafePerformIO $ newIORef

                 ( notneeded

                 , notneeded

                 , \_ i -> tlsNotSupported i

                 , \_ _ _-> return())



  where

  notneeded= error "TLS hook function called"







  tlsNotSupported input = do

     if ((not $ BL.null input) && BL.head input  == 0x16)

       then  do

         conn <- getSData

         sendRaw conn $ BS.pack $ "HTTP/1.0 525 SSL Handshake Failed\r\nContent-Length: 0\nConnection: close\r\n\r\n"

       else return ()



(sendTLSData,recvTLSData,maybeTLSServerHandshake,maybeClientTLSHandshake)= unsafePerformIO $ readIORef tlsHooks





#endif



-- | Means that this computation will be executed in the current node. the result will be logged

-- so the closure will be recovered if the computation is translated to other node by means of

-- primitives like `beamTo`, `forkTo`, `runAt`, `teleport`, `clustered`, `mclustered` etc

local :: Loggable a => TransIO a -> Cloud a

local =  Cloud . logged



--stream :: Loggable a => TransIO a -> Cloud (StreamVar a)

--stream= Cloud . transport



-- #ifndef ghcjs_HOST_OS

-- | Run a distributed computation inside the IO monad. Enables asynchronous

-- console input (see 'keep').

runCloudIO :: Typeable a =>  Cloud a -> IO (Maybe a)

runCloudIO (Cloud mx)= keep mx



-- | Run a distributed computation inside the IO monad with no console input.

runCloudIO' :: Typeable a =>  Cloud a -> IO (Maybe a)

runCloudIO' (Cloud mx)=  keep' mx



-- #endif



-- | alternative to `local` It means that if the computation is translated to other node

-- this will be executed again if this has not been executed inside a `local` computation.

--

-- > onAll foo

-- > local foo'

-- > local $ do

-- >       bar

-- >       runCloud $ do

-- >               onAll baz

-- >               runAt node ....

-- > callTo node' .....

--

-- Here foo will be executed in node' but foo' bar and baz don't.

--

-- However foo bar and baz will e executed in node.

--



onAll ::  TransIO a -> Cloud a

onAll =  Cloud



-- | only executes if the result is demanded. It is useful when the conputation result is only used in

-- the remote node, but it is not serializable.

lazy :: TransIO a -> Cloud a

lazy mx= onAll $ getCont >>= \st -> Transient $

        return $ unsafePerformIO $  runStateT (runTrans mx) st >>=  return .fst



-- | executes a non-serilizable action in the remote node, whose result can be used by subsequent remote invocations

fixRemote mx= do

             r <- lazy mx

             fixClosure

             return r

        

-- | experimental: subsequent remote invocatioms will send logs to this closure. Therefore logs will be shorter. 

--

-- Also, non serializable statements before it will not be re-executed

fixClosure= atRemote $ local $ async $ return ()



-- log the result a cloud computation. like `loogged`, this erases all the log produced by computations

-- inside and substitute it for that single result when the computation is completed.

loggedc :: Loggable a => Cloud a -> Cloud a

loggedc (Cloud mx)= Cloud $ do

     closRemote  <- getSData <|> return (Closure  0 )

     logged mx <*** setData  closRemote 





loggedc' :: Loggable a => Cloud a -> Cloud a

loggedc' (Cloud mx)= Cloud $ logged mx

  



    



-- | the `Cloud` monad has no `MonadIO` instance. `lliftIO= local . liftIO`

lliftIO :: Loggable a => IO a -> Cloud a

lliftIO= local . liftIO



-- |  `localIO = lliftIO`

localIO :: Loggable a => IO a -> Cloud a

localIO= lliftIO



-- | stop the current computation and does not execute any alternative computation

fullStop :: TransIO stop

fullStop= setData WasRemote >> stop





-- | continue the execution in a new node

beamTo :: Node -> Cloud ()

beamTo node =  wormhole node teleport





-- | execute in the remote node a process with the same execution state

forkTo  :: Node -> Cloud ()

forkTo node= beamTo node <|> return()



-- | open a wormhole to another node and executes an action on it.

-- currently by default it keep open the connection to receive additional requests

-- and responses (streaming)

callTo :: Loggable a => Node -> Cloud a -> Cloud a

callTo node  remoteProc= wormhole node $ atRemote remoteProc

 



#ifndef ghcjs_HOST_OS

-- | A connectionless version of callTo for long running remote calls

callTo' :: (Show a, Read a,Typeable a) => Node -> Cloud a -> Cloud a

callTo' node remoteProc=  do

    mynode <-  local $ getNodes >>= return . head

    beamTo node

    r <-  remoteProc

    beamTo mynode

    return r

#endif



-- | Within a connection to a node opened by `wormhole`, it run the computation in the remote node and return

-- the result back to the original node.

--

-- If `atRemote` is executed in the remote node, then the computation is executed in the original node

--

-- > wormhole node2 $ do

-- >     t <- atRemote $ do

-- >           r <- foo              -- executed in node2

-- >           s <- atRemote bar r   -- executed in the original node

-- >           baz s                 -- in node2

-- >     bat t                      -- in the original node



atRemote :: Loggable a => Cloud a -> Cloud a

atRemote proc= loggedc' $ do

     was <- lazy $ getSData <|> return NoRemote

     teleport                                              -- !> "teleport 1111"

     r <- Cloud $ runCloud' proc  <** setData WasRemote

     teleport                                              -- !> "teleport 2222"

     lazy $ setData was

     return r



-- | Execute a computation in the node that initiated the connection. 

--

-- if the sequence of connections is  n1 -> n2 -> n3 then  `atCallingNode $ atCallingNode foo` in n3 

-- would execute `foo` in n1, -- while `atRemote $ atRemote foo` would execute it in n3

-- atCallingNode :: Loggable a => Cloud a -> Cloud a

-- atCallingNode proc=  connectCaller $ atRemote proc 



-- | synonymous of `callTo`

runAt :: Loggable a => Node -> Cloud a -> Cloud a

runAt= callTo







-- | run a single thread with that action for each connection created.

-- When the same action is re-executed within that connection, all the threads generated by the previous execution

-- are killed

--

-- >   box <-  foo

-- >   r <- runAt node . local . single $ getMailbox box

-- >   localIO $ print r

--

-- if foo  return differnt mainbox indentifiers, the above code would print the

-- messages of  the last one.

-- Without single, it would print the messages of all of them.

single :: TransIO a -> TransIO a

single f= do

   cutExceptions

   Connection{closChildren=rmap} <- getSData <|> error "single: only works within a wormhole"

   mapth <- liftIO $ readIORef rmap

   id <- liftIO $ f `seq` makeStableName f >>= return .  hashStableName





   case  M.lookup id mapth of

          Just tv -> liftIO $ killBranch'  tv      -- !> "JUSTTTTTTTTT"

          Nothing ->  return ()           -- !> "NOTHING"





   tv <- get

   f <** do

          id <- liftIO $ makeStableName f >>= return . hashStableName

          liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth





-- | run an unique continuation for each connection. The first thread that execute `unique` is

-- executed for that connection. The rest are ignored.

unique :: a -> TransIO ()

unique f= do

   Connection{closChildren=rmap} <- getSData <|> error "unique: only works within a connection. Use wormhole"

   mapth <- liftIO $ readIORef rmap

   id <- liftIO $ f `seq` makeStableName f >>= return .  hashStableName



   let mx = M.lookup id mapth

   case mx of

          Just _ -> empty

          Nothing -> do

             tv <- get

             liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth





--data ParentConnection= ParentConnection Connection (Maybe Closure) deriving Typeable



-- | A wormhole opens a connection with another node anywhere in a computation.

-- `teleport` uses this connection to translate the computation back and forth between the two nodes connected

wormhole :: Loggable a => Node -> Cloud a -> Cloud a

wormhole node (Cloud comp) = local $ Transient $ do

  

   moldconn <- getData :: StateIO (Maybe Connection)

   mclosure <- getData :: StateIO (Maybe Closure)

      -- when (isJust moldconn) . setState $ ParentConnection (fromJust moldconn) mclosure

   

   -- labelState $ "wormhole" ++ show node

   Log rec _ _  _<- getData `onNothing` return (Log False [][] 0)



   if not rec                                    

            then runTrans $ (do

                    conn <-  mconnect node

                    liftIO $ writeIORef (remoteNode conn) $ Just node

                    setData  conn{calling= True}

                    

                    setData $ (Closure 0 )

                    

                    comp )

                  <*** do

                       when (isJust moldconn) . setData $ fromJust moldconn

                       when (isJust mclosure) . setData $ fromJust mclosure

                    -- <** is not enough since comp may be reactive

            else do

                    let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn

                    setData $ conn{calling= False}

                    runTrans $ comp

                             <***  do when (isJust mclosure) . setData $ fromJust mclosure



-- | connect to the caller node.

-- connectCaller :: Loggable a => Cloud a -> Cloud a 

-- connectCaller (Cloud comp)= local $ do 

--   conn <-  getState  !> "CONNECTCALLER"

--   case connData conn of

--      Nothing -> empty

--      Just Self -> empty 

--      _ ->  if not $ calling conn !> ("calling", calling conn) then comp else do

--           ParentConnection conn mmclosure <- getState <|> error "connectCaller: No connection defined: use wormhole"

--           moldconn <- getData :: TransIO (Maybe Connection)

--           mclosure <- getData :: TransIO (Maybe Closure)

       

--           -- labelState $ "wormhole" ++ show node

--           Log rec _ _ <- getData `onNothing` return (Log False [][])

       

       

--           if not rec                                    

--                    then do

--                           --  liftIO $ writeIORef (remoteNode conn) $ Just node

--                            setData  conn{calling= True}

--                            setData $ if (isJust mmclosure) 

--                                then fromJust mmclosure

--                                else Closure 0

       

--                            comp 

--                          <*** do when (isJust moldconn) . setData $ fromJust moldconn

--                                  when (isJust mclosure) . setData $ fromJust mclosure

--                            -- <** is not enough since comp may be reactive

--                    else do

--                            let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn

--                            setData $ conn{calling= False}

--                            comp

--                                  <*** do when (isJust mclosure) . setData $ fromJust mclosure



#ifndef ghcjs_HOST_OS

type JSString= String

pack= id







#endif



data CloudException = CloudException Node IdClosure   String deriving (Typeable, Show, Read)



instance Exception CloudException 



teleport ::   Cloud ()

teleport =  local $ do

  Transient $ do

     cont <- get

     Log rec log fulLog closLocal <- getData `onNothing` return (Log False [][] 0)

    

     conn@Connection{connData=contype, localClosures= localClosures,calling= calling} <- getData

                             `onNothing` error "teleport: No connection defined: use wormhole"

     if not rec   -- !> ("teleport rec,loc fulLog=",rec,log,fulLog)

                  -- if is not recovering in the remote node then it is active

      then  do



        

-- when a node call itself, there is no need of socket communications

-- #ifndef ghcjs_HOST_OS

        case contype of

         Just Self ->  runTrans $ do

               setData $ if (not calling) then WasRemote else WasParallel

               abduce   !> "SELF" -- call himself

               liftIO $ do

                  remote <- readIORef $ remoteNode conn

                  writeIORef (myNode conn) $ fromMaybe (error "teleport: no connection?") remote





         _ -> do



-- #endif



         --read this Closure

          Closure closRemote  <- getData `onNothing`  return (Closure 0 )

          



          return () !> ("TELEPORTTTTTTTTTT", closLocal)

         --set his own closure in his Node data



          -- closLocal  <-   liftIO $ randomRIO (0,1000000)

--          node <- runTrans getMyNode

          



          let tosend= reverse $ if closRemote==0 then fulLog     else log

          

          liftIO $ modifyMVar_ localClosures $ \map -> return $ M.insert closLocal cont map

          -- The log sent is in the order of execution. log is in reverse order

          

          -- send log with closure ids at head

          runTrans $ msend conn $ SMore $ ClosureData closRemote closLocal tosend 

                                       !> ("teleport sending", SMore (unsafePerformIO $ readIORef $ remoteNode conn,closRemote,closLocal,tosend))

                                       !> "--------->------>---------->"

                                      --  -- !> ("log",reverse fulLog)

 

  

          setData $ if (not calling) then  WasRemote else WasParallel  -- !> "SET WASPAraLLEL"

          return Nothing



      else do

         delData WasRemote                -- deleting wasremote in teleport

                                          -- it is recovering, therefore it will be the

                                          -- local, not remote

         return $ Just ()



        --  code moved to reportBack

        --  runTrans $ onException $ \(e :: SomeException) -> do 



        --         Closure closRemote <- getData `onNothing` error "teleport: no closRemote"

        --         node <- getMyNode

        --         let msg= SError $ toException $ ErrorCall $  show $ show $ CloudException node closRemote   $ show e

        --         msend conn msg  !> "MSEND"

                

                

             

  return ()                           --  !> "TELEPORT remote"



-- | forward exceptions to the calling node

reportBack :: TransIO ()

reportBack= onException $ \(e :: SomeException) -> do 

    conn<- getData `onNothing` error "reportBack: No connection defined: use wormhole"

    Closure closRemote <- getData `onNothing` error "teleport: no closRemote"

    node <- getMyNode

    let msg= SError $ toException $ ErrorCall $  show $ show $ CloudException node closRemote   $ show e

    msend conn msg  !> "MSEND"







-- | copy a session data variable from the local to the remote node.

-- If there is none set in the local node, The parameter is the default value.

-- In this case, the default value is also set in the local node.

copyData def = do

  r <- local getSData <|> return def

  onAll $ setData r

  return r





-- | write to the mailbox

-- Mailboxes are node-wide, for all processes that share the same connection data, that is, are under the

-- same `listen`  or `connect`

-- while EVars are only visible by the process that initialized  it and his children.

-- Internally, the mailbox is in a well known EVar stored by `listen` in the `Connection` state.

putMailbox :: Typeable val => val -> TransIO ()

putMailbox = putMailbox' (0::Int)



-- | write to a mailbox identified by an identifier besides the type

putMailbox' :: (Typeable key, Ord key, Typeable val) =>  key -> val -> TransIO ()

putMailbox'  idbox dat= do

   let name= MailboxId idbox $ typeOf dat

   Connection{comEvent= mv} <- getData `onNothing` errorMailBox

   mbs <- liftIO $ readIORef mv

   let mev =  M.lookup name mbs

   case mev of

     Nothing ->newMailbox name >> putMailbox' idbox dat

     Just ev -> writeEVar ev $ unsafeCoerce dat





newMailbox :: MailboxId -> TransIO ()

newMailbox name= do

--   return ()  -- !> "newMailBox"

   Connection{comEvent= mv} <- getData `onNothing` errorMailBox

   ev <- newEVar

   liftIO $ atomicModifyIORef mv $ \mailboxes ->   (M.insert name ev mailboxes,())





errorMailBox= error "MailBox: No connection open. Use wormhole"



-- | get messages from the mailbox that matches with the type expected.

-- The order of reading is defined by `readTChan`

-- This is reactive. it means that each new message trigger the execution of the continuation

-- each message wake up all the `getMailbox` computations waiting for it.

getMailbox :: Typeable val => TransIO val

getMailbox = getMailbox' (0 :: Int)



-- | read from a mailbox identified by an identifier besides the type

getMailbox' :: (Typeable key, Ord key, Typeable val) => key -> TransIO val

getMailbox' mboxid = x where

 x = do



   let name= MailboxId mboxid $ typeOf $ typeOf1 x

   Connection{comEvent= mv} <- getData `onNothing` errorMailBox

   mbs <- liftIO $ readIORef mv

   let mev =  M.lookup name mbs

   case mev of

     Nothing ->newMailbox name >> getMailbox' mboxid

     Just ev ->unsafeCoerce $ readEVar ev



 typeOf1 :: TransIO a -> a

 typeOf1 = undefined



-- | delete all subscriptions for that mailbox expecting this kind of data

cleanMailbox :: Typeable a => a -> TransIO ()

cleanMailbox = cleanMailbox' 0



-- | clean a mailbox identified by an Int and the type

cleanMailbox' :: Typeable a => Int ->  a -> TransIO ()

cleanMailbox'  mboxid witness= do

   let name= MailboxId mboxid $ typeOf witness

   Connection{comEvent= mv} <- getData `onNothing` error "getMailBox: accessing network events out of listen"

   mbs <- liftIO $ readIORef mv

   let mev =  M.lookup name mbs

   case mev of

     Nothing -> return()

     Just ev -> do cleanEVar ev

                   liftIO $ atomicModifyIORef mv $ \mbs -> (M.delete name mbs,())



-- | execute a Transient action in each of the nodes connected.

--

-- The response of each node is received by the invoking node and processed by the rest of the procedure.

-- By default, each response is processed in a new thread. To restrict the number of threads

-- use the thread control primitives.

--

-- this snippet receive a message from each of the simulated nodes:

--

-- > main = keep $ do

-- >    let nodes= map createLocalNode [2000..2005]

-- >    addNodes nodes

-- >    (foldl (<|>) empty $ map listen nodes) <|> return ()

-- >

-- >    r <- clustered $ do

-- >               Connection (Just(PortNumber port, _, _, _)) _ <- getSData

-- >               return $ "hi from " ++ show port++ "\n"

-- >    liftIO $ putStrLn r

-- >    where

-- >    createLocalNode n= createNode "localhost" (PortNumber n)

clustered :: Loggable a  => Cloud a -> Cloud a

clustered proc= callNodes (<|>) empty proc





-- A variant of `clustered` that wait for all the responses and `mappend` them

mclustered :: (Monoid a, Loggable a)  => Cloud a -> Cloud a

mclustered proc= callNodes (<>) mempty proc





callNodes op init proc= loggedc' $ do

    nodes <-  local getEqualNodes

    callNodes' nodes op init proc





callNodes' nodes op init proc= loggedc' $ foldr op init $ map (\node -> runAt node proc) nodes

-----

#ifndef ghcjs_HOST_OS

sendRaw (Connection _ _ _ (Just (Node2Web  sconn )) _ _ _ _ _ _) r=

      liftIO $   WS.sendTextData sconn  r                                --  !> ("NOde2Web",r)



sendRaw (Connection _ _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _) r=

      liftIO $  withMVar blocked $ const $  SBS.sendMany sock

                                      (BL.toChunks r )                   -- !> ("NOde2Node",r)



sendRaw (Connection _ _ _(Just (TLSNode2Node  ctx )) _ _ blocked _ _ _) r=

      liftIO $ withMVar blocked $ const $ sendTLSData ctx  r       --  !> ("TLNode2Web",r)



#else

sendRaw (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _  _ _) r= liftIO $

   withMVar blocked $ const $ JavaScript.Web.WebSocket.send   r sconn   -- !!> "MSEND SOCKET"

#endif



sendRaw _ _= error "No connection stablished"



type LengthFulLog= Int

data NodeMSG= ClosureData IdClosure IdClosure CurrentPointer 

            | RelayMSG Node Node (StreamData NodeMSG) 

   deriving (Typeable, Read, Show)



msend ::  Connection -> StreamData NodeMSG -> TransIO ()



msend (Connection _ _ _ (Just Self) _ _ _ _ _ _) r= return ()



#ifndef ghcjs_HOST_OS





msend (Connection _ _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _) r=do

   liftIO $   withMVar blocked $  const $ SBS.sendAll sock $ BC.pack (show r)   !> ("N2N SEND", r)



msend (Connection _ _ _ (Just (TLSNode2Node ctx)) _ _ _ _ _ _) r=

     liftIO $ sendTLSData  ctx $ BS.pack (show r)                              !> "TLS SEND"





msend (Connection _ _ _ (Just (Node2Web sconn)) _ _ _ _ _ _) r=liftIO $

  {-withMVar blocked $ const $ -} WS.sendTextData sconn $ BS.pack (show r)    !> "websockets send"



msend(Connection _ myNode _ (Just (Relay conn remote )) _ _ _ _ _ _) r= do

   origin <- liftIO $ readIORef myNode 

   msend conn $ SMore $ RelayMSG origin remote r





#else



msend (Connection _ _ remoten (Just (Web2Node sconn)) _ _ blocked _  _ _) r= liftIO $  do

  -- when (js_readystate sconn /= 1) $ do  -- must try to reconnect

  --        Just node <- liftIO $ readIORef remoten

  --        ...

  withMVar blocked $ const $ JavaScript.Web.WebSocket.send  (JS.pack $ show r) sconn    !> "MSEND SOCKET"







#endif



msend (Connection _ _ _ Nothing _ _  _ _ _ _) _= error "msend out of connection context: use wormhole to connect"







mread :: Loggable a => Connection -> TransIO (StreamData a)





#ifdef ghcjs_HOST_OS





mread (Connection _ _ _ (Just (Web2Node sconn)) _ _ _ _  _ _)=  wsRead sconn







wsRead :: Loggable a => WebSocket  -> TransIO  a

wsRead ws= do

  dat <- react (hsonmessage ws) (return ())

  case JM.getData dat of

    JM.StringData str  ->  return (read' $ JS.unpack str)

                 !> ("Browser webSocket read", str)  !> "<------<----<----<------"

    JM.BlobData   blob -> error " blob"

    JM.ArrayBufferData arrBuffer -> error "arrBuffer"







wsOpen :: JS.JSString -> TransIO WebSocket

wsOpen url= do

   ws <-  liftIO $ js_createDefault url      --  !> ("wsopen",url)

   react (hsopen ws) (return ())             -- !!> "react"

   return ws                                 -- !!> "AFTER ReACT"



foreign import javascript safe

    "window.location.hostname"

   js_hostname ::    JSVal



foreign import javascript safe

   "window.location.pathname"

  js_pathname ::    JSVal



foreign import javascript safe

    "window.location.protocol"

   js_protocol ::    JSVal



foreign import javascript safe

   "(function(){var res=window.location.href.split(':')[2];if (res === undefined){return 80} else return res.split('/')[0];})()"

   js_port ::   JSVal



foreign import javascript safe

    "$1.onmessage =$2;"

   js_onmessage :: WebSocket  -> JSVal  -> IO ()





getWebServerNode :: TransIO Node

getWebServerNode = liftIO $ do

   h <- fromJSValUnchecked js_hostname

   p <- fromIntegral <$> (fromJSValUnchecked js_port :: IO Int)

   createNode h p





hsonmessage ::WebSocket -> (MessageEvent ->IO()) -> IO ()

hsonmessage ws hscb= do

  cb <- makeCallback MessageEvent hscb

  js_onmessage ws cb



foreign import javascript safe

             "$1.onopen =$2;"

   js_open :: WebSocket  -> JSVal  -> IO ()



foreign import javascript safe

             "$1.readyState"

  js_readystate ::  WebSocket -> Int



newtype OpenEvent = OpenEvent JSVal deriving Typeable

hsopen ::  WebSocket -> (OpenEvent ->IO()) -> IO ()

hsopen ws hscb= do

   cb <- makeCallback OpenEvent hscb

   js_open ws cb



makeCallback :: (JSVal -> a) ->  (a -> IO ()) -> IO JSVal



makeCallback f g = do

   Callback cb <- CB.syncCallback1 CB.ContinueAsync (g . f)

   return cb





foreign import javascript safe

   "new WebSocket($1)" js_createDefault :: JS.JSString -> IO WebSocket





#else

mread (Connection _ _ _ (Just (Node2Node _ _ _)) _ _ _ _ _ _) =  parallelReadHandler -- !> "mread"



mread (Connection _ _ _ (Just (TLSNode2Node _ )) _ _ _ _ _ _) =  parallelReadHandler

--        parallel $ do

--            s <- recvTLSData  ctx

--            return . read' $  BC.unpack s



mread (Connection _ _ _  (Just (Node2Web sconn )) _ _ _ _ _ _)=

        parallel $ do

            s <- WS.receiveData sconn

            return . read' $  BS.unpack s

                !>  ("WS MREAD RECEIVED ----<----<------<--------", s)



mread (Connection  _ _ _ (Just (Relay conn _  )) _ _ _ _ _ _)=  

     mread conn  -- !> "MREAD RELAY"

       







parallelReadHandler :: Loggable a => TransIO (StreamData a)

parallelReadHandler= do

      str <- giveData :: TransIO BS.ByteString

      r <- choose $ readStream  str



      return  r

                   !> ("parallel read handler read",  r)

                   !> "<-------<----------<--------<----------"

    where

    readStream :: (Typeable a, Read a) =>  BS.ByteString -> [StreamData a]

    readStream s=  readStream1 $ BS.unpack s

     where



     readStream1 s=

       let [(x,r)] = reads  s

       in  x : readStream1 r







getWebServerNode :: TransIO Node

getWebServerNode = getNodes >>= return . head

#endif







--release (Node h p rpool _) hand= liftIO $ do

----    print "RELEASED"

--    atomicModifyIORef rpool $  \ hs -> (hand:hs,())

--      -- !!> "RELEASED"



mclose :: Connection -> IO ()



#ifndef ghcjs_HOST_OS



mclose (Connection _ _ _

   (Just (Node2Node _  sock _ )) _ _ _ _ _ _)= NS.close sock



mclose (Connection _ _ _

   (Just (Node2Web sconn ))

   _ _ _ _  _ _)=

    WS.sendClose sconn ("closemsg" :: BS.ByteString)



#else



mclose (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _ _ _)=

    JavaScript.Web.WebSocket.close Nothing Nothing sconn



#endif







mconnect :: Node -> TransIO  Connection

mconnect  node'=  do

  node <- fixNode node'

  nodes <- getNodes

  return ()                                                !>  ("mconnnect", nodePort node)

  let fnode =  filter (==node) nodes

  case fnode of

   [] -> mconnect1 node   -- !> "NO NODE"

   [node'@(Node _ _ pool _)] -> do

      plist <- liftIO $  readMVar $ fromJust pool 

      case plist of                                      --  !>  ("length", length plist,nodePort node) of

        (handle:_) -> do

                  delData $ Closure undefined

                  return  handle

                                                           !>   ("REUSED!", node)



        _ -> mconnect1 node'                                 

  where





#ifndef ghcjs_HOST_OS

  mconnect1 (node@(Node host port _ _))= do



     return ()  !> ("MCONNECT1",host,port,nodeServices node)

     (conn,parseContext) <- checkSelf node                                 <|>

                            timeout 1000000 (connectNode2Node host port)   <|>

                            timeout 1000000 (connectWebSockets host port)  <|> 

                            checkRelay                                     <|>

                            (throwt $ ConnectionError "" node)



     setState conn

     setState parseContext

--     return () !> "CONNECTED AFTER TIMEOUT"



     -- write node connected in the connection

     liftIO $ writeIORef (remoteNode conn) $ Just node

     -- write connection in the node

     liftIO $ modifyMVar_ (fromJust $ connection node) . const $ return [conn]

     addNodes [node]



     case connData conn of

       Just Self -> return()

       _         -> watchConnection

     delData $ Closure undefined

     return  conn



    where

    checkSelf node= do

      node' <- getMyNode

      if node /= node' 

       then  empty

       else do

          conn<- case connection node of

             Nothing    -> error "checkSelf error"

             Just ref   ->  do

                 cnn <- getSData <|> error "chechself: no connection"

                 rnode  <- liftIO $ newIORef node

                 conn   <- defConnection >>= \c -> return c{myNode= rnode, comEvent=comEvent cnn,connData= Just Self} !> "DEFF1"

                 liftIO $ withMVar ref $ const $ return [conn]

                 return conn



          return (conn,(ParseContext (error "checkSelf parse error") (error "checkSelf parse error")

                            ::  ParseContext BS.ByteString)) 



    timeout t proc=do

       r <- collect' 1 t proc

       case r of

          []  -> empty

          r:_ -> return r



    checkRelay= do

        return () !> "RELAY"

        myNode <- getMyNode

        if nodeHost node== nodeHost myNode

                then

                    case lookup "localNode" $ nodeServices node   of

                            Just snode -> do

                                con <- mconnect $ read snode 

                                cont <- getSData <|> return  noParseContext

                                return (con,cont)

                            Nothing -> empty

                else do



                  case lookup "relay" $ nodeServices node of

                    Nothing -> empty  -- !> "NO RELAY"

                    Just relayInfo -> do

                      let relay= read relayInfo

                      conn <- mconnect relay        -- !> ("RELAY",relay, node)

                      rem <- liftIO $ newIORef $ Just node

                      -- clos <- liftIO $ newMVar $ M.empty

                      let conn'= conn{connData= Just $ Relay conn node,remoteNode=rem} --,closures= clos}

                    

                      parseContext <- getState <|> return noParseContext

                      return (conn', parseContext)



    noParseContext= (ParseContext (error "relay error") (error "relay error")

                             ::  ParseContext BS.ByteString)



    connectSockTLS host port= do

        return ()                                         !> "connectSockTLS"



        let size=8192

        Connection{myNode=my,comEvent= ev} <- getSData <|> error "connect: listen not set for this node"



        sock  <- liftIO $ connectTo'  size  host $ PortNumber $ fromIntegral port



        conn' <- defConnection >>= \c ->

                     return c{myNode=my, comEvent= ev,connData=

                     

                     Just $ (Node2Node u  sock (error $ "addr: outgoing connection"))} 



        setData conn'

        input <-  liftIO $ SBSL.getContents sock



        setData $ ParseContext (error "parse context: Parse error") input



        

        maybeClientTLSHandshake host sock input





      `catcht` \(_ :: SomeException) ->   empty 





    connectNode2Node host port= do

        return () !> "NODE 2 NODE"

        connectSockTLS host port



        conn <- getSData <|> error "mconnect: no connection data"

        sendRaw conn "CLOS a b\r\n\r\n"

        r <- liftIO $ readFrom conn



        case r of

          "OK" ->  do

                parseContext <- getState

                return (conn,parseContext)



          _    ->  do

               let Connection{connData=cdata}= conn

               case cdata of

                     Just(Node2Node _ s _) ->  liftIO $ NS.close s -- since the HTTP firewall closes the connection

--                   Just(TLSNode2Node c) -> contextClose c   -- TODO

               empty





    connectWebSockets host port = do

         return () !> "WEBSOCKETS"

         connectSockTLS host port  -- a new connection



         never  <- liftIO $ newEmptyMVar :: TransIO (MVar ())

         conn   <- getSData <|> error "connectWebSockets: no connection"

         stream <- liftIO $ makeWSStreamFromConn conn

         wscon  <- react (NWS.runClientWithStream stream (host++(':': show port)) "/"

                      WS.defaultConnectionOptions []) (takeMVar never)





         return (conn{connData=  Just $ (Node2Web wscon)}, noParseContext)



--    noConnection= error $ show node ++ ": no connection"





    watchConnection= do

        conn <- getSData

        parseContext <- getSData <|> error "NO PARSE CONTEXT"

                         :: TransIO (ParseContext BS.ByteString)

        chs <- liftIO $ newIORef M.empty

        let conn'= conn{closChildren= chs}

        -- liftIO $ modifyMVar_ (fromJust pool) $  \plist -> do

        --                  if not (null plist) then print "DUPLICATE" else return ()

        --                  return $ conn':plist    -- !> (node,"ADDED TO POOL")



        -- tell listenResponses to watch incoming responses

        putMailbox  ((conn',parseContext,node)                            

              :: (Connection,ParseContext BS.ByteString,Node))

        liftIO $ threadDelay 100000  -- give time to initialize listenResponses



#else

  mconnect1 (node@(Node host port (Just pool) _))= do

     conn' <- getSData <|> error "connect: listen not set for this node"

     if nodeHost node== "webnode" then return  conn'{connData= Just Self} else do

        ws <- connectToWS host $ PortNumber $ fromIntegral port

--                                                           !> "CONNECTWS"

        let conn=  conn'{connData= Just  (Web2Node ws)}

--                                                           !>  ("websocker CONNECION")

        let parseContext =

                      ParseContext (error "parsecontext not available in the browser")

                        ("" :: JSString)



        chs <- liftIO $ newIORef M.empty

        let conn'= conn{closChildren= chs}

        liftIO $ modifyMVar_ pool $  \plist -> return $ conn':plist

        putMailbox  (conn',parseContext,node)  -- tell listenResponses to watch incoming responses

        delData $ Closure undefined

        return  conn

#endif



  u= undefined



data ConnectionError= ConnectionError String Node deriving Show 



instance Exception ConnectionError



-- mconnect _ = empty







#ifndef ghcjs_HOST_OS

connectTo' bufSize hostname (PortNumber port) =  do

        proto <- BSD.getProtocolNumber "tcp"

        bracketOnError

            (NS.socket NS.AF_INET NS.Stream proto)

            (sClose)  -- only done if there's an error

            (\sock -> do

              NS.setSocketOption sock NS.RecvBuffer bufSize

              NS.setSocketOption sock NS.SendBuffer bufSize

--              NS.setSocketOption sock NS.SendTimeOut 1000000  !> ("CONNECT",port)



              he <- BSD.getHostByName hostname



              NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he))



              return sock)



#else

connectToWS  h (PortNumber p) = do

   protocol <- liftIO $ fromJSValUnchecked js_protocol

   pathname <- liftIO $ fromJSValUnchecked js_pathname

   return () !> ("PAHT",pathname)

   let ps = case (protocol :: JSString)of "http:" -> "ws://"; "https:" -> "wss://"

   wsOpen $ JS.pack $ ps++ h++ ":"++ show p ++ pathname

#endif







type Blocked= MVar ()

type BuffSize = Int

data ConnectionData=

#ifndef ghcjs_HOST_OS

                   Node2Node{port :: PortID

                            ,socket ::Socket

                            ,sockAddr :: NS.SockAddr

                             }

                   | TLSNode2Node{tlscontext :: SData}

                   | Node2Web{webSocket :: WS.Connection}

--                   | WS2Node{webSocketNode :: WS.Connection}

                   | Self

                   | Relay Connection Node  -- (EVar (StreamData NodeMSG))

#else

                   Self

                   | Web2Node{webSocket :: WebSocket}

#endif

   --   deriving (Eq,Ord)





data MailboxId =  forall a .(Typeable a, Ord a) => MailboxId a TypeRep



instance Eq MailboxId where

   id1 == id2 =  id1 `compare` id2== EQ



instance Ord MailboxId where

   MailboxId n t `compare` MailboxId n' t'=

     case typeOf n `compare` typeOf n' of

         EQ -> case n `compare` unsafeCoerce n' of

                 EQ -> t `compare` t'

                 LT -> LT

                 GT -> GT



         other -> other



data Connection= Connection{idConn     :: Int

                           ,myNode     :: IORef Node

                           ,remoteNode :: IORef (Maybe Node)

                           ,connData   :: Maybe ConnectionData

                           ,bufferSize :: BuffSize

                           -- Used by getMailBox, putMailBox

                           ,comEvent   :: IORef (M.Map MailboxId (EVar SData))

                           -- multiple wormhole/teleport use the same connection concurrently

                           ,blocked    :: Blocked

                           ,calling    :: Bool

                           -- local localClosures with his log and his continuation

                           ,localClosures   :: MVar (M.Map IdClosure  EventF)



                           -- for each remote closure that points to local closure 0,

                           -- a new container of child processes

                           -- in order to treat them separately

                           -- so that 'killChilds' do not kill unrelated processes

                           ,closChildren :: IORef (M.Map Int EventF)}



                  deriving Typeable



















defConnection :: (MonadIO m, MonadState EventF m)  => m Connection





-- #ifndef ghcjs_HOST_OS

defConnection =  do

  idc <- genGlobalId

  liftIO $ do

    my <- newIORef (error "node in default connection")

    x <- newMVar ()

    y <- newMVar M.empty

    noremote <- newIORef Nothing

    z <-  return $ error "closchildren: newIORef M.empty"

    return $ Connection idc my noremote Nothing  8192

                  (error "defConnection: accessing network events out of listen")

                  x  False y z







#ifndef ghcjs_HOST_OS

setBuffSize :: Int -> TransIO ()

setBuffSize size= Transient $ do

   conn<- getData `onNothing`  (defConnection !> "DEFF3")

   setData $ conn{bufferSize= size}

   return $ Just ()



getBuffSize=

  (do getSData >>= return . bufferSize) <|> return  8192









-- | Setup the node to start listening for incoming connections.

--

listen ::  Node ->  Cloud ()

listen  (node@(Node _   port _ _ )) = onAll $ do

   addThreads 1

   

   setData $ Log False [] [] 0



   conn' <- getSData <|> defConnection

   ev <- liftIO $ newIORef M.empty

   chs <- liftIO $ newIORef M.empty

   let conn= conn'{connData=Just Self, comEvent=ev,closChildren=chs}

   pool <- liftIO $ newMVar [conn]



   let node'= node{connection=Just pool}

   liftIO $ writeIORef (myNode conn) node'

   setData conn



   liftIO $ modifyMVar_ (fromJust $ connection node') $ const $ return [conn]



   addNodes [node'] 

   

   mlog <- listenNew (fromIntegral port) conn  <|> listenResponses :: TransIO (StreamData NodeMSG)

   return () !> mlog  

   case mlog  of 

       SMore (RelayMSG _ _ _) -> relay mlog

       _                      -> execLog  mlog

 `catcht` (\(e ::SomeException) -> liftIO $ print  e)





-- relayService :: TransIO ()



relay (SMore (RelayMSG origin destiny streamdata)) = do

  nodes <- getNodes

  my <- getMyNode                        -- !> "relayService"

  if destiny== my 

    then do

       case  filter (==origin) nodes of 

          [node] -> do

              (conn: _) <- liftIO $ readMVar $ fromJust $ connection node

              setData  conn

              

          [] -> do

              conn@Connection{remoteNode= rorigin} <- getState 

              let conn'= conn{connData= Just $ Relay conn origin}       --  !> ("Relay set with: ",  origin, destiny)

              pool <- liftIO $ newMVar [conn']

              addNodes [origin{connection= Just pool}]

              setData conn'

       execLog streamdata 

       

    else do

          -- search local node name if hostname is the same 



          -- let destiny' = if nodeHost destiny== nodeHost my 

          --       then

          --           case filter (==destiny) nodes  of 

          --               [node]  -> case lookup "localNode" $ nodeServices node   of

          --                   Just snode ->  read snode 

          --                   Nothing -> destiny

          --               _ -> destiny

          --       else destiny

          -- let origin'=  if nodeHost origin == "localhost" 

          --          then case filter (==origin) nodes of 

          --               [node]  ->case lookup "externalNode" $ nodeServices node of

          --                            Just snode -> read snode

          --                            Nothing -> origin 

          --               _ -> origin

          --       else origin 



          let (origin',destiny')= nat  origin destiny  my nodes

          con <- mconnect destiny'

          msend con . SMore $ RelayMSG origin' destiny' streamdata

          return () !> ("SEND RELAY DATA",streamdata)

          fullStop

  

 

relay _= empty



nat  origin destiny  my nodes= 

          let destiny' = if nodeHost destiny== nodeHost my 

                then

                    case filter (==destiny) nodes  of 

                        [node]  -> case lookup "localNode" $ nodeServices node   of

                            Just snode ->  read snode 

                            Nothing -> destiny

                        _ -> destiny

                else destiny

              origin'=  if nodeHost origin == "localhost" 

                   then case filter (==origin) nodes of 

                        [node]  ->case lookup "externalNode" $ nodeServices node of

                                     Just snode -> read snode

                                     Nothing -> origin 

                        _ -> origin

                else origin 

          in (origin',destiny')



-- listen incoming requests

listenNew port conn'=  do

   



   sock <- liftIO . listenOn $ PortNumber port



   let bufSize= bufferSize conn'

   liftIO $ do NS.setSocketOption sock NS.RecvBuffer bufSize

               NS.setSocketOption sock NS.SendBuffer bufSize



   -- wait for connections. One thread per connection

   (sock,addr) <- waitEvents $ NS.accept sock

   chs <- liftIO $ newIORef M.empty

--   case addr of

--     NS.SockAddrInet port host -> liftIO $ print("connection from", port, host)

--     NS.SockAddrInet6  a b c d -> liftIO $ print("connection from", a, b,c,d)

   noNode <- liftIO $ newIORef Nothing

   id1 <- genId

   let conn= conn'{idConn=id1,closChildren=chs, remoteNode= noNode}



   input <-  liftIO $ SBSL.getContents sock



   cutExceptions

   

   onException $ \(e :: IOException) -> 

          when (ioeGetLocation e=="Network.Socket.recvBuf") $ do

             liftIO $ putStr "listen: " >> print e

             

             let Connection{remoteNode=rnode,localClosures=localClosures,closChildren= rmap} = conn

             -- TODO How to close Connection by discriminating exceptions

             mnode <- liftIO $ readIORef rnode

             case mnode of

               Nothing -> return ()

               Just node  -> do

                             liftIO $ putStr "removing1 node: " >> print node

                             nodes <- getNodes

                             setNodes $ nodes \\ [node]

             liftIO $ do

                  modifyMVar_ localClosures $ const $ return M.empty

                  writeIORef rmap M.empty

             -- topState >>= showThreads

            

             killBranch

             



   setData $ (ParseContext (NS.close sock >> error "Communication error" ) input

             ::ParseContext BS.ByteString)



   setState conn{connData=Just (Node2Node (PortNumber port) sock addr)}

   maybeTLSServerHandshake sock input







  --  (method,uri, headers) <- receiveHTTPHead

   (method, uri, vers) <- getFirstLine

   case method of



     "CLOS" ->

          do

           conn <- getSData

           sendRaw conn "OK"                               --      !> "CLOS detected"



           mread conn



     _ -> do

           let uri'= BC.tail $ uriPath uri !> uriPath uri

           if  "api/" `BC.isPrefixOf` uri'

             then do



               log <- return $ Exec: (Var $ IDyns $ BS.unpack method):(map (Var . IDyns ) $ split $ BC.unpack $ BC.drop 4 uri')





               str <-  giveData  <|> error "no api data"

               headers <- getHeaders

               maybeSetHost headers

               log' <- case (method,lookup "Content-Type" headers) of

                       ("POST",Just "application/x-www-form-urlencoded") -> do

                            len <- read <$> BC.unpack

                                        <$> (Transient $ return (lookup "Content-Length" headers))

                            setData $ ParseContext (return mempty) $ BS.take len str



                            postParams <- parsePostUrlEncoded  <|> return []

                            return $ log ++  [(Var . IDynamic $ postParams)]



                       _ -> return $ log  -- ++ [Var $ IDynamic  str]



               return $ SMore $ ClosureData 0 0  log'



             else if "relay/"  `BC.isPrefixOf` uri' then proxy sock method vers uri'

                

             else do

                   headers <- getHeaders

                   return () !> (method,uri')

                   -- stay serving pages until a websocket request is received

                   servePages (method, uri', headers)

                   conn <- getSData

                   sconn <- makeWebsocketConnection conn uri headers

                   -- websockets mode





                   let conn'= conn{connData= Just (Node2Web sconn)

                             , closChildren=chs}

                   setState conn'    !> "WEBSOCKETS-----------------------------------------------"

                   onException $ \(e :: SomeException) -> do

                            cutExceptions

                            liftIO $ putStr "listen websocket:" >> print e

                            --liftIO $ mclose conn'

                            killBranch

                            empty

--                   async (return (SMore (0,0,[Exec]))) <|> do

                   do

--                     return ()                                                   !> "WEBSOCKET"

                     r <-  parallel $ do

                             msg <- WS.receiveData sconn

                             return ()   !> ("Server WebSocket msg read",msg)

                                         !> "<-------<---------<--------------"



                             case reads $ BS.unpack msg of

                               [] -> do

                                   let log =Exec: [Var $ IDynamic  (msg :: BS.ByteString)]

                                   return $ SMore (ClosureData 0 0 log)

                               ((x ,_):_) -> return (x :: StreamData NodeMSG) -- StreamData (Int,Int,[LogElem]))



                     case r of

                       SError e -> do

--                           liftIO $ WS.sendClose sconn ("error" :: BS.ByteString)

                           back e

--                                                                 !> "FINISH1"

                       _ -> return r



     where

      uriPath = BC.dropWhile (/= '/')

      split []= []

      split ('/':r)= split r

      split s=

          let (h,t) = span (/= '/') s

          in h: split  t

      

      -- reverse proxy for urls that look like http://host:port/relay/otherhost/otherport

      proxy sclient method vers uri' = do

        -- get host port

        let (host:port:_)=  split $ BC.unpack $ BC.drop 6 uri'

        sserver <- liftIO $ connectTo' 4096 host $ PortNumber $ fromIntegral $ read port

                       

        rawHeaders <- getRawHeaders

        let uri= BS.fromStrict $ let d x= BC.tail $ BC.dropWhile (/= '/') x in d . d $ d uri'

        

        let sent=   method <> BS.pack " /" 

                           <> uri  

                           <> BS.cons ' ' vers 

                           <> BS.pack "\r\n" 

                           <> rawHeaders <> BS.pack "\r\n\r\n"

        liftIO $ SBSL.send  sserver sent

          -- Connection{connData=Just (Node2Node _ sclient _)} <- getState <|> error "proxy: no connection"

        cutExceptions

        onException $ \(e:: SomeException ) -> liftIO $ do 

                            putStr "Proxy: " >> print e

                            sClose sserver

                            sClose sclient



        send sclient sserver <|> send sserver sclient

        empty

        where

        send f t= async $ mapData f t

        mapData from to = do

            content <- recv from 4096 

            -- return () !> (" proxy received ", content)

            if not $ BC.null content 

              then sendAll to content >> mapData from to

              else finish

            where

            finish= sClose from >> sClose to

           -- throw $ Finish "finish"

           



      maybeSetHost headers= do

        setHost <- liftIO $ readIORef rsetHost

        when setHost $ do



          mnode <- liftIO $ do

           let mhost= lookup "Host" headers

           case mhost of

              Nothing -> return Nothing

              Just host -> atomically $ do

                   -- set the firt node (local node) as is called from outside

                     nodes <- readTVar  nodeList

                     let (host1,port)= BC.span (/= ':') host

                         hostnode= (head nodes){nodeHost=  BC.unpack host1

                                           ,nodePort= if BC.null port then 80

                                            else read $ BC.unpack $ BC.tail port}

                     writeTVar nodeList $ hostnode : tail nodes

                     return $ Just  hostnode  -- !> (host1,port)



          when (isJust mnode) $ do

            conn <- getState

            liftIO $ writeIORef (myNode conn) $fromJust mnode

          liftIO $ writeIORef rsetHost False  -- !> "HOSt SET"



{-#NOINLINE rsetHost #-}

rsetHost= unsafePerformIO $ newIORef True







--instance Read PortNumber where

--  readsPrec n str= let [(n,s)]=   readsPrec n str in [(fromIntegral n,s)]





--deriving instance Read PortID

--deriving instance Typeable PortID

#endif



listenResponses :: Loggable a => TransIO (StreamData a)

listenResponses= do

      (conn, parsecontext, node) <- getMailbox  -- :: TransIO (Connection,ParseContext BS.ByteString,Node)

      labelState $ "listen from: "++ show node

--      return () !> ("LISTEN",case connData conn of Just (Relay _) -> "RELAY"; _ -> "OTHER")

      setData conn



#ifndef ghcjs_HOST_OS

      setData (parsecontext :: ParseContext BS.ByteString)

#else

      setData (parsecontext :: ParseContext JSString)

#endif







      cutExceptions

      onException (\(e:: SomeException) -> do

        liftIO $ putStr "ListenResponses: " >> print e

        liftIO $ putStr "removing node: " >> print node

        nodes <- getNodes

        setNodes $ nodes \\ [node]

      --  topState >>= showThreads

        killChilds

        let Connection{localClosures=localClosures}= conn

        liftIO $ modifyMVar_ localClosures $ const $ return M.empty)





      mread conn









type IdClosure= Int



-- The remote closure ids for each node connection

newtype Closure= Closure  IdClosure -- deriving Show











type RemoteClosure=  (Node, IdClosure)



newtype JobGroup= JobGroup  (M.Map String RemoteClosure) deriving Typeable



-- | if there is a remote job  identified by th string identifier, it stop that job, and set the

-- current remote operation (if any) as the current remote job for this identifier.

-- The purpose is to have a single remote job.

--  to identify the remote job, it should be used after the `wormhole` and before the remote call:

--

-- r <- wormhole node $ do

--        stopRemoteJob "streamlog"

--        atRemote myRemotejob

--

-- So:

--

-- runAtUnique ident node job= wormhole node $ do stopRemoteJob ident; aRemote job

stopRemoteJob :: String -> Cloud ()

stopRemoteJob ident =  do

    local $  do

      JobGroup map <- getRState <|> return (JobGroup M.empty)

      let mj= M.lookup ident map

      when (isJust mj) $  putMailbox $ fromJust mj

    fixClosure

    local $ do

      JobGroup map <- getRState <|> return (JobGroup M.empty)

      Closure closr <- getData `onNothing` error "resetRemote: Closure not set, use wormhole"

      conn <- getData `onNothing` error "resetRemote: no connection set"

      remote <- liftIO $ readIORef $ remoteNode conn

      when (isJust remote) $  do

        setRState $ JobGroup $ M.insert ident (fromJust remote,closr) map

        putMailbox  (fromJust remote, closr)





-- kill the remote job. Usually, before starting a new one.

resetRemote :: Cloud ()

resetRemote= local $ do 

   Closure clos <- getState  `onNothing` return (Closure 0)

   conn <- getData `onNothing` error "resetRemote: no connection set"

   remote <- liftIO $ readIORef $ remoteNode conn

   when (isJust remote)  $ putMailbox  (fromJust remote, clos)



-- | delete closures in a remote node when is requested by `resetRemote` or `stopRemoteJob`.

-- This is necessary because a remote closure can be reactive or may take a long time.

--

-- It should be located as an alternative computation to the program:

--

-- >  main= initNode $ inputNodes <|> manageClosures <|>  myCloudCode

manageClosures =   do

   (remote, clos) <- local getMailbox

   localIO $ print ("MANAGECLOSURESSSSSSSSSSSSSS", clos)

   when (clos /= 0) $ runAt remote $  local $ do

      conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"

      mcont <- liftIO $ modifyMVar localClosures $ \map -> return ( M.delete clos map,  M.lookup clos map)

      case mcont of

        Nothing -> error $ "closure not found: " ++ show clos

        Just cont -> do 

                        showThreads $  fromJust $ parent cont

                        liftIO $ killBranch' cont   

                        return ()

  





execLog :: StreamData NodeMSG -> TransIO ()

execLog  mlog =  Transient $ do

       

       return () !> "EXECLOG"

       case mlog of

             SError e -> do

               case fromException e of

                 Just (ErrorCall str) -> do

                  case read str of

                    (e@(CloudException  _ closl   err)) -> do

                      process  closl (error "closr: should not be used") (Left  e) True

                 



             SDone   -> runTrans(back $ ErrorCall "SDone") >> return Nothing   -- TODO remove closure?

             SMore (ClosureData closl closr  log) -> process closl closr  (Right log) False

             SLast (ClosureData closl closr  log) -> process closl closr  (Right log) True

  -- !> ("EXECLOG",mlog)

   where

   process :: IdClosure -> IdClosure  -> (Either CloudException CurrentPointer) -> Bool -> StateIO (Maybe ())

   process  closl closr  mlog  deleteClosure= do

      conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"

      if closl== 0 then case mlog of

        Left except -> do

          setData $ Log True [] []

          --setData $ Closure closr

          return () !> "THROWWWW1"

          runTrans $ throwt except

          empty

        Right log -> do

           -- runTrans cutExceptions !> "CUTEXCEPTIONS"

           setData $ Log True log  (reverse log) 0  

           setData $ Closure  closr

           

           

           return $ Just ()                  --  !> "executing top level closure"

       else do



         mcont <- liftIO $ modifyMVar localClosures

                         $ \map -> return (if deleteClosure then

                                           M.delete closl map

                                         else map, M.lookup closl map)

                                           -- !> ("localClosures=", M.size map)

         case mcont of

           Nothing -> do

--

--              if closl == 0   -- add what is after execLog as closure 0

--               then do

--                     setData $ Log True log  $ reverse log

--                     setData $ Closure closr

--                     cont <- get    !> ("CLOSL","000000000")

--                     liftIO $ modifyMVar localClosures

--                            $ \map -> return (M.insert closl ([],cont) map,())

--                     return $ Just ()     --exec what is after execLog (closure 0)

--

--               else do

                     runTrans $ msend conn $ SLast (ClosureData closr closl  [])

                        -- to delete the remote closure

                     runTrans $ liftIO $ error ("request received for non existent closure: "

                                             ++  show closl)

           -- execute the closure

           Just cont -> do  -- remove fulLog?

              liftIO $ runStateT (case mlog of

                Right log -> do

                  Log _ _ fulLog hash <- getData `onNothing` return (Log True [] [] 0)

                  -- return() !> ("fullog in execlog", reverse fulLog)

                  let nlog= reverse log ++  fulLog

                  

                  setData $ Log True  log  nlog  hash

                  setData $ Closure  closr

                                              

                  runContinuation cont ()



                Left except -> do

                  setData $ Log True  []  []

                  --setData $ Closure  closr

                  return () !> "THROWWWW2"

                  runTrans $ throwt except) cont

              return Nothing

                            



#ifdef ghcjs_HOST_OS

listen node = onAll $ do

        addNodes [node]



        events <- liftIO $ newIORef M.empty

        rnode  <- liftIO $ newIORef node

        conn <-  defConnection >>= \c -> return c{myNode=rnode,comEvent=events}

        setData conn

        r <- listenResponses

        execLog  r

#endif



type Pool= [Connection]

type Package= String

type Program= String

type Service= [(Package, Program)]











--------------------------------------------





#ifndef ghcjs_HOST_OS





--    maybeRead line= unsafePerformIO $ do

--         let [(v,left)] = reads  line

----         print v

--         (v   `seq` return [(v,left)])

--                        `catch` (\(e::SomeException) -> do

--                          liftIO $ print  $ "******readStream ERROR in: "++take 100 line

--                          maybeRead left)





readFrom Connection{connData= Just(TLSNode2Node ctx)}= recvTLSData ctx



readFrom Connection{connData= Just(Node2Node _ sock _)} =  toStrict <$> loop



  where

  bufSize= 4098

  loop :: IO BL.ByteString

  loop = unsafeInterleaveIO $ do

    s <- SBS.recv sock bufSize



    if BC.length s < bufSize

      then  return $ BLC.Chunk s mempty

      else BLC.Chunk s `liftM` loop

      

readFrom _ = error "readFrom error"



toStrict= B.concat . BS.toChunks



makeWSStreamFromConn conn= do

     let rec= readFrom conn

         send= sendRaw conn

     makeStream                  -- !!> "WEBSOCKETS request"

            (do

                bs <-  rec         -- SBS.recv sock 4098

                return $ if BC.null bs then Nothing else Just  bs)

            (\mbBl -> case mbBl of

                Nothing -> return ()

                Just bl ->  send bl) -- SBS.sendMany sock (BL.toChunks bl) >> return())   -- !!> show ("SOCK RESP",bl)



makeWebsocketConnection conn uri headers= liftIO $ do



         stream <- makeWSStreamFromConn conn

         let

             pc = WS.PendingConnection

                { WS.pendingOptions     = WS.defaultConnectionOptions

                , WS.pendingRequest     = NWS.RequestHead  uri  headers False -- RequestHead (BC.pack $ show uri)

                                                      -- (map parseh headers) False

                , WS.pendingOnAccept    = \_ -> return ()

                , WS.pendingStream      = stream

                }





         sconn    <- WS.acceptRequest pc               -- !!> "accept request"

         WS.forkPingThread sconn 30

         return sconn



servePages (method,uri, headers)   = do

--   return ()                        !> ("HTTP request",method,uri, headers)

   conn <- getSData <|> error " servePageMode: no connection"



   if isWebSocketsReq headers

     then  return ()







     else do



        let file= if BC.null uri then "index.html" else uri



        {- TODO rendering in server
           NEEDED:  recodify View to use blaze-html in server. wlink to get path in server
           does file exist?
           if exist, send else do
              store path, execute continuation
              get the rendering
              send trough HTTP
           - put this logic as independent alternative programmer options
              serveFile dirs <|> serveApi apis <|> serveNode nodeCode
        -}

        mcontent <- liftIO $ (Just <$> BL.readFile ( "./static/out.jsexe/"++ BC.unpack file) )

                                `catch` (\(e:: SomeException) -> return Nothing)

                                

--                                    return  "Not found file: index.html<br/> please compile with ghcjs<br/> ghcjs program.hs -o static/out")

        case mcontent of

          Just content -> liftIO $ sendRaw conn $

            "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\nContent-Length: "

            <> BS.pack (show $ BL.length content) <>"\r\n\r\n" <> content



          Nothing ->liftIO $ sendRaw conn $ BS.pack $ 

              "HTTP/1.0 404 Not Found\nContent-Length: 13\nConnection: close\n\nNot Found 404"

        empty





--counter=  unsafePerformIO $ newMVar 0

api :: TransIO BS.ByteString -> Cloud ()

api  w= Cloud  $ do

   conn <- getSData  <|> error "api: Need a connection opened with initNode, listen, simpleWebApp"

   let send= sendRaw conn

   r <- w

   send r                         --  !> r



















isWebSocketsReq = not  . null

    . filter ( (== mk "Sec-WebSocket-Key") . fst)





data HTTPMethod= GET | POST deriving (Read,Show,Typeable)



getFirstLine=  (,,) <$> getMethod <*> (toStrict <$> getUri) <*> getVers

    where

    getMethod= parseString

    getUri= parseString

    getVers= parseString



getRawHeaders= dropSpaces >> parse (scan mempty)

  --  rs <- manyTill line (string "\r\n\r\n") 

  --  return $ BS.concat rs

  --  where

  --  line= parse cond

   where

   scan  res str

       | "\r\n\r\n" `BS.isPrefixOf` str= (res, BS.drop 4 str)

       | otherwise=  scan ( BS.snoc res $ BS.head str) $ BS.tail str 

  --  line= do

  --   dropSpaces

  --   tTakeWhile (not . endline)



type PostParams = [(BS.ByteString, String)]



parsePostUrlEncoded :: TransIO PostParams

parsePostUrlEncoded=  do

   dropSpaces

   many $ (,) <$> param  <*> value

   where

   param= tTakeWhile' ( /= '=')

   value= unEscapeString <$> BS.unpack <$> tTakeWhile' ( /= '&')









getHeaders =  manyTill paramPair  (string "\r\n\r\n")          -- !>  (method, uri, vers)



  where  





  paramPair=  (,) <$> (mk <$> getParam) <*> getParamValue

  



  getParam= do

      dropSpaces

      r <- tTakeWhile (\x -> x /= ':' && not (endline x))

      if BS.null r || r=="\r"  then  empty  else  dropChar >> return (toStrict r)



  getParamValue= toStrict <$> ( dropSpaces >> tTakeWhile  (\x -> not (endline x)))







#endif







#ifdef ghcjs_HOST_OS

isBrowserInstance= True

api _= empty

#else

-- | Returns 'True' if we are running in the browser.

isBrowserInstance= False



#endif











{-# NOINLINE emptyPool #-}

emptyPool :: MonadIO m => m (MVar Pool)

emptyPool= liftIO $ newMVar  []





-- | Create a node from a hostname (or IP address), port number and a list of

-- services.

createNodeServ ::  HostName -> Int -> Service -> IO Node

createNodeServ h p svs=  return $ Node h  p Nothing svs





createNode :: HostName -> Int -> IO Node

createNode h p= createNodeServ h p []



createWebNode :: IO Node

createWebNode= do

  pool <- emptyPool

  return $ Node "webnode"  0 (Just pool)  [("webnode","")]





instance Eq Node where

    Node h p _ _ ==Node h' p' _ _= h==h' && p==p'





instance Show Node where

    show (Node h p _ servs )= show (h,p, servs)



instance Read Node where

    readsPrec n s=

          let r= readsPrec n s

          in case r of

            [] -> []

            [((h,p,ss),s')] ->  [(Node h p Nothing ss ,s')]

          

          





-- inst    ghc-options: -threaded -rtsopts



nodeList :: TVar  [Node]

nodeList = unsafePerformIO $ newTVarIO []



deriving instance Ord PortID



--myNode :: Int -> DBRef  MyNode

--myNode= getDBRef $ key $ MyNode undefined



errorMyNode f= error $ f ++ ": Node not set. initialize it with connect, listen, initNode..."



-- | Return the local node i.e. the node where this computation is running.

getMyNode ::  TransIO Node -- (MonadIO m, MonadState EventF m) => m Node

getMyNode =  do

    Connection{myNode= node} <- getSData <|> errorMyNode "getMyNode"  :: TransIO Connection

    liftIO $ readIORef node



-- | Return the list of nodes in the cluster.

getNodes :: MonadIO m => m [Node]

getNodes  = liftIO $ atomically $ readTVar  nodeList



-- getEqualNodes= getNodes 



getEqualNodes = do

    nodes <- getNodes

    let srv= nodeServices $ head nodes

    case srv of

      [] -> return $ filter (null . nodeServices) nodes 

      (srv:_)  -> return $ filter (\n ->  head (nodeServices n) == srv  ) nodes

 

matchNodes f = do

      nodes <- getNodes

      return $ map (\n -> filter f $ nodeServices n) nodes 



-- | Add a list of nodes to the list of existing cluster nodes.

addNodes :: [Node] ->  TransIO () -- (MonadIO m, MonadState EventF m) => [Node] -> m ()

addNodes   nodes=  do

--  my <- getMyNode    -- mynode must be first

  nodes' <- mapM fixNode nodes

  liftIO . atomically $ do

    prevnodes <- readTVar nodeList

    writeTVar nodeList $  nub $ prevnodes ++ nodes'



fixNode n= case connection n of

  Nothing -> do

      pool <- emptyPool

      return n{connection= Just pool}

  Just _ -> return n



-- | set the list of nodes

setNodes nodes= liftIO $ atomically $ writeTVar nodeList $  nodes





-- | Shuffle the list of cluster nodes and return the shuffled list.

shuffleNodes :: MonadIO m => m [Node]

shuffleNodes=  liftIO . atomically $ do

  nodes <- readTVar nodeList

  let nodes'= tail nodes ++ [head nodes]

  writeTVar nodeList nodes'

  return nodes'



--getInterfaces :: TransIO TransIO HostName

--getInterfaces= do

--   host <- logged $ do

--      ifs <- liftIO $ getNetworkInterfaces

--      liftIO $ mapM_ (\(i,n) ->putStrLn $ show i ++ "\t"++  show (ipv4 n) ++ "\t"++name n)$ zip [0..] ifs

--      liftIO $ putStrLn "Select one: "

--      ind <-  input ( < length ifs)

--      return $ show . ipv4 $ ifs !! ind









-- #ifndef ghcjs_HOST_OS

--instance Read NS.SockAddr where

--    readsPrec _ ('[':s)=

--       let (s',r1)= span (/=']')  s

--           [(port,r)]= readsPrec 0 $ tail $ tail r1

--       in [(NS.SockAddrInet6 port 0 (IP.toHostAddress6 $  read s') 0, r)]

--    readsPrec _ s=

--       let (s',r1)= span(/= ':') s

--           [(port,r)]= readsPrec 0 $ tail r1

--       in [(NS.SockAddrInet port (IP.toHostAddress $  read s'),r)]

-- #endif



--newtype MyNode= MyNode Node deriving(Read,Show,Typeable)





--instance Indexable MyNode where key (MyNode Node{nodePort=port}) =  "MyNode "++ show port

--

--instance Serializable MyNode where

--    serialize= BS.pack . show

--    deserialize= read . BS.unpack







-- | Add a node (first parameter) to the cluster using a node that is already

-- part of the cluster (second parameter).  The added node starts listening for

-- incoming connections and the rest of the computation is executed on this

-- newly added node.

connect ::  Node ->  Node -> Cloud ()

#ifndef ghcjs_HOST_OS

connect  node  remotenode =   do

    listen node <|> return ()

    connect' remotenode







-- | Reconcile the list of nodes in the cluster using a remote node already

-- part of the cluster. Reconciliation end up in each node in the cluster

-- having  the same list of nodes.

connect' :: Node -> Cloud ()

connect'  remotenode= loggedc $ do

    nodes <- local getNodes

    localIO $ putStr "connecting to: " >> print remotenode



    newNodes <- runAt remotenode $ interchange  nodes



    local $ return ()                                                              !> "interchange finish"



    -- add the new  nodes to the local nodes in all the nodes connected previously



    let toAdd=remotenode:tail newNodes

    callNodes' nodes  (<>) mempty $ local $ do

           liftIO $ putStr  "New nodes: " >> print toAdd !> "NEWNODES"

           addNodes toAdd



    where

    -- receive new nodes and send their own

    interchange  nodes=

        do

           newNodes <- local $ do

              conn@Connection{remoteNode=rnode, connData=Just cdata} <- getSData <|>

               error ("connect': need to be connected to a node: use wormhole/connect/listen")





              -- if is a websockets node, add only this node

              -- let newNodes = case  cdata of

              --                  Node2Web _ -> [(head nodes){nodeServices=[("relay",show remotenode)]}]

              --                  _ ->  nodes

              let newNodes= map (\n -> n{nodeServices= nodeServices n ++ [("relay",show (remotenode,n))]}) nodes



              callingNode<- fixNode $ head newNodes



              liftIO $ writeIORef rnode $ Just callingNode



              liftIO $ modifyMVar_ (fromJust $ connection callingNode) $ const $ return [conn]





              -- onException $ \(e :: SomeException) -> do

              --      liftIO $ putStr "connect:" >> print e

              --      liftIO $ putStrLn "removing node: " >> print callingNode

              --     --  topState >>= showThreads

              --      nodes <- getNodes

              --      setNodes $ nodes \\ [callingNode]



              return newNodes



           oldNodes <- local $ getNodes





           mclustered . local $ do

                liftIO $ putStrLn  "New nodes: " >> print newNodes



                addNodes newNodes  



           localIO $ atomically $ do

                  -- set the firt node (local node) as is called from outside

--                     return () !> "HOST2 set"

                     nodes <- readTVar  nodeList

                     let nodes'= (head nodes){nodeHost=nodeHost remotenode

                                             ,nodePort=nodePort remotenode}:tail nodes

                     writeTVar nodeList nodes'





           return oldNodes



#else

connect _ _= empty

connect' _ = empty

#endif