License | Apache-2.0 |
---|---|
Maintainer | gabriel.volpe@chatroulette.com |
Stability | experimental |
Safe Haskell | None |
Language | Haskell2010 |
Consider the following imports (needs the async library).
import Control.Concurrent ( threadDelay ) import Control.Concurrent.Async ( concurrently_ ) import Control.Monad ( forever ) import Pulsar
A quick example of a consumer and producer running concurrently.
resources :: Pulsar (Consumer IO, Producer IO) resources = do ctx <- connect defaultConnectData consumer <- newConsumer ctx topic "test-sub" producer <- newProducer ctx topic return (consumer, producer)
A Pulsar connection, consumers, and producers are long-lived resources that are managed accordingly for you. Once the program exits, the resources will be released in the respective order (always opposite to the order of acquisition).
main :: IO () main = runPulsar resources $ (Consumer {..}, Producer {..}) -> let c = forever $ fetch >>= (Message i m) -> print m >> ack i p = forever $ threadDelay (5 * 1000000) >> produce "hello world" in concurrently_ c p
Synopsis
- connect :: (MonadThrow m, MonadIO m, MonadManaged m) => ConnectData -> m PulsarCtx
- defaultConnectData :: ConnectData
- newConsumer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> SubscriptionName -> m (Consumer f)
- newProducer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> m (Producer f)
- runPulsar :: forall a b. Pulsar a -> (a -> IO b) -> IO b
- runPulsar' :: forall a b. LogOptions -> Pulsar a -> (a -> IO b) -> IO b
- data Consumer m = Consumer {}
- newtype Producer m = Producer {
- produce :: PulsarMessage -> m ()
- data Pulsar a
- data PulsarCtx
- data ConnectData
- data LogLevel
- data LogOptions = LogOptions {}
- data LogOutput
- data Topic = Topic {}
- defaultTopic :: String -> Topic
- data TopicType
- newtype Tenant = Tenant String
- newtype NameSpace = NameSpace String
- newtype TopicName = TopicName String
- newtype MsgId = MsgId MessageIdData
- data Message = Message MsgId ByteString
- newtype PulsarMessage = PulsarMessage ByteString
- newtype SubscriptionName = SubscriptionName Text
Documentation
connect :: (MonadThrow m, MonadIO m, MonadManaged m) => ConnectData -> m PulsarCtx Source #
Starts a Pulsar connection with the supplied ConnectData
defaultConnectData :: ConnectData Source #
Default connection data: "127.0.0.1:6650"
newConsumer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> SubscriptionName -> m (Consumer f) Source #
Create a new Consumer
by supplying a PulsarCtx
(returned by connect
), a Topic
and a SubscriptionName
.
newProducer :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> m (Producer f) Source #
runPulsar :: forall a b. Pulsar a -> (a -> IO b) -> IO b Source #
Runs a Pulsar computation with default logging to standard output
runPulsar' :: forall a b. LogOptions -> Pulsar a -> (a -> IO b) -> IO b Source #
Runs a Pulsar computation with the supplied logging options
An abstract Producer
able to produce
messages of type PulsarMessage
.
Producer | |
|
The main Pulsar monad, which abstracts over a Managed
monad.
Instances
Monad Pulsar Source # | |
Functor Pulsar Source # | |
Applicative Pulsar Source # | |
MonadIO Pulsar Source # | |
Defined in Pulsar.Internal.Core | |
MonadThrow Pulsar Source # | |
Defined in Pulsar.Internal.Core | |
MonadManaged Pulsar Source # | |
Defined in Pulsar.Internal.Core |
Internal Pulsar context. You will never need to access its content (not exported) but might need to take it as argument.
data ConnectData Source #
Connection details: host and port.
Instances
Show ConnectData Source # | |
Defined in Pulsar.Connection showsPrec :: Int -> ConnectData -> ShowS # show :: ConnectData -> String # showList :: [ConnectData] -> ShowS # |
Internal logging level, part of LogOptions
. Can be used together with runPulsar
`.
data LogOptions Source #
Internal logging options. Can be used together with runPulsar
`.
Instances
Show LogOptions Source # | |
Defined in Pulsar.Internal.Core showsPrec :: Int -> LogOptions -> ShowS # show :: LogOptions -> String # showList :: [LogOptions] -> ShowS # |
Internal logging output, part of LogOptions
. Can be used together with runPulsar
`.
A Topic is in the form "type://tenant/namespace/topic-name", which is what the Show
instance does.
defaultTopic :: String -> Topic Source #
A default Topic
: "non-persistent://public/default/my-topic".
A topic can be either Persistent
or NonPersistent
.
A tenant can be any string value. Default value is "public".
A namespace can be any string value. Default value is "default".
A topic name can be any string value.
A consumed message, containing both MsgId
and payload as bytestring.
newtype PulsarMessage Source #
A produced message, containing just a payload as bytestring.
Instances
Show PulsarMessage Source # | |
Defined in Pulsar.Types showsPrec :: Int -> PulsarMessage -> ShowS # show :: PulsarMessage -> String # showList :: [PulsarMessage] -> ShowS # | |
IsString PulsarMessage Source # | |
Defined in Pulsar.Types fromString :: String -> PulsarMessage # |
newtype SubscriptionName Source #
A subscription name can be any string value.
Instances
Show SubscriptionName Source # | |
Defined in Pulsar.Types showsPrec :: Int -> SubscriptionName -> ShowS # show :: SubscriptionName -> String # showList :: [SubscriptionName] -> ShowS # | |
IsString SubscriptionName Source # | |
Defined in Pulsar.Types fromString :: String -> SubscriptionName # |