CloudEvents Haskell SDK
An unofficial Haskell SDK for the CloudEvents specification, targeting version 1.0.2 of the spec (JSON Schema).
Overview
CloudEvents is a vendor-neutral specification for defining the format of event data. This SDK provides a type-safe and idiomatic way to work with CloudEvents in Haskell, making it easier to produce and consume CloudEvents across different services and platforms.
The CloudEvents Haskell SDK allows you to:
- Create CloudEvents with strongly typed event data and extensions
- Validate CloudEvents against the specification
- Encode and decode CloudEvents in JSON format
- Integrate with message brokers like Apache Kafka
Installation
Add the package to your project's dependencies:
Using Cabal
-- In your .cabal file
build-depends: cloudevents-haskell
Using Stack (package.yaml)
dependencies:
- cloudevents-haskell
Key Features
- Type-safe CloudEvent representation - Events modeled as native Haskell types with proper validation
- Flexible event data - Support for arbitrary event data types via polymorphism
- Extension mechanism - Define and use custom extension attributes
- Protocol bindings - Integration with Apache Kafka (binary and structured modes)
- JSON encoding/decoding - Automatic serialization with Aeson via Autodocodec
- Lenses - Convenient access and modification of CloudEvent fields
- Validation - Validate CloudEvents against the JSON schema
Quick Start Examples
Creating a Simple CloudEvent
import CloudEvent.V1.Event.Data
import Data.Text (Text)
simpleEvent :: CloudEvent EmptyExtension Text
simpleEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "1234-1234-1234"
      , ceSource =
      , ceEventType = "com.example.event.created"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "resource123"
      , ceTime =
      }
    "This is the event payload as plain text"
    EmptyExtension
Working with Structured Data
import CloudEvent.V1.Event.Data
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Autodocodec
data UserCreatedEvent = UserCreatedEvent
  { userId :: Text
  , username :: Text
  , email :: Text
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec UserCreatedEvent)

instance HasCodec UserCreatedEvent where
  codec =
    object "UserCreatedEvent" $
      UserCreatedEvent
        <$> requiredField' "userId" .= (.userId)
        <*> requiredField' "username" .= (.username)
        <*> requiredField' "email" .= (.email)

userCreatedEvent :: CloudEvent EmptyExtension UserCreatedEvent
userCreatedEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "user-123-456"
      , ceSource =
      , ceEventType = "com.example.user.created"
      , ceDataContentType = Just "application/json"
      , ceDataSchema = Nothing
      , ceSubject = Just "resource123"
      , ceTime =
      }
    UserCreatedEvent
      { userId = "123"
      , username = "johndoe"
      , email = "john@example.com"
      }
    EmptyExtension
Using Custom Extensions
import CloudEvent.V1.Event.Data
import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Autodocodec
data TraceExtension = TraceExtension
  { traceId :: Text
  , spanId :: Text
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec TraceExtension)

instance HasCodec TraceExtension where
  codec = object "TraceExtension" objectCodec

instance HasObjectCodec TraceExtension where
  objectCodec =
    TraceExtension
      <$> requiredField' "traceId" .= (.traceId)
      <*> requiredField' "spanId" .= (.spanId)

eventWithTracing :: CloudEvent TraceExtension Text
eventWithTracing =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "evt-789-abc"
      , ceSource =
      , ceEventType = "com.example.order.processed"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "order/456"
      , ceTime =
      }
    "Order 456 has been processed successfully"
    TraceExtension
      { traceId = "trace-123456789"
      , spanId = "span-abcdef"
      }
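The CloudEvents 1.0.2 spec requires every attribute name, extension names included, to consist only of lower-case ASCII letters and digits, and recommends keeping names to 20 characters or fewer. The following is a minimal sketch of such a check using only base. `isValidAttributeName` and `isRecommendedLength` are hypothetical helpers and not part of this SDK; whether the SDK enforces these rules is not stated here.

```haskell
import Data.Char (isAsciiLower, isDigit)

-- | Hypothetical helper: True when a name satisfies the spec's MUST rule,
-- i.e. it is non-empty and uses only 'a'-'z' and '0'-'9'.
isValidAttributeName :: String -> Bool
isValidAttributeName name =
  not (null name) && all (\c -> isAsciiLower c || isDigit c) name

-- | Hypothetical helper: True when the name also follows the SHOULD rule
-- of at most 20 characters.
isRecommendedLength :: String -> Bool
isRecommendedLength name = length name <= 20
```

Under this check, a name such as `traceid` passes while a camel-case name such as `traceId` does not, so it is worth confirming which names end up on the wire when defining extension codecs.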
Validating CloudEvents
The SDK provides functionality to validate CloudEvents against the specification:
import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Validation
import Data.Aeson (Value, toJSON)
-- MyEventData and MyExtensions stand for your own event data and extension types.
validateJsonCloudEvent :: Value -> Bool
validateJsonCloudEvent jsonValue =
  isValidCloudEvent @MyEventData @MyExtensions jsonValue
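To give a feel for what spec-level validation involves, here is a simplified sketch of the rules for the four required context attributes in CloudEvents 1.0 (`id`, `source`, `specversion`, `type`). It uses only base and a hypothetical `RequiredAttrs` record; it is not how `isValidCloudEvent` is implemented, and it does not check that `source` is a well-formed URI-reference.

```haskell
import Data.Maybe (catMaybes)

-- | Hypothetical stand-in for the required context attributes of an event.
data RequiredAttrs = RequiredAttrs
  { attrId :: String
  , attrSource :: String
  , attrSpecVersion :: String
  , attrType :: String
  }

-- | Collect one message per violated rule; an empty list means all checks pass.
checkRequired :: RequiredAttrs -> [String]
checkRequired attrs =
  catMaybes
    [ failWhen (null (attrId attrs)) "id must be a non-empty string"
    , failWhen (null (attrSource attrs)) "source must be a non-empty URI-reference"
    , failWhen (attrSpecVersion attrs /= "1.0") "specversion must be \"1.0\""
    , failWhen (null (attrType attrs)) "type must be a non-empty string"
    ]
  where
    -- Turn a failing condition into its error message.
    failWhen cond msg = if cond then Just msg else Nothing
```

Returning a list of messages rather than a `Bool` is a design choice for this sketch: it lets a caller report every problem at once instead of only the first.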
Kafka Integration
The SDK provides integration with Apache Kafka through the CloudEvent.V1.Bindings.Kafka module:
import CloudEvent.V1.Bindings.Kafka
import CloudEvent.V1.Event.Data
import Data.Text (Text)
import Kafka.Producer (ProducerRecord, TopicName)
kafkaEvent :: CloudEvent KafkaCloudEventExtAttributes Text
kafkaEvent =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "evt-kafka-123"
      , ceSource =
      , ceEventType = "com.example.inventory.updated"
      , ceDataContentType = Just "text/plain"
      , ceDataSchema = Nothing
      , ceSubject = Just "product/123"
      , ceTime =
      }
    "Product 123 inventory updated to 42 units"
    KafkaCloudEventExtAttributes{partitionKey = "product-123"}

sendBinary :: TopicName -> CloudEvent KafkaCloudEventExtAttributes Text -> ProducerRecord
sendBinary topic event = toBinaryKafkaMessage topic event

sendStructured :: TopicName -> CloudEvent KafkaCloudEventExtAttributes Text -> ProducerRecord
sendStructured topic event = toStructuredKafkaMessage topic event
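The two modes differ in where the context attributes travel. In the CloudEvents Kafka protocol binding, binary mode places the event data in the message value and maps each attribute to a header prefixed with `ce_`, with `datacontenttype` carried by the `content-type` header. Structured mode encodes the whole event into the message value with the content type `application/cloudevents+json`. The sketch below illustrates only the binary-mode header naming, using base and a hypothetical `toBinaryHeaders` function; it is not the SDK's `toBinaryKafkaMessage`.

```haskell
-- | A context attribute as a (name, value) pair, e.g. ("id", "evt-kafka-123").
type Attribute = (String, String)

-- | Hypothetical helper: rename attributes to binary-mode Kafka header names.
-- 'datacontenttype' becomes the 'content-type' header; everything else
-- gets the 'ce_' prefix.
toBinaryHeaders :: [Attribute] -> [(String, String)]
toBinaryHeaders = map rename
  where
    rename ("datacontenttype", v) = ("content-type", v)
    rename (name, v) = ("ce_" <> name, v)
```

For example, the attributes `("id", "evt-kafka-123")` and `("datacontenttype", "text/plain")` would be sent as the headers `ce_id` and `content-type`.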
Using Lenses
The SDK provides lenses for convenient access and modification of CloudEvent fields:
import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Lens
import Control.Lens
import Data.Text (Text)
getEventId :: CloudEvent ext eventData -> Text
getEventId event = event ^. ceId
updateSubject :: Text -> CloudEvent ext eventData -> CloudEvent ext eventData
updateSubject newSubject event =
  event & ceSubject .~ Just newSubject
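For readers new to lenses, the following sketch shows roughly what reading with `^.` and writing with `.~` do underneath, using a hand-rolled van Laarhoven lens and only base. The `Event` record, `subjectL`, `view`, and `set` are hypothetical names for illustration; the SDK's own lenses live in CloudEvent.V1.Event.Lens and are used with the operators from Control.Lens as shown above.

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))

-- | Hypothetical two-field record standing in for an event.
data Event = Event
  { eventId :: String
  , eventSubject :: Maybe String
  }
  deriving (Eq, Show)

-- | A van Laarhoven lens: one function that supports both reading and writing.
type Lens s a = forall f. Functor f => (a -> f a) -> s -> f s

-- | Lens focusing on the subject field.
subjectL :: Lens Event (Maybe String)
subjectL f e = fmap (\s -> e {eventSubject = s}) (f (eventSubject e))

-- | Read the focused field (the role '^.' plays).
view :: Lens s a -> s -> a
view l = getConst . l Const

-- | Replace the focused field (the role '.~' plays).
set :: Lens s a -> a -> s -> s
set l a = runIdentity . l (const (Identity a))
```

With these definitions, `set subjectL (Just "order/456") event` returns a copy of `event` with only the subject changed, and `view subjectL` reads it back.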
Complete Example
Here's a more complete example showing how to create a CloudEvent, validate it, and build the Kafka record for it (the actual send is left as a stub):
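{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}

module Example where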
import Autodocodec
import CloudEvent.V1.Bindings.Kafka
import CloudEvent.V1.Event.Data
import CloudEvent.V1.Event.Lens
import CloudEvent.V1.Event.Validation
import Control.Lens
import Data.Aeson (FromJSON, ToJSON, toJSON)
import Data.Text (Text)
import GHC.Generics (Generic)
import Kafka.Producer
data OrderCreatedEvent = OrderCreatedEvent
  { orderId :: Text
  , customerId :: Text
  , amount :: Double
  , items :: [Text]
  }
  deriving stock (Eq, Show, Generic)
  deriving (FromJSON, ToJSON) via (Autodocodec OrderCreatedEvent)

instance HasCodec OrderCreatedEvent where
  codec =
    object "OrderCreatedEvent" $
      OrderCreatedEvent
        <$> requiredField' "orderId" .= (.orderId)
        <*> requiredField' "customerId" .= (.customerId)
        <*> requiredField' "amount" .= (.amount)
        <*> requiredField' "items" .= (.items)

data OrderExtension = OrderExtension
  { partitionKey :: Text
  , region :: Text
  }
  deriving stock (Eq, Show, Generic)

instance HasCodec OrderExtension where
  codec = object "OrderExtension" objectCodec

instance HasObjectCodec OrderExtension where
  objectCodec =
    OrderExtension
      <$> requiredField' "partitionKey" .= (.partitionKey)
      <*> requiredField' "region" .= (.region)

createOrderEvent :: Text -> OrderCreatedEvent -> OrderExtension -> CloudEvent OrderExtension OrderCreatedEvent
createOrderEvent orderId orderData ext =
  mkCloudEvent
    MkCloudEventArg
      { ceId = "order-" <> orderId
      , ceSource =
      , ceEventType = "com.example.order.created"
      , ceDataContentType = Just "application/json"
      , ceDataSchema = Nothing
      , ceSubject = Just ("order/" <> orderId)
      , ceTime =
      }
    orderData
    ext
processOrder :: Text -> Text -> Double -> [Text] -> Text -> IO ()
processOrder orderId customerId amount items region = do
  let orderData =
        OrderCreatedEvent
          { orderId = orderId
          , customerId = customerId
          , amount = amount
          , items = items
          }
      extension =
        OrderExtension
          { partitionKey = orderId
          , region = region
          }
      event = createOrderEvent orderId orderData extension
      topic = "orders"
      producerRecord = toBinaryKafkaMessage topic event
  let isValid = isValidCloudEvent @OrderCreatedEvent @OrderExtension (toJSON event)
  if isValid
    then do
      putStrLn "Event is valid, sending to Kafka..."
      -- The actual send is omitted here: pass producerRecord to your Kafka producer.
      pure ()
    else
      putStrLn "Event validation failed!"
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the BSD-3-Clause License - see the LICENSE file for details.