module Control.Distributed.Session.STChannel (
Message(..),
STSendPort(..),
STReceivePort(..),
STChan,
STChanBi,
UTChan,
newSTChan,
newSTChanBi,
newUTChan,
toSTChan,
toSTChanBi,
sendProxy,
recvProxy,
sendSTChan,
recvSTChan,
STSplit(..),
STRec(..),
STChannelT(..),
sendSTChanM,
recvSTChanM,
sel1ChanM,
sel2ChanM,
off1ChanM,
off2ChanM,
recChanM,
wkChanM,
varChanM,
epsChanM
) where
import qualified Control.SessionTypes.Indexed as IM
import Control.SessionTypes.Types
import qualified Control.Distributed.Process as P
import Control.Distributed.Process.Serializable
import qualified Data.ByteString.Lazy as BSL
import Data.Binary
import Data.Typeable
import Data.Kind (Type)
import Unsafe.Coerce
data Message = forall a. Serializable a => Message a deriving Typeable
instance Binary Message where
put (Message msg) = put $ BSL.toChunks (encode msg)
get = (Message . BSL.fromChunks) <$> get
data STSendPort (l :: Cap Type) = STSendPort (P.SendPort Message)
data STReceivePort (l :: Cap Type) = STReceivePort (P.ReceivePort Message)
type STChan s = (STSendPort (RemoveRecv s), STReceivePort (RemoveSend s))
type STChanBi s r = (STSendPort (RemoveRecv s), STReceivePort (RemoveSend r))
type UTChan = (P.SendPort Message, P.ReceivePort Message)
newSTChan :: Proxy s -> P.Process (STChan s)
newSTChan _ = do
(s, r) <- P.newChan
return $ (STSendPort s, STReceivePort r)
newSTChanBi :: Proxy s -> Proxy r -> P.Process (STChanBi s r)
newSTChanBi _ _ = do
(s, r) <- P.newChan
return $ (STSendPort s, STReceivePort r)
newUTChan :: P.Process UTChan
newUTChan = P.newChan
toSTChan :: UTChan -> Proxy s -> STChan s
toSTChan (sport, rport) _ = (STSendPort sport, STReceivePort rport)
toSTChanBi :: UTChan -> Proxy s -> Proxy r -> STChanBi s r
toSTChanBi (sport, rport) _ _ = (STSendPort sport, STReceivePort rport)
sendProxy :: STSendPort s -> Proxy s
sendProxy _ = Proxy
recvProxy :: STReceivePort s -> Proxy s
recvProxy _ = Proxy
sendSTChan :: Serializable a => STSendPort ('Cap ctx (a :!> l)) -> a -> P.Process (STSendPort ('Cap ctx l))
sendSTChan (STSendPort s) a = do
P.sendChan s $ Message a
return $ STSendPort s
recvSTChan :: Serializable a => STReceivePort ('Cap ctx (a :?> l)) -> P.Process (a, STReceivePort ('Cap ctx l))
recvSTChan (STReceivePort p) = do
(Message b) <- P.receiveChan p
return (unsafeCoerce b, STReceivePort p)
class STSplit (m :: Cap Type -> Type) where
sel1Chan :: m ('Cap ctx (Sel (s ': xs))) -> m ('Cap ctx s)
sel2Chan :: m ('Cap ctx (Sel (s ': t ': xs))) -> m ('Cap ctx (Sel (t ': xs)))
off1Chan :: m ('Cap ctx (Off (s ': xs))) -> m ('Cap ctx s)
off2Chan :: m ('Cap ctx (Off (s ': t ': xs))) -> m ('Cap ctx (Off (t ': xs)))
instance STSplit STSendPort where
sel1Chan (STSendPort s) = STSendPort s
sel2Chan (STSendPort s) = STSendPort s
off1Chan (STSendPort s) = STSendPort s
off2Chan (STSendPort s) = STSendPort s
instance STSplit STReceivePort where
sel1Chan (STReceivePort s) = STReceivePort s
sel2Chan (STReceivePort s) = STReceivePort s
off1Chan (STReceivePort s) = STReceivePort s
off2Chan (STReceivePort s) = STReceivePort s
class STRec (m :: Cap Type -> Type) where
recChan :: m ('Cap ctx (R s)) -> m ('Cap (s ': ctx) s)
wkChan :: m ('Cap (t ': ctx) (Wk s)) -> m ('Cap ctx s)
varChan :: m ('Cap (s ': ctx) V) -> m ('Cap (s ': ctx) s)
instance STRec STSendPort where
recChan (STSendPort s) = STSendPort s
wkChan (STSendPort s) = STSendPort s
varChan (STSendPort s) = STSendPort s
instance STRec STReceivePort where
recChan (STReceivePort s) = STReceivePort s
wkChan (STReceivePort s) = STReceivePort s
varChan (STReceivePort s) = STReceivePort s
data STChannelT m (p :: Prod Type) (q :: Prod Type) a = STChannelT {
runSTChannelT :: ( (STSendPort (Left p), STReceivePort (Right p)) ->
m (a, (STSendPort (Left q), STReceivePort (Right q))))
}
instance Monad m => IM.IxFunctor (STChannelT m) where
fmap f (STChannelT g) = STChannelT $ \c -> g c >>= \(a, c') -> return (f a, c')
instance Monad m => IM.IxApplicative (STChannelT m) where
pure = IM.return
(STChannelT f) <*> (STChannelT g) = STChannelT $ \c -> f c >>= \(f', c') -> g c' >>= \(a, c'') -> return (f' a, c'')
instance Monad m => IM.IxMonad (STChannelT m) where
return a = STChannelT $ \c -> return (a, c)
(STChannelT f) >>= g = STChannelT $ \c -> f c >>= \(a, c') -> runSTChannelT (g a) c'
instance Monad m => IM.IxMonadT STChannelT m where
lift m = STChannelT $ \c -> m >>= \a -> return (a, c)
sendSTChanM :: Serializable a => a -> STChannelT P.Process ('Cap ctx (a :!> l) :*: r) ('Cap ctx l :*: r) ()
sendSTChanM a = STChannelT $ \(sp, rp) -> sendSTChan sp a >>= \sp' -> return ((), (sp', rp))
recvSTChanM :: Serializable a => STChannelT P.Process (l :*: ('Cap ctx (a :?> r))) (l :*: 'Cap ctx r) a
recvSTChanM = STChannelT $ \(sp, rp) -> recvSTChan rp >>= \(a, rp') -> return (a, (sp, rp'))
sel1ChanM :: STChannelT P.Process ('Cap lctx (Sel (l ': ls)) :*: ('Cap rctx (Sel (r ': rs)))) ('Cap lctx l :*: 'Cap rctx r) ()
sel1ChanM = STChannelT $ \(sp, rp) -> return ((), (sel1Chan sp, sel1Chan rp))
sel2ChanM :: STChannelT P.Process ('Cap lctx (Sel (s1 ': t1 ': xs1)) :*: 'Cap rctx (Sel (s2 ': t2 ': xs2))) ('Cap lctx (Sel (t1 ': xs1)) :*: 'Cap rctx (Sel (t2 ': xs2))) ()
sel2ChanM = STChannelT $ \(sp, rp) -> return ((), (sel2Chan sp, sel2Chan rp))
off1ChanM :: STChannelT P.Process ('Cap lctx (Off (l ': ls)) :*: ('Cap rctx (Off (r ': rs)))) ('Cap lctx l :*: 'Cap rctx r) ()
off1ChanM = STChannelT $ \(sp, rp) -> return ((), (off1Chan sp, off1Chan rp))
off2ChanM :: STChannelT P.Process ('Cap lctx (Off (s1 ': t1 ': xs1)) :*: 'Cap rctx (Off (s2 ': t2 ': xs2))) ('Cap lctx (Off (t1 ': xs1)) :*: 'Cap rctx (Off (t2 ': xs2))) ()
off2ChanM = STChannelT $ \(sp, rp) -> return ((), (off2Chan sp, off2Chan rp))
recChanM :: STChannelT P.Process ('Cap sctx (R s) :*: 'Cap rctx (R r)) ('Cap (s ': sctx) s :*: 'Cap (r ': rctx) r) ()
recChanM = STChannelT $ \(sp, rp) -> return ((), (recChan sp, recChan rp))
wkChanM :: STChannelT P.Process ('Cap (t ': sctx) (Wk s) :*: 'Cap (k ': rctx) (Wk r)) ('Cap sctx s :*: 'Cap rctx r) ()
wkChanM = STChannelT $ \(sp, rp) -> return ((), (wkChan sp, wkChan rp))
varChanM :: STChannelT P.Process (('Cap (s ': sctx) V) :*: ('Cap (r ': rctx) V)) ('Cap (s ': sctx) s :*: 'Cap (r ': rctx) r) ()
varChanM = STChannelT $ \(sp, rp) -> return ((), (varChan sp, varChan rp))
epsChanM :: STChannelT P.Process ('Cap ctx Eps :*: 'Cap ctx Eps) ('Cap ctx Eps :*: 'Cap ctx Eps) ()
epsChanM = STChannelT $ \utchan -> return ((), utchan)