Safe Haskell | None |
---|---|
Language | Haskell2010 |
Primary module containing everything needed to create stateful functions with Flink.
All stateful functions should have a single record type that represents the entire internal state
of the function. The Stateful Functions API provides many "slots" to store state, but for the purposes of this library
the slot is hardcoded to the single key flink_state, which you can see in the example module.yaml.
The FlinkState typeclass abstracts serialization away from the library so that users can decide how
state should be serialized. Aeson is very convenient so I use it in the example, but protobuf or any other
binary format is also acceptable. Flink essentially stores function state as an opaque ByteString
regardless.
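As a rough illustration of a non-Aeson serialization, the sketch below round-trips a state value through its Show/Read text form. It declares a local copy of the FlinkState class (with the signatures documented on this page) so that it compiles on its own; VisitCount is a hypothetical state type, and the Show/Read encoding is only a stand-in for whichever format you actually choose.

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import Text.Read (readEither)

-- Local copy of the class as documented in this module, so the sketch
-- compiles without the library installed.
class FlinkState s where
  decodeState :: ByteString -> Either String s
  encodeState :: s -> ByteString

-- Hypothetical state type used only for this illustration.
newtype VisitCount = VisitCount Int
  deriving (Show, Read, Eq)

-- Serialize via the derived Show/Read instances. Any format works as long
-- as decodeState can read back what encodeState wrote.
instance FlinkState VisitCount where
  decodeState = readEither . BC.unpack
  encodeState = BC.pack . show

main :: IO ()
main = do
  let bytes = encodeState (VisitCount 3)
  -- Round trip succeeds: prints Right (VisitCount 3)
  print (decodeState bytes :: Either String VisitCount)
  -- Malformed bytes are reported as a Left with a parse error message
  print (decodeState (BC.pack "garbage") :: Either String VisitCount)
```

Returning Left from decodeState, rather than throwing, keeps the failure visible in the type.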
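When running your program, don't forget to pass +RTS -N to your binary to run on all cores.
The -N option only takes effect if the binary was linked with -threaded, and GHC only accepts it on the command line if RTS options were enabled at link time with -rtsopts.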
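One way to check that the flag took effect is to print the number of capabilities the runtime is using. This is a small standalone sketch, not part of the library; it uses getNumCapabilities and getNumProcessors from GHC.Conc in base.

```haskell
import GHC.Conc (getNumCapabilities, getNumProcessors)

main :: IO ()
main = do
  -- Number of Haskell threads that can run simultaneously (set by -N)
  caps <- getNumCapabilities
  -- Number of processors the runtime detected on this machine
  procs <- getNumProcessors
  putStrLn ("capabilities: " ++ show caps ++ ", processors: " ++ show procs)
```

When the program is built with -threaded -rtsopts and started with +RTS -N, the two numbers should normally match; without -N the capability count defaults to 1.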
Synopsis
- class MonadIO m => StatefulFunc s m | m -> s where
- makeConcrete :: (FlinkState s, Message a) => (a -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString))
- createApp :: FunctionTable -> Application
- flinkServer :: FunctionTable -> Server FlinkApi
- flinkApi :: Proxy FlinkApi
- class FlinkState s where
- decodeState :: ByteString -> Either String s
- encodeState :: s -> ByteString
- type FunctionTable = Map (Text, Text) (ByteString, ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)))
Documentation
class MonadIO m => StatefulFunc s m | m -> s where
Used to represent all Flink stateful function capabilities.
Contexts are received from Flink and deserialized into s. All modifications to state are shipped back to Flink at the end of the
batch to be persisted.
Outgoing messages are also queued up and passed back at the end of the current batch.
Example of a stateless function (done by setting s to ()) that adds one
to a number and puts the protobuf response on Kafka via an egress message:
    -- NOTE: `name` is not defined in this snippet and is assumed to be in scope.
    adder :: StatefulFunc () m => AdderRequest -> m ()
    adder msg = sendEgressMsg ("adder", "added") (kafkaRecord "added" name added)
      where
        num = msg ^. AdderRequest.num
        added = defMessage & AdderResponse.num .~ (num + 1)
Example of a stateful function:
    newtype GreeterState = GreeterState
      { greeterStateCount :: Int
      }
      deriving (Generic, Show, ToJSON, FromJSON)

    instance FlinkState GreeterState where
      decodeState = eitherDecode . BSL.fromStrict
      encodeState = BSL.toStrict . Data.Aeson.encode

    counter :: StatefulFunc GreeterState m => EX.GreeterRequest -> m ()
    counter msg = do
      -- Read the current count from the state and add one
      newCount <- (+ 1) <$> insideCtx greeterStateCount
      let respMsg = "Saw " <> T.unpack name <> " " <> show newCount <> " time(s)"
      sendEgressMsg ("greeting", "greets") (kafkaRecord "greets" name $ response (T.pack respMsg))
      -- Store the new count back into the state
      modifyCtx (\old -> old {greeterStateCount = newCount})
      where
        name = msg ^. EX.name
        response :: Text -> EX.GreeterResponse
        response greeting = defMessage & EX.greeting .~ greeting
This responds to each event by counting how many times the function has been called for the name it was passed. The final state at the end of the batch is sent back to Flink. A failure of any kind causes the state to roll back to its previous value, so nothing is double counted.
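The batch behaviour described above can be pictured with a small pure model. This is only a sketch under simplifying assumptions, not the library's implementation: the state is a plain Int, events and queued replies are Strings, and handle, runBatch, and Outbox are hypothetical names.

```haskell
import Control.Monad (foldM)

-- Hypothetical pure model of the batch semantics described above; this is
-- not the library's implementation.
type Outbox = [String]

-- Handle one event: bump the count and queue a reply, or fail.
handle :: (Int, Outbox) -> String -> Either String (Int, Outbox)
handle (count, outbox) event
  | null event = Left "empty event"
  | otherwise =
      let newCount = count + 1
          reply = "Saw " ++ event ++ " " ++ show newCount ++ " time(s)"
       in Right (newCount, outbox ++ [reply])

-- Run a whole batch. On success the final state and all queued replies are
-- returned together; on any failure the previous state is kept and nothing
-- is emitted.
runBatch :: Int -> [String] -> (Int, Outbox)
runBatch previous events =
  either (const (previous, [])) id (foldM handle (previous, []) events)

main :: IO ()
main = do
  -- Both events succeed: (2,["Saw alice 1 time(s)","Saw alice 2 time(s)"])
  print (runBatch 0 ["alice", "alice"])
  -- Second event fails, so the batch is discarded: (2,[])
  print (runBatch 2 ["alice", ""])
```

Because state and outgoing messages only leave the function together at the end of a batch, a failed batch leaves no partial count behind.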
Minimal complete definition
setInitialCtx, insideCtx, getCtx, setCtx, modifyCtx, sendMsg, sendMsgDelay, sendEgressMsg
Methods
insideCtx :: (s -> a) -> m a
modifyCtx :: (s -> s) -> m ()
sendMsg
:: Message a | |
=> (Text, Text, Text) | Function address (namespace, type, id) |
-> a | Protobuf message to send |
-> m () | |
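Judging from their types and from the counter example, insideCtx reads a projection of the current state and modifyCtx applies an update to it. The sketch below models that reading with the State monad from the transformers package; insideCtx' and modifyCtx' are hypothetical stand-ins, and the real methods run in a StatefulFunc monad rather than in State.

```haskell
import Control.Monad.Trans.State.Strict (State, gets, modify, runState)

newtype GreeterState = GreeterState { greeterStateCount :: Int }
  deriving (Show, Eq)

-- Hypothetical stand-in for insideCtx: read a projection of the state.
insideCtx' :: (s -> a) -> State s a
insideCtx' = gets

-- Hypothetical stand-in for modifyCtx: apply an update to the state.
modifyCtx' :: (s -> s) -> State s ()
modifyCtx' = modify

-- Same read-then-update pattern as the counter example.
bump :: State GreeterState Int
bump = do
  newCount <- (+ 1) <$> insideCtx' greeterStateCount
  modifyCtx' (\old -> old { greeterStateCount = newCount })
  pure newCount

main :: IO ()
main =
  -- prints (5,GreeterState {greeterStateCount = 5})
  print (runState bump (GreeterState 4))
```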
Instances
FlinkState s => StatefulFunc s (Function s)
Defined in Network.Flink.Internal.Stateful
- setInitialCtx :: s -> Function s ()
- insideCtx :: (s -> a) -> Function s a
- getCtx :: Function s s
- setCtx :: s -> Function s ()
- modifyCtx :: (s -> s) -> Function s ()
- sendMsg :: Message a => (Text, Text, Text) -> a -> Function s ()
- sendMsgDelay :: Message a => (Text, Text, Text) -> Int -> a -> Function s ()
- sendEgressMsg :: Message a => (Text, Text) -> a -> Function s ()
makeConcrete :: (FlinkState s, Message a) => (a -> Function s ()) -> ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString))
Takes a function over an abstract state/message type and converts it to one that takes concrete ByteStrings.
This allows each function in the FunctionTable to have its own individual type of state while exposing
only a function that accepts ByteString to the library code.
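The idea of hiding each function's state type behind ByteString can be sketched in a much simplified form. Everything below is an assumption made for illustration: the class is a local copy of FlinkState, messages are plain Strings instead of protobuf values, and erase, Concrete, table, and dispatch are hypothetical names that leave out Env, the batch request, and the first component of the real FunctionTable pair.

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import qualified Data.Map as Map
import Text.Read (readEither)

-- Local copy of the documented class so the sketch is self-contained.
class FlinkState s where
  decodeState :: ByteString -> Either String s
  encodeState :: s -> ByteString

instance FlinkState Int where
  decodeState = readEither . BC.unpack
  encodeState = BC.pack . show

instance FlinkState Bool where
  decodeState = readEither . BC.unpack
  encodeState = BC.pack . show

-- Hypothetical, simplified shape of a "concrete" function: serialized state
-- in, message in, serialized state (or an error) out.
type Concrete = ByteString -> String -> Either String ByteString

-- Hide the state type s: decode, run the typed function, encode again.
erase :: FlinkState s => (String -> s -> s) -> Concrete
erase f rawState msg = encodeState . f msg <$> decodeState rawState

counter :: String -> Int -> Int
counter _ n = n + 1

seen :: String -> Bool -> Bool
seen _ _ = True

-- Functions with different state types can now share one table.
table :: Map.Map (String, String) Concrete
table =
  Map.fromList
    [ (("example", "counter"), erase counter)
    , (("example", "seen"), erase seen)
    ]

dispatch :: (String, String) -> Concrete
dispatch key rawState msg =
  case Map.lookup key table of
    Nothing -> Left ("no such function: " ++ show key)
    Just f -> f rawState msg

main :: IO ()
main = do
  print (dispatch ("example", "counter") (BC.pack "41") "hi") -- Right "42"
  print (dispatch ("example", "seen") (BC.pack "False") "hi") -- Right "True"
```

The table's value type mentions only ByteString, which is why entries with Int state and Bool state can sit side by side.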
createApp :: FunctionTable -> Application
Takes a function table and creates a WAI Application to serve Flink requests.
flinkServer :: FunctionTable -> Server FlinkApi
Takes a function table and creates a Servant Server to serve Flink requests.
class FlinkState s where
Provides functions for serializing and deserializing (SerDe) Flink state.
decodeState :: ByteString -> Either String s
Decodes Flink state from a strict ByteString.
encodeState :: s -> ByteString
Encodes Flink state to a strict ByteString.
Instances
FlinkState ()
Defined in Network.Flink.Internal.Stateful
- decodeState :: ByteString -> Either String ()
- encodeState :: () -> ByteString
type FunctionTable = Map (Text, Text) (ByteString, ByteString -> Env -> ToFunction'InvocationBatchRequest -> IO (Either FlinkError (FunctionState ByteString)))