module Data.Conduit.Network
(
sourceSocket
, sinkSocket
, SN.AppData
, appSource
, appSink
, SN.appSockAddr
, SN.appLocalAddr
, SN.ServerSettings
, serverSettings
, SN.runTCPServer
, SN.runTCPServerWithHandle
, forkTCPServer
, runGeneralTCPServer
, SN.ClientSettings
, clientSettings
, SN.runTCPClient
, runGeneralTCPClient
, SN.getPort
, SN.getHost
, SN.getAfterBind
, SN.getNeedLocalAddr
, SN.setPort
, SN.setHost
, SN.setAfterBind
, SN.setNeedLocalAddr
, SN.HostPreference
) where
import Prelude hiding (catch)
import Data.Conduit
import qualified Network.Socket as NS
import Network.Socket (Socket)
import Network.Socket.ByteString (sendAll, recv)
import Data.ByteString (ByteString)
import qualified GHC.Conc as Conc (yield)
import qualified Data.ByteString as S
import qualified Data.ByteString.Char8 as S8
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Exception (throwIO, SomeException, try, finally, bracket, IOException, catch)
import Control.Monad (unless, void)
import Control.Monad.Trans.Control (MonadBaseControl, control, liftBaseWith)
import Control.Monad.Trans.Class (lift)
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar, MVar, ThreadId)
import qualified Data.Streaming.Network as SN
sourceSocket :: MonadIO m => Socket -> Producer m ByteString
sourceSocket socket =
loop
where
loop = do
bs <- lift $ liftIO $ SN.safeRecv socket 4096
if S.null bs
then return ()
else yield bs >> loop
sinkSocket :: MonadIO m => Socket -> Consumer ByteString m ()
sinkSocket socket =
loop
where
loop = await >>= maybe (return ()) (\bs -> lift (liftIO $ sendAll socket bs) >> loop)
serverSettings = SN.serverSettingsTCP
clientSettings = SN.clientSettingsTCP
appSource :: (SN.HasReadWrite ad, MonadIO m) => ad -> Producer m ByteString
appSource ad =
loop
where
read' = SN.appRead ad
loop = do
bs <- liftIO read'
unless (S.null bs) $ do
yield bs
loop
appSink :: (SN.HasReadWrite ad, MonadIO m) => ad -> Consumer ByteString m ()
appSink ad = awaitForever $ \d -> liftIO $ SN.appWrite ad d >> Conc.yield
addBoundSignal::MVar ()-> SN.ServerSettings -> SN.ServerSettings
addBoundSignal isBound set = SN.setAfterBind ( \socket -> originalAfterBind socket >> signalBound socket) set
where originalAfterBind :: Socket -> IO ()
originalAfterBind = SN.getAfterBind set
signalBound :: Socket -> IO ()
signalBound _socket = putMVar isBound ()
forkTCPServer :: MonadBaseControl IO m
=> SN.ServerSettings
-> (SN.AppData -> m ())
-> m ThreadId
forkTCPServer set f =
liftBaseWith $ \run -> do
isBound <- newEmptyMVar
let setWithWaitForBind = addBoundSignal isBound set
threadId <- forkIO . void . run $ runGeneralTCPServer setWithWaitForBind f
takeMVar isBound
return threadId
runGeneralTCPServer :: MonadBaseControl IO m
=> SN.ServerSettings
-> (SN.AppData -> m ())
-> m ()
runGeneralTCPServer set f = liftBaseWith $ \run ->
SN.runTCPServer set $ void . run . f
runGeneralTCPClient :: MonadBaseControl IO m
=> SN.ClientSettings
-> (SN.AppData -> m a)
-> m a
runGeneralTCPClient set f = control $ \run ->
SN.runTCPClient set $ run . f