-----------------------------------------------------------------------------
--
-- Module      :  Transient.Move
-- Copyright   :
-- License     :  GPL-3
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
-- | see <https://www.fpcomplete.com/user/agocorona/moving-haskell-processes-between-nodes-transient-effects-iv>
-----------------------------------------------------------------------------
{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification, OverloadedStrings
    ,ScopedTypeVariables, StandaloneDeriving, RecordWildCards, FlexibleContexts, CPP
    ,GeneralizedNewtypeDeriving #-}
module Transient.Move(

Cloud(..),runCloudIO, runCloudIO',local,onAll,lazy, loggedc, lliftIO,localIO,
listen, Transient.Move.connect, connect', fullStop,

wormhole, teleport, copyData,

beamTo, forkTo, streamFrom, callTo, runAt, atRemote,

clustered, mclustered,

newMailbox, putMailbox,getMailbox,cleanMailbox,

#ifndef ghcjs_HOST_OS
setBuffSize, getBuffSize,
#endif

createNode, createWebNode, createNodeServ, getMyNode, getNodes,
addNodes, shuffleNodes,

-- * low level

 getWebServerNode, Node(..), nodeList, Connection(..), Service(),
 isBrowserInstance, Prefix(..), addPrefix
 ,defConnection


) where
import Transient.Base
import Transient.Internals(IDynamic(..),killChildren,getCont,runCont,EventF(..),LogElem(..),Log(..)
       ,onNothing,RemoteStatus(..),getCont,StateIO,readsPrec')
import Transient.Logged
import Transient.Indeterminism(choose)
import Transient.EVars
import Data.Typeable
import Control.Applicative
#ifndef ghcjs_HOST_OS
import Network
import Network.Info
import qualified Data.IP as IP
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Network.WebSockets as NWS(RequestHead(..))

import qualified Network.WebSockets.Connection   as WS

import Network.WebSockets.Stream   hiding(parse)
import qualified Data.ByteString       as B             (ByteString,concat)
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Internal as BLC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BS
import Network.Socket.ByteString as SBS(send,sendMany,sendAll,recv)
import qualified Network.Socket.ByteString.Lazy as SBSL
import Data.CaseInsensitive(mk)
import Data.Char(isSpace)
--import GHCJS.Perch (JSString)
#else
import  JavaScript.Web.WebSocket
import  qualified JavaScript.Web.MessageEvent as JM
import GHCJS.Prim (JSVal)
import GHCJS.Marshal(fromJSValUnchecked)
import qualified Data.JSString as JS


import           JavaScript.Web.MessageEvent.Internal
import           GHCJS.Foreign.Callback.Internal (Callback(..))
import qualified GHCJS.Foreign.Callback          as CB
import Data.JSString  (JSString(..), pack)

#endif

import qualified Data.Text as T
import Control.Monad.State
import System.IO
import Control.Exception
import Data.Maybe
import Unsafe.Coerce

--import System.Directory
import Control.Monad

import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar

import Data.Monoid
import qualified Data.Map as M
import Data.List (nub,(\\),find, insert)
import Data.IORef



import System.IO

import Control.Concurrent

import System.Random



import Data.Dynamic
import Data.String


#ifdef ghcjs_HOST_OS
-- Stand-ins for the "Network" types of the same names. They are only defined in the
-- GHCJS (browser) build, where the "Network" import is compiled out.
type HostName  = String
newtype PortID = PortNumber Int deriving (Read, Show, Eq, Typeable)
#endif

-- | Description of a node of the cloud.
--
-- The 'Eq' and 'Ord' instances of this module look only at the host name and the port;
-- the connection pool is ignored by them.
data Node= Node{ nodeHost   :: HostName          -- ^ host name of the node
               , nodePort   :: Int               -- ^ port of the node
               , connection :: MVar Pool         -- ^ connections already opened to this node (see `mconnect`)
               , nodeServices   :: [Service]     -- ^ (package, program) pairs attached to the node
               }

         deriving (Typeable)

-- | Nodes are ordered by host name first and by port second.
instance Ord Node where
   compare a b = compare (key a) (key b)
     where
     key n = (nodeHost n, nodePort n)


-- | The 'Cloud' monad is a thin layer over 'TransIO'. Its purpose is to make the type system
-- force the logging of intermediate results: a 'TransIO' action is normally lifted with
-- `local` (result logged) or `onAll` (result not logged) to be used in a 'Cloud' computation.
newtype Cloud a= Cloud {runCloud ::TransIO a} deriving (Functor,Applicative,Monoid,Alternative, Monad, MonadState EventF)
--
--
--instance Applicative Cloud  where
--
--   pure = return
--
--   x <*> y= Cloud . Transient $ do
--       l1 <- liftIO $ newIORef Nothing
--       l2 <- liftIO $ newIORef Nothing
--       runTrans $ do
--           Log _  _ full <- getData `onNothing` error "instance Applicative: no log"
--           r <- runCloud (eval l1 x) <*> runCloud (eval l2 y)
--           Just v1 <- localIO $ readIORef l1
--           Just v2 <- localIO $ readIORef l2
--           let full' = Var (toIDyn v1) : Var (toIDyn v2) : full
--           setData $ Log False full' full'
--           return r
--       where
--       eval l x= x >>= \v -> localIO (writeIORef l $ Just v) >> return v
--
--
--instance Monoid a => Monoid (Cloud a) where
--   mappend x y = mappend <$> x <*> y
--   mempty= return mempty

-- | Run a 'TransIO' computation in the current node and log its result.
-- Thanks to the logged result, the closure can be recovered when the computation is
-- moved to another node by primitives such as `beamTo`, `forkTo`, `runAt`, `teleport`,
-- `clustered` or `mclustered`.
local :: Loggable a => TransIO a -> Cloud a
local mx = Cloud (logged mx)

--stream :: Loggable a => TransIO a -> Cloud (StreamVar a)
--stream= Cloud . transport

-- #ifndef ghcjs_HOST_OS
-- | Run a cloud computation in 'IO', by means of 'keep'.
runCloudIO :: Cloud a -> IO a
runCloudIO = keep . runCloud

-- | Run a cloud computation in 'IO' with no console input, by means of 'keep''.
runCloudIO' :: Cloud a -> IO a
runCloudIO' = keep' . runCloud

-- #endif

-- | Alternative to `local`: the computation is lifted without logging its result.
-- Therefore, if the enclosing computation is moved to another node, the action is
-- executed again there, unless it was run inside a `local` computation.
--
-- > onAll foo
-- > local foo'
-- > local $ do
-- >       bar
-- >       runCloud $ do
-- >               onAll baz
-- >               runAt node ....
-- > callTo node' .....
--
-- Here foo will be executed in node', but foo', bar and baz won't.
--
-- However foo, bar and baz will be executed in node.
--

onAll ::  TransIO a -> Cloud a
onAll mx =  Cloud mx

-- | Lift a 'TransIO' computation whose execution is deferred: the current state is captured
-- with 'getCont' and the computation is run from that state, through 'unsafePerformIO', only
-- when the returned value is demanded. The state produced by that run is discarded: only the
-- first component of the 'runStateT' result is kept.
--
-- NOTE(review): since it relies on 'unsafePerformIO', when (and whether) the effects of @mx@
-- happen depends on evaluation order -- confirm that this is intended at each call site.
lazy :: TransIO a -> Cloud a
lazy mx= onAll $ getCont >>= \st -> Transient $
        return $ unsafePerformIO $  runStateT (runTrans mx) st >>=  return .fst

-- | Log the result of a cloud computation. Like `logged`, this eliminates all the log produced
-- by the computations inside and substitutes it with that single result when the computation
-- is completed.
loggedc :: Loggable a => Cloud a -> Cloud a
loggedc = Cloud . logged . runCloud

-- | The `Cloud` monad has no `MonadIO` instance; use this to run an 'IO' action locally
-- and log its result. @lliftIO = local . liftIO@
lliftIO :: Loggable a => IO a -> Cloud a
lliftIO io = local (liftIO io)

-- | Perform an 'IO' action locally. Synonym of `lliftIO`.
localIO :: Loggable a => IO a -> Cloud a
localIO io = lliftIO io

--remote :: Loggable a => TransIO a -> Cloud a
--remote x= Cloud $ step' x $ \full x ->  Transient $ do
--            let add= Wormhole: full
--            setData $ Log False add add
--
--            r <-  runTrans x
--
--            let add= WaitRemote: full
--            (setData $ Log False add add)     -- !!> "AFTER STEP"
--            return  r

-- | Stop the current computation without executing any alternative computation:
-- the state is marked as 'WasRemote' before stopping.
fullStop :: Cloud stop
fullStop= onAll $ do
    setData WasRemote
    stop


-- | Continue the execution in another node.
-- All the previous actions, from `listen` up to this statement, must have been logged.
beamTo :: Node -> Cloud ()
beamTo node = node `wormhole` teleport


-- | Start in the remote node a process with the same execution state, while the
-- local one goes on (the alternative branch simply returns).
forkTo  :: Node -> Cloud ()
forkTo node= remote <|> here
  where
  remote = beamTo node
  here   = return ()

-- | Open a wormhole to another node and execute an action on it.
-- Currently, by default, the connection is kept open to receive additional requests
-- and responses (streaming).
callTo :: Loggable a => Node -> Cloud a -> Cloud a
callTo node remoteProc = wormhole node (atRemote remoteProc)

-- | Within a connection to another node opened by `wormhole`, run the computation in the
-- remote node and return the result back to the original node.
--
-- The sequence is: `teleport` to the other side, run @proc@ there (the state is marked
-- 'WasRemote' through '<**'), and `teleport` back carrying the result. The whole
-- block is wrapped in `loggedc`.
atRemote proc= loggedc $ do
     teleport                    -- !> "teleport 1111"
     r <- Cloud $ runCloud proc <** setData WasRemote
     teleport                    -- !> "teleport 2222"
     return r

-- | Synonym of `callTo`.
runAt :: Loggable a => Node -> Cloud a -> Cloud a
runAt node remoteProc = callTo node remoteProc


-- | Send a piece of stream data through the connection set by `wormhole`.
-- The value is serialized with 'show'. Writers on the same connection are serialized by
-- holding the @blocked@ 'MVar' of the connection for the duration of the write.
--
-- Calling it with a connection that has no @connData@ is an error.
msend :: Loggable a => Connection -> StreamData a -> TransIO ()


#ifndef ghcjs_HOST_OS

-- Node to node, over a raw socket. An exception raised while sending is not propagated:
-- it is handed over to 'finish'.
msend (Connection _(Just (Node2Node _ sock _)) _ _ blocked _ _ ) r= do
  r <- liftIO $ do
       withMVar blocked $
             const $ do
                 -- 'sendAll' instead of 'send': 'send' may write just a prefix of the
                 -- buffer and report the count, which was ignored here, so a large
                 -- message could be silently truncated.
                 SBS.sendAll sock $ BC.pack (show r)
                 return Nothing
            `catch` (\(e::SomeException) -> return $ Just e)
  case r of
      Nothing -> return()
      juste -> finish juste


-- Node to browser, over the server side of a websocket.
msend (Connection _(Just (Node2Web sconn)) _ _ blocked _  _) r=liftIO $
  withMVar blocked $ const $ WS.sendTextData sconn $ BS.pack (show r)


#else

-- Browser to node, over the browser websocket.
msend (Connection _ (Just (Web2Node sconn)) _ _ blocked _  _) r= liftIO $
  withMVar blocked $ const $ JavaScript.Web.WebSocket.send  (JS.pack $ show r) sconn   -- !!> "MSEND SOCKET"



#endif

msend (Connection _ Nothing _ _  _ _ _ ) _= error "msend out of wormhole context"

-- | Read stream data from the transport stored in the connection.
-- The clauses are selected at compile time: browser (GHCJS) or native node.
mread :: Loggable a => Connection -> TransIO (StreamData a)


#ifdef ghcjs_HOST_OS


-- Browser: each websocket message is parsed by `wsRead`.
mread (Connection _ (Just (Web2Node sconn)) _ _ _ _  _)=  wsRead sconn



-- | Wait (via 'react') for a message of the browser websocket and parse its text payload
-- with `read'`. Blob and array-buffer payloads are not supported and raise an error.
wsRead :: Loggable a => WebSocket  -> TransIO  a
wsRead ws= do
  dat <- react (hsonmessage ws) (return ())
  case JM.getData dat of
    JM.StringData str  ->  return (read' $ JS.unpack str)
--                                    !> ("Browser webSocket read", str)  !> "<------<----<----<------"
    JM.BlobData   blob -> error " blob"
    JM.ArrayBufferData arrBuffer -> error "arrBuffer"


{-
wsRead1 :: Loggable a => WebSocket  -> TransIO (StreamData a)
wsRead1 ws= do
  reactStream (makeCallback MessageEvent) (js_onmessage ws) CB.releaseCallback (return ())
  where
  reactStream createHandler setHandler removeHandler iob= Transient $ do
        cont    <- getCont
        hand <- liftIO . createHandler $ \dat ->do
              runStateT (setData dat >> runCont cont) cont
              iob
        mEvData <- getSessionData
        case mEvData of
          Nothing -> liftIO $ do
                        setHandler hand
                        return Nothing

          Just dat -> do
             liftIO $ print "callback called 2*****"
             delSessionData dat
             dat' <- case getData dat of
                 StringData str  -> liftIO $ putStrLn "WSREAD RECEIVED " >> print str >> return (read $ JS.unpack str)
                 BlobData   blob -> error " blob"
                 ArrayBufferData arrBuffer -> error "arrBuffer"
             liftIO $ case dat' of
               SDone -> do
                        removeHandler $ Callback hand
                        empty
               sl@(SLast x) -> do
                        removeHandler $ Callback hand     -- !!> "REMOVEHANDLER"
                        return $ Just sl
               SError e -> do
                        removeHandler $ Callback hand
                        print e
                        empty
               more -> return (Just  more)
-}


-- | Create a browser websocket for the given URL and continue, with the socket as result,
-- from the @onopen@ handler installed by `hsopen`.
wsOpen :: JS.JSString -> TransIO WebSocket
wsOpen url= do
   ws <-  liftIO $ js_createDefault url      --  !> ("wsopen",url)
   react (hsopen ws) (return ())             -- !!> "react"
   return ws                                 -- !!> "AFTER ReACT"

-- Host name of the page currently loaded in the browser.
foreign import javascript safe
    "window.location.hostname"
   js_hostname ::    JSVal

-- Port taken from the URL of the page: the text between the second ':' and the next '/',
-- or the number 80 when the URL has no explicit port.
-- NOTE(review): the two branches yield different JavaScript types (number vs. string)
-- -- confirm that the unchecked conversion in `getWebServerNode` copes with both.
foreign import javascript safe
   "(function(){var res=window.location.href.split(':')[2];if (res === undefined){return 80} else return res.split('/')[0];})()"
   js_port ::   JSVal

-- Install the given callback as the @onmessage@ handler of the websocket.
foreign import javascript safe
    "$1.onmessage =$2;"
   js_onmessage :: WebSocket  -> JSVal  -> IO ()


-- | Browser version: the node of the web server, built from the host name and port of the
-- URL of the current page (see @js_hostname@ and @js_port@).
getWebServerNode :: TransIO Node
getWebServerNode = liftIO $

   createNode   <$> (fromJSValUnchecked js_hostname)
                <*> (fromIntegral <$> (fromJSValUnchecked js_port :: IO Int))




-- | Install a Haskell handler for the messages of a websocket: the handler is wrapped
-- into a JavaScript callback by `makeCallback` and set as @onmessage@.
hsonmessage ::WebSocket -> (MessageEvent ->IO()) -> IO ()
hsonmessage ws hscb= do
  cb <- makeCallback MessageEvent hscb
  js_onmessage ws cb

-- Install the given callback as the @onopen@ handler of the websocket.
foreign import javascript safe
             "$1.onopen =$2;"
   js_open :: WebSocket  -> JSVal  -> IO ()

-- | Wrapper for the value passed to the @onopen@ handler of a websocket.
newtype OpenEvent = OpenEvent JSVal deriving Typeable
-- | Install a Haskell handler for the @onopen@ event of a websocket.
hsopen ::  WebSocket -> (OpenEvent ->IO()) -> IO ()
hsopen ws hscb= do
   cb <- makeCallback OpenEvent hscb
   js_open ws cb

-- | Turn a Haskell handler into a JavaScript callback value. The raw 'JSVal' argument
-- received from JavaScript is wrapped with the first parameter before the handler runs.
-- NOTE(review): the callback created here is never released in this block -- confirm
-- whether the callers are expected to release it.
makeCallback :: (JSVal -> a) ->  (a -> IO ()) -> IO JSVal

makeCallback f g = do
   Callback cb <- CB.syncCallback1 CB.ContinueAsync (g . f)
   return cb


-- Create a browser websocket for the given URL.
foreign import javascript safe
   "new WebSocket($1)" js_createDefault :: JS.JSString -> IO WebSocket


#else
-- Native node, socket connection: the reading is delegated to 'parallelReadHandler'.
mread (Connection _(Just (Node2Node _ _ _)) _ _ blocked _ _ ) =  parallelReadHandler -- !> "mread"


-- Native node, websocket connection: each message received is parsed with `read'`.
mread (Connection node  (Just (Node2Web sconn )) bufSize events blocked _ _)=
        parallel $ do
            s <- WS.receiveData sconn
            return . read' $  BS.unpack s
--                 !>  ("WS MREAD RECEIVED ---->", s)

--           `catch`(\(e ::SomeException) -> return $ SError e)

-- | Native version: the web server node is the current node itself.
getWebServerNode :: TransIO Node
getWebServerNode = getMyNode
#endif

-- Parse a complete string with 'readsPrec''. Exactly one parse that consumes the whole
-- input is required; anything else raises an error that includes the offending text.
read' s
  | [(x,"")] <- readsPrec' 0 s = x
  | otherwise                  = error $ "reading " ++ s

-- | A wormhole opens a connection with another node anywhere in a computation.
-- `teleport` uses this connection to translate the computation back and forth between the two nodes connected
--
-- Two modes, chosen by the recovery flag of the current 'Log':
--
-- * not recovering (calling side): connect to the node with `mconnect`, store the connection
--   with @calling= True@, run the computation and, afterwards, restore the connection and
--   closure that were in the state before, if any.
--
-- * recovering (called side): reuse the connection already present in the state
--   (it is an error if there is none), mark it @calling= False@, run the computation and,
--   afterwards, restore the previous closure, if any.
wormhole :: Loggable a => Node -> Cloud a -> Cloud a
wormhole node (Cloud comp) = local $ Transient $ do

   -- previous connection and closure, to be restored once the wormhole is done
   moldconn <- getData :: StateIO (Maybe Connection)
   mclosure <- getData :: StateIO (Maybe Closure)

   logdata@(Log rec log fulLog) <- getData `onNothing` return (Log False [][])

   mynode <- runTrans getMyNode     -- debug
   if not rec                                    --  !> ("wormhole recovery", rec)
            then runTrans $ (do

                    conn <-  mconnect node       --  !> (mynode,"connecting node ",  node)
                    setData  conn{calling= True}
#ifdef ghcjs_HOST_OS
                    addPrefix    -- for the DOM identifiers
#endif
                    comp )
                  <*** do when (isJust moldconn) . setData $ fromJust moldconn
                          when (isJust mclosure).  setData $ fromJust mclosure

                    -- <** is not enough
            else do
             let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
--             conn <- getData `onNothing` error "wormhole: no connection in remote node"

             setData $ conn{calling= False}

             runTrans $  comp
                     <*** do
--                          when (null log) $ setData WasRemote    !> "NULLLOG"
                          when (isJust mclosure) . setData $ fromJust mclosure




#ifndef ghcjs_HOST_OS
-- Stand-ins for the GHCJS names used by 'Prefix' and `addPrefix`: outside the browser
-- a 'JSString' is a plain 'String' and 'pack' is the identity.
type JSString= String
pack= id



#endif

-- | Prefix stored in the state by `addPrefix` (used, in the browser, for the DOM identifiers).
newtype Prefix= Prefix JSString deriving(Read,Show)

-- | Store in the state a fresh 'Prefix' made of five random lowercase letters.
addPrefix= Transient $ do
   letters <- liftIO . replicateM 5 $ randomRIO ('a','z')
   setData . Prefix $ pack letters
   return (Just ())


-- | translates computations back and forth
-- reusing a connection opened by `wormhole`
--
-- When the log is not in recovery mode, this is the active side: the current continuation
-- is stored in the closure table of the connection, the log is sent to the other node and
-- the local computation stops (the action yields 'Nothing'). When the log is in recovery
-- mode, this is the side where the computation has just arrived: the 'WasRemote' mark is
-- removed and the computation goes on.
teleport ::   Cloud ()
teleport =  do
  local $ Transient $ do

     cont <- get

     -- send log with closure at head
     Log rec log fulLog <- getData `onNothing` return (Log False [][])
     if not rec   -- !> ("teleport rec,loc fulLog=",rec,log,fulLog)
                  -- if is not recovering in the remote node then it is active
      then  do
         conn@Connection{closures= closures,calling= calling} <- getData
             `onNothing` error "teleport: No connection defined: use wormhole"

         --read this Closure
         -- (0 when no closure of the other node is known yet)
         Closure closRemote  <- getData `onNothing` return (Closure 0 )

         --set his own closure in his Node data

         -- the local closure identifier is derived from the full log: a Wait entry
         -- counts 1000 and any other entry counts 1
         -- NOTE(review): different logs can add up to the same number -- confirm that
         -- collisions are not possible in practice.
         let closLocal = sum $ map (\x-> case x of Wait-> 1000; _ -> 1) fulLog
--         closLocal  <-   liftIO $ randomRIO (0,1000000)

         liftIO $ modifyMVar_ closures $ \map -> return $ M.insert closLocal (fulLog,cont) map

         -- the whole log is sent when no remote closure is known; otherwise only @log@
         let tosend= reverse $ if closRemote==0 then fulLog else  log -- drop offset  $ reverse fulLog  !> ("fulLog", fulLog)

         runTrans $ msend conn $ SMore (closRemote,closLocal, tosend )
--                                                  !> ("teleport sending", tosend )
--                                                  !> "--------->------>---------->"

         setData $ if (not calling) then  WasRemote else WasParallel

         return Nothing

      else do

         delData WasRemote                -- !> "deleting wasremote in teleport"
                                          -- it is recovering, therefore it will be the
                                          -- local, not remote

         return (Just ())                           --  !> "TELEPORT remote"




-- | Copy a session data variable from the local node to the remote node.
-- The parameter is the default value, used when the variable is not set in the local node;
-- in that case the default is also set in the local node.
copyData def =
  (local getSData <|> return def) >>= \val ->
      onAll (setData val) >> return val

-- | `callTo` can stream data, but it can not inform the receiving process about the
-- finalization. This call does it, since the values travel wrapped in 'StreamData'.
streamFrom :: Loggable a => Node -> Cloud (StreamData a) -> Cloud  (StreamData a)
streamFrom node remoteProc = callTo node remoteProc


--release (Node h p rpool _) hand= liftIO $ do
----    print "RELEASED"
--    atomicModifyIORef rpool $  \ hs -> (hand:hs,())
--      -- !!> "RELEASED"

-- | Close the transport of a connection: the socket for node to node connections, the
-- websocket (with a close message) for connections between a node and a browser.
-- A connection without @connData@ was never opened, so there is nothing to close.
mclose :: Connection -> IO ()

#ifndef ghcjs_HOST_OS

mclose (Connection _
   (Just (Node2Node _  sock _ )) _ _ _ _ _)= NS.sClose sock

mclose (Connection node
   (Just (Node2Web sconn ))
   bufSize events blocked _  _)=
    WS.sendClose sconn ("closemsg" :: BS.ByteString)

#else

mclose (Connection _ (Just (Web2Node sconn)) _ _ blocked _ _)=
    JavaScript.Web.WebSocket.close Nothing Nothing sconn

#endif

-- Without this clause, closing a connection that has no transport (for instance one
-- just built by `defConnection`) died with a pattern match failure.
mclose (Connection _ Nothing _ _ _ _ _)= return ()

-- | Return a connection to the given node.
--
-- The node is first looked up in the node list (and added to it when missing). If its pool
-- already holds a connection, the first one is reused. Otherwise a new connection is opened
-- (a socket in a native node, a websocket in the browser), added to the pool and announced
-- in the @"connections"@ mailbox, where `listenResponses` picks it up.
--
-- In both cases the 'Closure' stored in the state is deleted.
-- It is an error to call it when no 'Connection' is present in the state (the error
-- message points at `listen`).
mconnect :: Node -> TransIO  Connection
mconnect  node@(Node _ _ _ _ )=  do
  nodes <- getNodes

  let fnode =  filter (==node) nodes
  case fnode of
   [] -> addNodes [node] >> mconnect node
   -- Take the first match. Matching on a list of exactly one element failed with a
   -- pattern match error whenever the node list contained the node more than once.
   Node host port  pool _ : _ -> do


    plist <- liftIO $ readMVar pool
    case plist  of
      handle:_ -> do
                  delData $ Closure undefined
                  return  handle                        -- !>   "REUSED!"

      _ -> do
--        liftIO $ putStr "*****CONNECTING NODE: " >> print node
        my <- getMyNode
--        liftIO  $ putStr "OPENING CONNECTION WITH :" >> print port
        Connection{comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
#ifndef ghcjs_HOST_OS

        conn <- liftIO $ do
          let size=8192
          sock <-  connectTo' size  host $ PortNumber $ fromIntegral port  --   !> ("CONNECTING ",node)

          let conn= (defConnection 8100){myNode=my,comEvent= ev,connData= Just $ Node2Node u  sock (error $ "addr: outgoing connection")}

          -- handshake read by `listenNew` in the other node. 'sendAll' guarantees that
          -- the whole header is written; 'send' may stop after a prefix.
          SBS.sendAll sock "CLOS a b\n\n"   -- !> "sending CLOS"


          return conn

#else
        conn <- do

          ws <- connectToWS host $ PortNumber $ fromIntegral port
          let conn= (defConnection 8100){comEvent= ev,connData= Just $ Web2Node ws}

          return conn    -- !>  ("websocker CONNECION")
#endif
        liftIO $ modifyMVar_ pool $  \plist -> return $ conn:plist

        putMailbox "connections" (conn,node)

        delData $ Closure undefined




        return  conn

  where u= undefined

-- mconnect _ = empty



#ifndef ghcjs_HOST_OS
-- Open a TCP socket connected to the given host and port, with both the receive and the
-- send buffer set to @bufSize@. The socket is closed only if an error happens before the
-- connection is established; otherwise it is returned to the caller.
connectTo' bufSize hostname (PortNumber port) =  do
        proto <- BSD.getProtocolNumber "tcp"
        bracketOnError (NS.socket NS.AF_INET NS.Stream proto) sClose setup
  where
  setup sock = do
      mapM_ (\opt -> NS.setSocketOption sock opt bufSize) [NS.RecvBuffer, NS.SendBuffer]
      he <- BSD.getHostByName hostname
      NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he))
      return sock
#else
-- Open a browser websocket to the given host and port (@ws://host:port@).
connectToWS  h (PortNumber p) =
   wsOpen . JS.pack $ "ws://"++ h++ ":"++ show p
#endif
#ifndef ghcjs_HOST_OS
-- | A connectionless version of `callTo` for long running remote calls: the computation is
-- beamed to the node, the remote procedure runs there, and the computation is beamed back
-- to the original node with the result.
callTo' :: (Show a, Read a,Typeable a) => Node -> Cloud a -> Cloud a
callTo' node remoteProc=
    local getMyNode >>= \origin ->
      beamTo node >>
        (remoteProc >>= \result ->
           beamTo origin >> return result)
#endif

-- | Lock held by `msend` while it writes to the connection.
type Blocked= MVar ()
-- | Buffer size; `listenNew` applies it to the receive and send buffers of the socket.
type BuffSize = Int
-- | The transport of a connection. The constructors available depend on the platform.
data ConnectionData=
#ifndef ghcjs_HOST_OS
                   -- native node, socket to another node
                   Node2Node{port :: PortID
                            ,socket ::Socket
                            ,remoteNode :: NS.SockAddr
                             }

                   -- native node, websocket of a browser
                   | Node2Web{webSocket :: WS.Connection}
#else

                   -- browser, websocket to a node
                   Web2Node{webSocket :: WebSocket}
#endif



-- | State of a connection, kept as session data.
data Connection= Connection{myNode     :: Node                   -- ^ the local node (set by `listen`)
                           ,connData   :: Maybe(ConnectionData)  -- ^ the transport; 'Nothing' until one is opened
                           ,bufferSize :: BuffSize               -- ^ see `setBuffSize`
                           ,comEvent   :: IORef (M.Map T.Text (EVar Dynamic)) -- ^ the mailboxes, by name
                           ,blocked    :: Blocked                -- ^ write lock used by `msend`
                           ,calling    :: Bool                   -- ^ set by `wormhole`: True in the side that opened it
                           ,closures   :: MVar (M.Map Int ([LogElem], EventF))} -- ^ closure id -> (full log, continuation), filled by `teleport`

                  deriving Typeable


-- | Create (or replace) the mailbox with the given name.
--
-- Mailboxes are node-wide: they are shared by all the processes that share the same
-- connection data, that is, the ones under the same `listen` or `connect`,
-- while EVars are only visible by the process that initialized them and its children.
-- Internally, a mailbox is an EVar stored, by name, in the map referenced by the
-- `Connection` state that `listen` sets.
--
-- (The reference counting of subscribers on finish that used to live here is disabled.)
newMailbox :: T.Text -> TransIO ()
newMailbox name= do
   conn    <- getData `onNothing` errorMailBox
   mailbox <- newEVar
   liftIO $ atomicModifyIORef (comEvent conn) (\boxes -> (M.insert name mailbox boxes, ()))


-- | Write a value to the mailbox with that name. The mailbox is created first if it
-- does not exist yet.
putMailbox :: Typeable a => T.Text -> a -> TransIO ()
putMailbox name dat= do
   Connection{comEvent= boxesRef} <- getData `onNothing` errorMailBox
   boxes <- liftIO $ readIORef boxesRef
   maybe (newMailbox name >> putMailbox name dat)
         (\ev -> writeEVar ev $ toDyn dat)
         (M.lookup name boxes)

-- Error raised by the mailbox primitives when there is no 'Connection' in the state.
errorMailBox= error "MailBox: No connection open.Use wormhole"

-- | Get the messages of the mailbox that match the expected type; messages of any other
-- type are discarded for this reader (the computation yields `empty` for them).
-- The mailbox is created first if it does not exist yet.
--
-- This is reactive: each new message triggers the execution of the continuation, and
-- each message wakes up all the `getMailbox` computations waiting for it.
-- The order of reading is the one of the underlying EVar.

getMailbox name= do
   Connection{comEvent= boxesRef} <- getData `onNothing` errorMailBox
   boxes <- liftIO $ readIORef boxesRef
   case M.lookup name boxes of
     Nothing -> newMailbox name >> getMailbox name
     Just ev -> readEVar ev >>= maybe empty return . fromDynamic

--getMailBox :: Typeable a => String -> TransIO a
--getMailBox name= Transient $ do
--     return () !> "getMailBox"
--     Connection{comEvent=(EVar id rn ref1)} <- getData `onNothing` error "getMailBox: accessing network events out of listen"
--     runTrans  $ do
--         liftIO $ atomically $ readTVar rn >>= \(n,n') -> writeTVar rn (n +1,n'+1)
--         r <- parallel $  do
--                  n' <- atomically $ do
--                              (n,n') <- readTVar rn
--                              writeTVar rn $ if n' > 1 then   (n,n'-1) else (n, n)
--                              return n'
--                  return () !> ("rear rn",name,n')
--                  atomically $ do
--                    d <- if n'> 1 then  peekTChan ref1 else readTChan ref1
--
--
--                    return () !> ("readmailbox",name,n')
--                    case d of
--                        SDone -> return SDone
--                        SMore x ->  case fromDynamic x of
--                              Nothing -> return $ SMore Nothing
--                              Just (nam, Nothing) -> do
--                                readTVar rn >>= \(n,n') -> writeTVar rn $ (n,n' -1)
--                                return SDone
--                              Just (nam,Just dat) ->
--                                return $ SMore $  if nam /= name then  Nothing else Just dat
----                        SLast x -> case fromDynamic x of
----                              Nothing -> retry
----                              Just (nam, Nothing) -> return SDone
----                              Just (nam,Just dat) ->
----                                if nam /= name !>(nam,name) then retry else return $ SLast dat
--                        SError e -> return $ SError e
--
--         case r of
--            SDone -> empty
--            SMore Nothing -> empty
--            SMore (Just x) -> return x
--            SLast (Just x) -> return x
--            SError e -> error $ show e

-- | Delete all the subscriptions of the mailbox with that name and remove the mailbox.
-- Nothing happens if the mailbox does not exist.
--
-- The second parameter is a witness of the kind of data expected. It is currently
-- ignored: the whole mailbox is cleaned whatever the type of its messages.
cleanMailbox :: Typeable a =>  T.Text ->  a -> TransIO ()
cleanMailbox name witness= do
   -- same error as the other mailbox primitives (the previous message blamed getMailBox)
   Connection{comEvent= mv} <- getData `onNothing` errorMailBox
   mbs <- liftIO $readIORef mv
   let mev =  M.lookup name mbs
   case mev of
     Nothing -> return()
     Just ev -> do cleanEVar ev
                   liftIO $ atomicModifyIORef mv $ \mbs -> (M.delete name mbs,())





-- | A connection with no transport: node @"program"@ with port 0, the given buffer size,
-- no mailboxes (accessing them is an error until `listen` sets them), @calling= False@
-- and an empty closure table.
--
-- NOTE(review): the write lock and the closure table are created with 'unsafePerformIO'
-- in expressions that do not depend on the argument. If the compiler shares those
-- expressions between calls, every connection built here would share the same lock and
-- the same closure table -- verify, or create them in 'IO'.
defConnection :: Int -> Connection

-- #ifndef ghcjs_HOST_OS
defConnection size=
  Connection (createNode "program" 0) Nothing  size
                 (error "defConnection: accessing network events out of listen")
                 (unsafePerformIO $ newMVar ())
                 False (unsafePerformIO $ newMVar M.empty)



#ifndef ghcjs_HOST_OS
-- | Set the buffer size of the connection stored in the state. If there is no connection
-- yet, a default one is created to hold the value.
setBuffSize :: Int -> TransIO ()
setBuffSize size= Transient $
   (getData `onNothing` return (defConnection 8192)) >>= \conn ->
       setData conn{bufferSize= size} >> return (Just ())

-- | Buffer size of the connection stored in the state, or 8192 when there is none.
getBuffSize=
  (getSData >>= \conn -> return (bufferSize conn)) <|> return  8192




-- | Set up the node and wait for requests (native version).
--
-- It resets the log, stores in the state a connection for this node with an empty set of
-- mailboxes (reusing the connection already in the state, if any, for the rest of the
-- fields), adds the node to the node list and then waits for messages from two sources:
-- incoming connections at the port of the node (`listenNew`) and responses on
-- connections opened from this node (`listenResponses`). Each message received is
-- executed by `execLog`.
listen ::  Node ->  Cloud ()
listen  (node@(Node _   port _ _ )) = onAll $ do
   addThreads 1

   setData $ Log False [] []

   conn' <- getSData <|> return (defConnection 8192)
   ev <- liftIO $ newIORef M.empty
   let conn= conn'{myNode=node, comEvent=ev}

   setData conn
   addNodes [node]
   mlog <- listenNew (fromIntegral port) conn <|> listenResponses


   execLog mlog



-- Accept connections at the given port and produce the messages received through them.
--
-- The listening socket and each accepted socket... only the listening socket gets the
-- buffer size of @conn@ applied here. For each accepted connection the HTTP-like head is
-- read: a @CLOS@ method identifies another node, whose messages are read by
-- 'parallelReadHandler'; anything else is served by 'httpMode' until a websocket is
-- established, whose messages are then parsed one by one.
-- On finish, the closure table of @conn@ is emptied.
listenNew port conn= do --  node bufSize events blocked port= do


   sock <- liftIO . listenOn  $ PortNumber port

   let bufSize= bufferSize conn
   liftIO $ do NS.setSocketOption sock NS.RecvBuffer bufSize
               NS.setSocketOption sock NS.SendBuffer bufSize


   (sock,addr) <- waitEvents $ NS.accept sock         -- !!> "BEFORE ACCEPT"


--   case addr of
--     NS.SockAddrInet port host -> liftIO $ print("connection from", port, host)
--     NS.SockAddrInet6  a b c d -> liftIO $ print("connection from", a, b,c,d)


   initFinish
   onFinish $ const $ do
             return()                                 -- !> "onFinish closures receivedd with LISTEN"
             let Connection{closures=closures}= conn  -- !> "listenNew closures empty"
             liftIO $ modifyMVar_ closures $ const $ return M.empty


   (method,uri, headers) <- receiveHTTPHead sock

   case method of
     "CLOS" ->
          do
--           return ()                          !> "CLOS detected"
           setData $ conn{connData=Just (Node2Node (PortNumber port) sock addr)}

--           killOnFinish $ parallel $ readHandler          -- !> "read Listen"  -- :: TransIO (StreamData [LogElem])
           parallelReadHandler

     _ -> do
           sconn <- httpMode (method, uri, headers) sock -- stay serving pages until a websocket request is received

           setData conn{connData= Just (Node2Web sconn )}


           killOnFinish $ parallel $ do
               msg <- WS.receiveData sconn             -- WebSockets
               -- parsed with read', like every other reader of this module (see `mread`):
               -- on malformed input the error reports the offending text instead of
               -- the bare "Prelude.read: no parse"
               return . read' $ BC.unpack msg
--                                                      !> ("Server WebSocket msg read",msg)  !> "<-------<---------<--------------"
--                                                      !> ("Server WebSocket msg read",msg)  !> "<-------<---------<--------------"







--instance Read PortNumber where
--  readsPrec n str= let [(n,s)]=   readsPrec n str in [(fromIntegral n,s)]


--deriving instance Read PortID
--deriving instance Typeable PortID
#endif


-- Wait for the connections announced by `mconnect` in the @"connections"@ mailbox and
-- produce the messages received through each of them.
--
-- The connection becomes the one of the state. For socket connections, the lazy content
-- of the socket is installed as the parse context. On finish, the node is removed from
-- the node list and the closure table of the connection is emptied.
listenResponses= do

      (conn, node) <- getMailbox "connections"
      setData conn
#ifndef ghcjs_HOST_OS
      case conn of
             Connection _(Just (Node2Node _ sock _)) _ _ _ _ _ -> do
                 input <- liftIO $ SBSL.getContents sock
                 setData $ (ParseContext (error "SHOULD NOT READ 2") input :: ParseContext BS.ByteString)
             -- any other kind of connection needs no parse context. Without this
             -- alternative the case expression was partial and died with a pattern
             -- match failure for them.
             _ -> return ()
#endif
      initFinish
      onFinish $ const $ do
             liftIO $ putStrLn "removing node: ">> print node
             nodes <- getNodes
             setNodes $ nodes \\ [node]


             let Connection{closures=closures}= conn
             liftIO $ modifyMVar_ closures $ const $ return M.empty


      killOnFinish $ mread conn


-- | Identifier of a closure: the key of the closure table of a 'Connection'.
type IdClosure= Int

-- | Session data with the identifier of the closure of the other node, as received in the
-- last message (see `execLog`). `teleport` uses 0 when there is none.
data Closure= Closure IdClosure

-- Execute a message received from another node.
--
-- 'SError' and 'SDone' finish the computation (with and without an error). 'SMore' and
-- 'SLast' carry a triple: the identifier of the local closure to run, the identifier of
-- the closure of the sender, and a log. With 'SLast' the local closure is also removed
-- from the closure table.
execLog mlog = Transient $ do
       case  mlog    of                       -- !> ("RECEIVED ", mlog ) of
             SError e -> do
                 runTrans $ finish $ Just e
                 return Nothing

             SDone   -> runTrans(finish Nothing) >> return Nothing
             SMore r -> process r False
             SLast r -> process r True

   where
   process (closl,closr,log) deleteClosure= do
              Connection {closures=closures} <- getData `onNothing` error "Listen: myNode not set"

              -- closure 0: no stored continuation. The computation that follows `listen`
              -- goes on in recovery mode with the log received.
              if closl== 0 then do
                   setData $ Log True log  $ reverse log
                   setData $ Closure closr
                   return $ Just ()                        --  !> "executing top level closure"

               else do
                 -- look up the continuation stored by `teleport`, removing it if requested
                 mcont <- liftIO $ modifyMVar closures  $ \map ->
                                               return (if deleteClosure then
                                                           M.delete closl map
                                                         else map, M.lookup closl map)
                                                           -- !> ("closures=", M.size map)
                 case mcont of
                   Nothing -> error ("received non existent closure: " ++  show closl)
                   -- execute the closure
                   Just (fulLog,cont) -> liftIO $ runStateT (do
                                     -- the log received is prepended (reversed) to the full
                                     -- log that was stored with the continuation
                                     let nlog= reverse log ++  fulLog -- dropWhile (\x -> case x of
                                                                        --  Exec -> True
                                                                        --  _ -> False) fulLog
                                     setData $ Log True  log  nlog

                                     setData $ Closure closr
                                     runCont cont) cont   -- !> ("executing closure",closr)


                 -- the stored continuation already ran; nothing else to do in this thread
                 return Nothing                     -- !> "FINISH CLOSURE"

#ifdef ghcjs_HOST_OS
-- Browser version of listen: there are no incoming connections to accept. The node is
-- added to the node list, a default connection for it with an empty set of mailboxes is
-- stored in the state, and the responses received through the connections opened from
-- here (`listenResponses`) are executed by `execLog`.
listen node = onAll $ do
        addNodes [node]
        events <- liftIO $ newIORef M.empty
        let conn=  (defConnection 8192){myNode=node,comEvent=events}
        setData conn
        r <- listenResponses
        execLog r
#endif

-- | The connections opened to a node; `mconnect` reuses the first one.
type Pool= [Connection]
type Package= String
type Program= String
-- | A service attached to a node: a (package, program) pair.
type Service= (Package, Program)


-- * Level 2: connections node lists and operations with the node list


{-# NOINLINE emptyPool #-}
-- | Allocate a new `MVar` that holds an empty connection `Pool`.
emptyPool :: MonadIO m => m (MVar Pool)
emptyPool = liftIO (newMVar [])


-- | Build a `Node` from a host name, a port and the list of services it offers.
-- The connection pool of the node starts empty.
--
-- NOTE(review): the pool is created with `unsafePerformIO`; whether every node
-- receives a distinct MVar depends on GHC not sharing that expression -- confirm.
createNodeServ :: HostName -> Integer -> [Service] -> Node
createNodeServ host port services =
    Node host (fromInteger port) (unsafePerformIO emptyPool) services




-- | Build a `Node` for the given host and port with an empty service list.
createNode :: HostName -> Integer -> Node
createNode host port = createNodeServ host port []

-- | The node value used for web nodes: host "webnode", port 0 and the single
-- service ("webnode","") that `callNodes` uses to recognise it.
createWebNode :: Node
createWebNode =
    Node "webnode" 0 (unsafePerformIO emptyPool) [("webnode", "")]


-- | Two nodes are equal when host and port coincide; the connection pool and
-- the service list are ignored.
instance Eq Node where
    (==) (Node host1 port1 _ _) (Node host2 port2 _ _) =
        (host1, port1) == (host2, port2)


-- | A node is shown as the triple (host, port, services); the connection pool
-- is left out.
instance Show Node where
    show (Node host port _ services) = show (host, port, services)



-- | Inverse of the `Show` instance: reads a (host, port, services) triple with
-- `readsPrec'` and wraps it in a `Node` with an empty connection pool.
--
-- Every candidate parse is converted, so an ambiguous parse no longer aborts
-- with a pattern-match failure (the previous code only handled zero or exactly
-- one result).
--
-- NOTE(review): `pool` does not depend on the input, so GHC may share a single
-- MVar between all the nodes read -- confirm that this is acceptable.
instance Read Node where

    readsPrec _ s =
          [ (Node h p pool ss, rest) | ((h, p, ss), rest) <- readsPrec' 0 s ]
          where
          pool = unsafePerformIO emptyPool


-- #ifndef ghcjs_HOST_OS
--instance Read NS.SockAddr where
--    readsPrec _ ('[':s)=
--       let (s',r1)= span (/=']')  s
--           [(port,r)]= readsPrec 0 $ tail $ tail r1
--       in [(NS.SockAddrInet6 port 0 (IP.toHostAddress6 $  read s') 0, r)]
--    readsPrec _ s=
--       let (s',r1)= span(/= ':') s
--           [(port,r)]= readsPrec 0 $ tail r1
--       in [(NS.SockAddrInet port (IP.toHostAddress $  read s'),r)]
-- #endif

--newtype MyNode= MyNode Node deriving(Read,Show,Typeable)


--instance Indexable MyNode where key (MyNode Node{nodePort=port}) =  "MyNode "++ show port
--
--instance Serializable MyNode where
--    serialize= BS.pack . show
--    deserialize= read . BS.unpack

-- | Global, process-wide list of known nodes; it starts empty.
--
-- The NOINLINE pragma is required for a top-level `unsafePerformIO` reference:
-- without it GHC may inline the definition and create more than one TVar.
{-# NOINLINE nodeList #-}
nodeList :: TVar  [Node]
nodeList = unsafePerformIO $ newTVarIO []

-- Standalone-derived `Ord` instance for `PortID`, a type that is not declared in
-- this part of the file (so this is an orphan instance here).
deriving instance Ord PortID

--myNode :: Int -> DBRef  MyNode
--myNode= getDBRef $ key $ MyNode undefined






-- | Abort with an error saying that the local node has not been set; the
-- argument is the name of the calling function and prefixes the message.
errorMyNode :: String -> a
errorMyNode caller =
    error (caller ++ ": Node not set. initialize it with connect, listen, initNode...")


-- | Return the node stored in the `myNode` field of the current `Connection`.
-- If no `Connection` is found in the state, `errorMyNode` is the alternative.
getMyNode :: TransIO Node -- (MonadIO m, MonadState EventF m) => m Node
getMyNode = do
    conn <- getSData <|> errorMyNode "getMyNode" :: TransIO Connection
    return (myNode conn)



-- | Return the current content of the global node list (`nodeList`).
getNodes :: MonadIO m => m [Node]
getNodes = liftIO (atomically (readTVar nodeList))

-- | Add nodes to the global node list. Duplicates are removed and the local
-- node (as given by `getMyNode`) is always kept in the first position.
addNodes :: [Node] ->  TransIO () -- (MonadIO m, MonadState EventF m) => [Node] -> m ()
addNodes nodes = do
  self <- getMyNode    -- the local node must stay first
  liftIO . atomically $ do
    known <- readTVar nodeList
    -- after `nub` the local node occurs at most once, so filtering it out
    -- removes exactly that occurrence
    let others = filter (/= self) (nub (nodes ++ known))
    writeTVar nodeList (self : others)

-- | Overwrite the global node list with the given nodes.
setNodes :: MonadIO m => [Node] -> m ()
setNodes nodes = liftIO (atomically (writeTVar nodeList nodes))


-- | Rotate the global node list one position to the left (the first node
-- becomes the last), store the rotated list and return it.
--
-- An empty node list is returned unchanged. The previous code used
-- `head`/`tail`, which on an empty list stored an error thunk in `nodeList`.
shuffleNodes :: MonadIO m => m [Node]
shuffleNodes = liftIO . atomically $ do
  nodes <- readTVar nodeList
  case nodes of
    [] -> return []
    (first : rest) -> do
      let rotated = rest ++ [first]
      writeTVar nodeList rotated
      return rotated

--getInterfaces :: TransIO TransIO HostName
--getInterfaces= do
--   host <- logged $ do
--      ifs <- liftIO $ getNetworkInterfaces
--      liftIO $ mapM_ (\(i,n) ->putStrLn $ show i ++ "\t"++  show (ipv4 n) ++ "\t"++name n)$ zip [0..] ifs
--      liftIO $ putStrLn "Select one: "
--      ind <-  input ( < length ifs)
--      return $ show . ipv4 $ ifs !! ind


-- | execute a Transient action in each of the nodes connected.
--
-- The response of each node is received by the invoking node and processed by the rest of the procedure.
-- By default, each response is processed in a new thread. To restrict the number of threads
-- use the thread control primitives.
--
-- This snippet receives a message from each of the simulated nodes:
--
-- > main = keep $ do
-- >    let nodes= map createLocalNode [2000..2005]
-- >    addNodes nodes
-- >    (foldl (<|>) empty $ map listen nodes) <|> return ()
-- >
-- >    r <- clustered $ do
-- >               Connection (Just(PortNumber port, _, _, _)) _ <- getSData
-- >               return $ "hi from " ++ show port++ "\n"
-- >    liftIO $ putStrLn r
-- >    where
-- >    createLocalNode n= createNode "localhost" (PortNumber n)
clustered :: Loggable a  => Cloud a -> Cloud a
clustered = callNodes (<|>) empty


-- | A variant of `clustered` that waits for all the responses and combines
-- them with `<>`, starting from `mempty`.
mclustered :: (Monoid a, Loggable a)  => Cloud a -> Cloud a
mclustered = callNodes (<>) mempty


-- Run `proc` with `runAt` on every known node that is not a web node and fold
-- the resulting computations with `op`, using `init` as the rightmost value.
callNodes op init proc = loggedc $ do
    allNodes <- local getNodes
    let targets = [node | node <- allNodes, not (isWebNode node)]
    foldr op init [runAt node proc | node <- targets]
  where
    -- a web node is recognised by the ("webnode","") entry in its services
    isWebNode Node{nodeServices = srvs} = ("webnode", "") `elem` srvs

-- | set the rest of the computation as a new node (first parameter) and connect it
-- to an existing node (second parameter). Then it uses `connect'` to synchronize the list of nodes

connect ::  Node ->  Node -> Cloud ()

#ifndef ghcjs_HOST_OS
-- Run `listen node` with `return ()` as its alternative, then synchronize the
-- node lists with `remotenode` through `connect'`.
connect node remotenode =
    (listen node <|> return ()) >> connect' remotenode

-- | Synchronize the list of nodes with a remote node and all the nodes connected to it.
-- The final effect is that all the reachable nodes share the same list of nodes.
--
-- Steps, as written below:
--
-- 1. read the local node list and print the node being contacted;
-- 2. on `remotenode`: record the current connection in the pool of the calling
--    node, register a finish handler that removes that node again, add the
--    caller node list on the nodes known there and return the resulting list;
-- 3. back on the caller: add the nodes that were not known before on every node
--    reached by `mclustered`, then make sure the local list contains its
--    original nodes and print it.
connect'  remotenode= do
    -- node list of the calling node, captured before contacting the remote one
    nodes <- local $ getNodes
    local $ liftIO $ putStrLn $ "connecting to: "++ show remotenode

    newNodes <- runAt remotenode $  do
           local $ do
              -- the stored connection must carry a `Node2Node` value in its
              -- second field; the error is the alternative when getSData has no data
              conn@(Connection _(Just (Node2Node _ _ _)) _ _ _ _ _) <- getSData <|>
               error ("connect': need to be connected to a node: use wormhole/connect/listen")
              -- `addNodes` keeps the own node first, so the head of the caller
              -- list is taken as the connecting node.
              -- NOTE(review): `head` is partial; this assumes the caller list is non-empty.
              let nodeConnecting= head nodes
              -- replace the content of that node pool with the current connection
              liftIO $ modifyMVar_ (connection nodeConnecting) $ const $ return [conn]


              -- handler registered with onFinish: drop the connecting node from the node list
              onFinish . const $ do
                   liftIO $ putStrLn "removing node: ">> print nodeConnecting
                   nodes <- getNodes
                   setNodes $ nodes \\ [nodeConnecting]

--              renameMyNode remotenode
              return nodes

           -- add the caller nodes on every node reached by `mclustered` from here
           mclustered . local . addNodes $ nodes



           -- result of the remote block: the node list as seen there
           local $ do
               allNodes <- getNodes
               liftIO $ putStrLn "Known nodes: " >> print  allNodes
               return allNodes

    -- nodes reported by the remote side that were not in the original local list
    let n = newNodes \\ nodes
    when (not $ null n) $ mclustered $ local $ do
        liftIO $ putStrLn  "New  nodes: " >> print n
        addNodes n   -- add the new discovered nodes

    -- finally re-add the original nodes locally and print the resulting list
    local $ do
        addNodes  nodes
        nodes <- getNodes
        liftIO $ putStrLn  "Known nodes: " >> print nodes

--    where
--    renameMyNode new=  do
--       con <- getSData  <|> error "connection not set. please initialize it"
--       mynode <- liftIO $ readIORef $ myNode con
--       liftIO $ writeIORef (myNode con) new
--
--       nodes <- getNodes      !> ("renaming", mynode, new)
--       setNodes $ new:(nodes \\[mynode])
#else
-- When ghcjs_HOST_OS is defined both operations ignore their arguments and
-- are just `empty`.
connect _ _= empty
connect' _ = empty
#endif


--------------------------------------------



#ifndef ghcjs_HOST_OS

-- | Read everything a socket delivers as a lazy ByteString. Each chunk is
-- received only when it is demanded (`unsafeInterleaveIO`); an empty `recv`
-- result ends the string.
readFrom :: Socket         -- ^ Connected socket
            -> IO BS.ByteString  -- ^ Data received
readFrom sock = nextChunk
  where
    nextChunk = unsafeInterleaveIO $ do
      chunk <- SBS.recv sock 4098
      if BC.null chunk
        then return BLC.Empty
        else BLC.Chunk chunk <$> nextChunk

-- | Turn a lazy ByteString into a strict one by concatenating its chunks.
toStrict :: BS.ByteString -> B.ByteString
toStrict lazyBytes = B.concat (BS.toChunks lazyBytes)

-- Handle a request whose head has already been parsed.
--
-- The first argument is the (method, uri, headers) triple of the request (the
-- method is not inspected), the second is the socket the request arrived on.
--
-- * When the headers contain a WebSocket key the connection is upgraded and the
--   accepted WebSocket connection is returned.
-- * Otherwise the file named by the URI path (default "index.html") is read
--   from ./static/out.jsexe/ and sent back; the computation then ends with
--   `empty`.
--
-- Changes with respect to the previous version: a URI without any slash no
-- longer hits the partial `BC.tail`, and a path containing a ".." segment is
-- answered with the not-found text instead of reading outside the static
-- directory.
httpMode (method,uri, headers) conn  = do
   if isWebSocketsReq headers
     then  liftIO $ do

         -- stream over the raw socket: an empty read means end of stream,
         -- writes go out chunk by chunk
         stream <- makeStream
            (do
                bs <-  SBS.recv conn 4096
                return $ if BC.null bs then Nothing else Just  bs)
            (\mbBl -> case mbBl of
                Nothing -> return ()
                Just bl ->  SBS.sendMany conn (BL.toChunks bl) >> return())

         let
             pc = WS.PendingConnection
                { WS.pendingOptions     = WS.defaultConnectionOptions
                , WS.pendingRequest     =  NWS.RequestHead  uri  headers False
                , WS.pendingOnAccept    = \_ -> return ()
                , WS.pendingStream      = stream
                }

         sconn    <- WS.acceptRequest pc
         WS.forkPingThread sconn 30
         return sconn

     else do
          let uri'= BC.drop 1 $ uriPath uri    -- path without the leading slash
              file= if BC.null uri' then "index.html" else uri'
              notFound = "Not found file: index.html<br/> please compile with ghcjs<br/> ghcjs program.hs -o static/out"
              -- the path comes from the network: refuse parent-directory segments
              escapesRoot = ".." `elem` BC.split '/' file

          content <- liftIO $
              if escapesRoot
                then return notFound
                else BL.readFile ( "./static/out.jsexe/"++ BC.unpack file)
                        `catch` (\(_ :: SomeException) -> return notFound)

          liftIO $ SBS.sendMany conn   $  ["HTTP/1.0 200 OK\nContent-Type: text/html\nConnection: close\nContent-Length: " <> BC.pack (show $ BL.length content) <>"\n\n"] ++
                                  (BL.toChunks content )


          empty

      where
      -- everything from the first slash of the URI onwards
      uriPath = BC.dropWhile (/= '/')



-- True when the header list contains a "Sec-WebSocket-Key" entry.
isWebSocketsReq headers =
    any (\header -> fst header == mk "Sec-WebSocket-Key") headers



-- | Parser state kept in the Transient state and used by `parse`.
-- The first field is the IO action that `parse` runs to obtain more input when
-- the buffer is empty; the second field is the input not consumed yet.
data ParseContext a = IsString a => ParseContext (IO  a) a deriving Typeable


--giveData s= do
--    r <- readFrom s     -- 80000
--    return r               -- !> ( "giveData ", r)


-- Read the head of an HTTP request from socket `s`.
--
-- The whole socket content is obtained with `SBSL.getContents` and installed as
-- the parse buffer; the refill action of the context is an error, since no
-- further input can be requested. The result is the triple
-- (method, uri, headers); the version token is parsed and discarded.
receiveHTTPHead s = do
  contents <- liftIO (SBSL.getContents s)
  let truncated = error "request truncated. Maybe the browser program does not match the server one. \nRecompile the program again with ghcjs <prog>  -o static/out"
  setData (ParseContext truncated contents :: ParseContext BS.ByteString)
  -- request line: three whitespace-separated tokens
  (method, uri, _vers) <- (,,) <$> getString <*> getString <*> getString
  headers <- many $ (,) <$> (mk <$> headerName) <*> headerValue
  return (method, toStrict uri, headers)

  where

  -- header name: text up to ':' or end of line; an empty name or a lone "\r"
  -- fails the parser, otherwise the separator character is dropped as well
  headerName = do
      dropSpaces
      name <- tTakeWhile (\c -> c /= ':' && not (endline c))
      if BS.null name || name == "\r"
        then empty
        else dropChar >> return (toStrict name)

  -- header value: the rest of the line after leading spaces
  headerValue = toStrict <$> (dropSpaces >> tTakeWhile (not . endline))




-- | Discard the leading whitespace of the pending parse input.
dropSpaces :: TransIO ()
dropSpaces = parse skipBlanks
  where
    skipBlanks input = ((), BS.dropWhile isSpace input)

-- | Discard the first character of the pending parse input.
dropChar :: TransIO ()
dropChar = parse (\input -> ((), BS.tail input))

-- | True for the line-terminator characters newline and carriage return.
endline :: Char -> Bool
endline c = c `elem` ['\n', '\r']

--tGetLine= tTakeWhile . not . endline

-- | Decode a lazy ByteString into a list of `StreamData` values by applying
-- `reads` repeatedly to the unpacked text, each time continuing with the
-- remainder left by the previous value.
--
-- NOTE(review): the local pattern [(x,r)] is partial. It is a lazy let-pattern,
-- so each list cell is produced before `reads` is forced; when the input is
-- exhausted, or does not yield exactly one parse, forcing the element raises a
-- pattern-match failure instead of ending the list. Check how callers treat the
-- end of the stream before making this total.
readStream :: Read a =>  BS.ByteString -> [StreamData a]
readStream s=  readStream1 $ BS.unpack s
 where
 readStream1 s=
   let [(x,r)] = reads s
   in x : readStream1 r



-- | Take the pending input of the current `ParseContext`, decode it with
-- `readStream` and deliver the decoded values through `choose`, wrapped in
-- `killOnFinish`.
parallelReadHandler :: Loggable a => TransIO (StreamData a)
parallelReadHandler = do
      ParseContext _ pending <- getSData <|> error "parallelReadHandler: ParseContext not found"
                                        :: (TransIO (ParseContext BS.ByteString))
      -- the bind/return pair is kept as in the original sequencing
      item <- killOnFinish . choose $ readStream pending
      return item
--                !> ("read",r)
--                !> "<-------<----------<--------<----------"


-- | Skip leading whitespace and return the following run of non-space characters.
getString :: TransIO BS.ByteString
getString = dropSpaces >> tTakeWhile (not . isSpace)

-- | Consume and return the longest prefix of the pending input whose
-- characters satisfy the predicate.
tTakeWhile :: (Char -> Bool) -> TransIO BS.ByteString
tTakeWhile cond = parse $ \input -> BS.span cond input


-- | Apply a splitting function to the pending input kept in the `ParseContext`.
--
-- `split` receives the current buffer and returns the parsed value together
-- with the unconsumed rest, which is stored back in the context. The function
-- is only applied to a non-empty buffer.
--
-- The parser fails with `empty` when no more input can be obtained, or when the
-- buffer starts with an empty line ("\n\n" or "\r\n\r\n"); in the latter case
-- that separator is consumed first.
parse :: Monoid b => (BS.ByteString -> (b, BS.ByteString)) -> TransIO b
parse split= do
    ParseContext readit str <- getSData
                                <|> error "parse: ParseContext not found"
                                :: TransIO (ParseContext BS.ByteString)
    if  str == mempty
     then do
          -- buffer exhausted: run the refill action and store what it returns
          str3 <- liftIO  readit

          setData $ ParseContext readit str3                     -- !> str3

          -- still nothing: fail; otherwise retry with the new buffer
          if str3== mempty   then empty   else  parse split
     -- blank line at the front of the buffer: consume it and fail
     else if BS.take 2 str =="\n\n"  then do setData $ ParseContext readit  (BS.drop 2 str) ; empty
     else if BS.take 4 str== "\r\n\r\n" then do setData $ ParseContext readit  (BS.drop 4 str) ; empty
     else do

          let (ret,str3) = split str
          setData $ ParseContext readit str3

          -- when the split consumed the whole buffer the value may continue in
          -- the next input: append (with <>) the result of parsing again, or
          -- mempty if that attempt fails
          if str3== mempty
            then   return ret  <> (parse split <|> return mempty)

            else   return ret




#endif



-- `isBrowserInstance` is True when the module is compiled with ghcjs_HOST_OS
-- defined and False otherwise.
#ifdef ghcjs_HOST_OS
isBrowserInstance= True

#else
isBrowserInstance= False
#endif