{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE CPP #-}
module Data.Conduit.Network
(
sourceSocket
, sinkSocket
, SN.AppData
, appSource
, appSink
, SN.appSockAddr
, SN.appLocalAddr
, SN.ServerSettings
, serverSettings
, SN.runTCPServer
, SN.runTCPServerWithHandle
, forkTCPServer
, runGeneralTCPServer
, SN.ClientSettings
, clientSettings
, SN.runTCPClient
, runGeneralTCPClient
, SN.getPort
, SN.getHost
, SN.getAfterBind
, SN.getNeedLocalAddr
, SN.setPort
, SN.setHost
, SN.setAfterBind
, SN.setNeedLocalAddr
, SN.HostPreference
) where
import Prelude
import Data.Conduit
import Network.Socket (Socket)
import Network.Socket.ByteString (sendAll)
import Data.ByteString (ByteString)
import qualified GHC.Conc as Conc (yield)
import qualified Data.ByteString as S
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad (unless, void)
import Control.Monad.Trans.Control (MonadBaseControl, control, liftBaseWith)
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar, MVar, ThreadId)
import qualified Data.Streaming.Network as SN
sourceSocket :: MonadIO m => Socket -> Producer m ByteString
sourceSocket socket =
loop
where
loop = do
bs <- lift $ liftIO $ SN.safeRecv socket 4096
if S.null bs
then return ()
else yield bs >> loop
sinkSocket :: MonadIO m => Socket -> Consumer ByteString m ()
sinkSocket socket =
loop
where
loop = await >>= maybe (return ()) (\bs -> lift (liftIO $ sendAll socket bs) >> loop)
serverSettings :: Int -> SN.HostPreference -> SN.ServerSettings
serverSettings = SN.serverSettingsTCP
clientSettings :: Int -> ByteString -> SN.ClientSettings
clientSettings = SN.clientSettingsTCP
appSource :: (SN.HasReadWrite ad, MonadIO m) => ad -> Producer m ByteString
appSource ad =
loop
where
read' = SN.appRead ad
loop = do
bs <- liftIO read'
unless (S.null bs) $ do
yield bs
loop
appSink :: (SN.HasReadWrite ad, MonadIO m) => ad -> Consumer ByteString m ()
appSink ad = awaitForever $ \d -> liftIO $ SN.appWrite ad d >> Conc.yield
addBoundSignal::MVar ()-> SN.ServerSettings -> SN.ServerSettings
addBoundSignal isBound set = SN.setAfterBind ( \socket -> originalAfterBind socket >> signalBound socket) set
where originalAfterBind :: Socket -> IO ()
originalAfterBind = SN.getAfterBind set
signalBound :: Socket -> IO ()
signalBound _socket = putMVar isBound ()
forkTCPServer :: MonadBaseControl IO m
=> SN.ServerSettings
-> (SN.AppData -> m ())
-> m ThreadId
forkTCPServer set f =
liftBaseWith $ \run -> do
isBound <- newEmptyMVar
let setWithWaitForBind = addBoundSignal isBound set
threadId <- forkIO . void . run $ runGeneralTCPServer setWithWaitForBind f
takeMVar isBound
return threadId
runGeneralTCPServer :: MonadBaseControl IO m
=> SN.ServerSettings
-> (SN.AppData -> m ())
-> m a
runGeneralTCPServer set f = liftBaseWith $ \run ->
SN.runTCPServer set $ void . run . f
runGeneralTCPClient :: MonadBaseControl IO m
=> SN.ClientSettings
-> (SN.AppData -> m a)
-> m a
runGeneralTCPClient set f = control $ \run ->
SN.runTCPClient set $ run . f