{-# LANGUAGE OverloadedStrings #-}
{-|
Module      : Streaming.UDP
Description : Simple fire-and-forget UDP components for Streaming library
Copyright   : (c) 2018 Mihai Giurgeanu
Stability   : experimental
Portability : GHC
-}
module Streaming.UDP (
  fromUDP,
  fromSocket,
  fromIOSocket,
  toUDP,
  toSocket,
  toIOSocket,
  ) where

import Streaming (Of, Stream, lift)
import qualified Streaming.Prelude as S
import Network.Socket (Socket, SockAddr(SockAddrInet), PortNumber, setSocketOption)
import qualified Network.Socket as Socket
import Network.Socket.ByteString (recvFrom, send)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Resource (MonadResource, allocate)
import Data.ByteString (ByteString)

-- | Takes an IP address, a port and a stream of strict 'ByteString's and
-- sends every 'ByteString' of the stream to a UDP socket connected to that
-- address and port.
toUDP :: (MonadResource m) => String -> PortNumber -> Stream (Of ByteString) m r -> m r
toUDP host port = toIOSocket sinkSocket
  where
    -- action that creates the socket and connects it to the destination
    sinkSocket = udpSocket host port SinkSocket

-- | Send every element of the stream through the given connected 'Socket',
-- one 'send' call per element, and return the result of the stream. Note
-- that this function will not automatically close the 'Socket' when
-- processing completes.
toSocket :: (MonadIO m) => Socket -> Stream (Of ByteString) m r -> m r
toSocket s = S.mapM_ transmit
  where
    -- since we are sending a datagram, can't use sendAll;
    -- the value returned by send is ignored
    transmit :: MonadIO m => ByteString -> m Int
    transmit = liftIO . send s

-- | An alternative to 'toSocket'. Instead of taking a pre-opened 'Socket', it
-- takes an action that returns a connected 'Socket', so that it can open it
-- only when needed and close it as soon as possible.
toIOSocket :: (MonadResource m) => IO Socket -> Stream (Of ByteString) m r -> m r
toIOSocket connectAction str =
    -- 'allocate' runs the connect action and registers 'Socket.close' as the
    -- release action for the socket it returns; the release key is not used.
    allocate connectAction Socket.close >>= \(_, sock) -> toSocket sock str

-- | Fire-and-forget style UDP source. It attempts to bind to the specified
-- interface and port; if that succeeds, it reads the datagrams being sent
-- to that interface and port.
-- Each element of the stream is a pair of (message, source-address), so the
-- downstream knows who sent the message.
fromUDP :: (MonadResource m)
        => String      -- ^ address to bind to
        -> PortNumber  -- ^ port number to bind to
        -> Int         -- ^ maximum size of a datagram
        -> Stream (Of (ByteString, SockAddr)) m r
fromUDP host port = fromIOSocket (udpSocket host port SourceSocket)

-- | Stream the contents of a bound 'Socket' as binary data. Note that this function will
-- not automatically close the 'Socket' when processing completes, since it did not acquire
-- the 'Socket' in the first place.
fromSocket :: (MonadIO m)
           => Socket                                  -- ^ the socket
           -> Int                                     -- ^ max length of a datagram
           -> Stream (Of (ByteString, SockAddr)) m r  -- ^ the resulting stream of messages
fromSocket s sz = S.repeatM receive
  where
    -- one 'recvFrom' call per stream element; the stream never ends on its own
    receive = liftIO (recvFrom s sz)

-- | An alternative to 'fromSocket'. Instead of taking a pre-bound 'Socket',
-- it takes an action that opens and binds a Socket, so that it can open it
-- only when needed and close it as soon as possible.
fromIOSocket :: (MonadResource m)
             => IO Socket                               -- ^ the 'IO' action to create the UDP socket
             -> Int                                     -- ^ max length of a datagram
             -> Stream (Of (ByteString, SockAddr)) m r  -- ^ the resulting stream of messages
fromIOSocket bindAction sz =
    -- the socket is registered with 'allocate' in the base monad, with
    -- 'Socket.close' as its release action, before any datagram is read
    lift (allocate bindAction Socket.close) >>= \(_, sock) -> fromSocket sock sz

-- | Given a host, port and 'SocketType', create a socket and
-- 'Network.Socket.connect' or 'Network.Socket.bind' to the address pair.
udpSocket :: String      -- ^ IP address of the host, in numeric IPv4 form
          -> PortNumber  -- ^ UDP port to bind or connect to
          -> SocketType  -- ^ selects between 'Network.Socket.bind' and 'Network.Socket.connect'
          -> IO Socket   -- ^ the bound or connected datagram socket
udpSocket host port socketType = do
    -- Resolve the address before creating the socket, so that a malformed
    -- host fails without leaving an open socket behind. 'getAddrInfo' is
    -- used in place of the deprecated 'inet_addr'.
    candidates <- Socket.getAddrInfo (Just hints) (Just host) (Just (show port))
    case candidates of
        [] -> ioError (userError ("Streaming.UDP.udpSocket: no address found for " ++ host))
        (info : _) -> do
            socket <- Socket.socket Socket.AF_INET Socket.Datagram Socket.defaultProtocol
            setSocketOption socket Socket.ReusePort 1
            -- NOTE(review): if 'setSocketOption' or the bind/connect call
            -- throws, the socket is not closed here; closing it on error
            -- (e.g. with 'Control.Exception.bracketOnError') would need an
            -- additional import.
            socketMode socketType socket (Socket.addrAddress info)
            return socket
  where
    -- Restrict resolution to numeric IPv4 datagram addresses, which keeps
    -- the accepted inputs the same as with 'inet_addr' (no name lookup).
    hints = Socket.defaultHints
        { Socket.addrFlags      = [Socket.AI_NUMERICHOST]
        , Socket.addrFamily     = Socket.AF_INET
        , Socket.addrSocketType = Socket.Datagram
        }

-- | A type to distinguish when we want to read and write from sockets,
-- simplifying the implementation
data SocketType
    = SourceSocket  -- ^ source socket, should 'Network.Socket.bind'
    | SinkSocket    -- ^ sink socket, should 'Network.Socket.connect'

-- | Use either 'Network.Socket.bind' or 'Network.Socket.connect' depending on the 'SocketType'
socketMode :: SocketType -> Socket -> SockAddr -> IO ()
socketMode SourceSocket = Socket.bind
socketMode SinkSocket   = Socket.connect