Franz
Franz is an append-only container format, forked from liszt.
Each stream is stored as concatenated payloads together with an array of their
byte offsets.
Design requirements
- The writer must be embedded in the application so that a server failure never blocks the application.
- There must be a way to archive streams into a single file.
- There must be a way to efficiently fetch the data within a given period of time.
- In particular, the server should be able to search by timestamp, rather than leaving the client to perform a binary search.
- The server must not take too long to restart.
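Searching by timestamp on the server amounts to a lower-bound search over an index column whose values grow monotonically. As a minimal sketch, assuming the timestamps of a stream are held in an ascending Data.Array of Int64 values, the hypothetical helper lowerBound below (not part of franz) returns the position of the first item at or after a given time:

```haskell
import Data.Array (Array, bounds, listArray, (!))
import Data.Int (Int64)

-- | Hypothetical helper (not part of franz): position of the first element
-- that is >= t, or Nothing if all elements are smaller.
-- Assumes the array is sorted in ascending order.
lowerBound :: Array Int Int64 -> Int64 -> Maybe Int
lowerBound arr t = go lo (hi + 1)
  where
    (lo, hi) = bounds arr
    -- Invariant: the answer, if any, lies in the half-open range [l, r).
    go l r
      | l >= r = if l > hi then Nothing else Just l
      | arr ! m < t = go (m + 1) r
      | otherwise = go l m
      where
        m = l + (r - l) `div` 2

-- Example: four items recorded at these made-up timestamps.
exampleTimestamps :: Array Int Int64
exampleTimestamps = listArray (0, 3) [100, 200, 200, 300]
```

For instance, lowerBound exampleTimestamps 250 evaluates to Just 3. Because the search runs next to the data, the client only sends the timestamp instead of driving each bisection step itself.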
Use case
- Instances of franzd are running on a remote server and a local gateway.
- The application produces franz files locally using the writer API.
- On the local gateway, a proxy connects to the remote server and downsamples the file.
- Clients can connect to the gateway. When needed, they may also connect directly to the remote server.
The on-disk representation of a franz stream comprises the following files:
payloads
: concatenated payloads
offsets
: A sequence of (N+1)-tuples of 64-bit little-endian integers, where N is the number of index names:
- 0th: the byte offset of the payload
- nth, n ∈ [1..N]: the value of the nth index
indices
: Line-separated list of index names. An index represents a 64-bit little-endian integer attached to a payload.
A stream is stored as a directory containing the files above.
The Franz reader also supports a squashfs image, provided that the content is a valid franz stream.
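To make the layout of the offsets file concrete, here is a minimal sketch of how one record could be encoded and how the file could be split back into (N+1)-tuples. It uses only the bytestring package; encodeRecord and decodeRecords are hypothetical helpers, not franz functions, and the sketch does not interpret whether the 0th element marks the start or the end of a payload.

```haskell
import Data.Bits (shiftL, shiftR, (.|.))
import qualified Data.ByteString as B
import Data.Int (Int64)
import Data.Word (Word8)

-- | Hypothetical helper: encode one record of an offsets file, i.e. the
-- payload offset followed by N index values, each a 64-bit little-endian integer.
encodeRecord :: Int64 -> [Int64] -> B.ByteString
encodeRecord off ixs = B.pack (concatMap le64 (off : ixs))
  where
    le64 :: Int64 -> [Word8]
    le64 x = [fromIntegral (x `shiftR` (8 * i)) | i <- [0 .. 7]]

-- | Hypothetical helper: split an offsets file into (offset, index values)
-- records, given N, the number of index names. A trailing partial record
-- (for example, one that is still being written) is ignored.
decodeRecords :: Int -> B.ByteString -> [(Int64, [Int64])]
decodeRecords n bs
  | B.length bs < recLen = []
  | otherwise = case map readLE64 (chunksOf8 record) of
      off : ixs -> (off, ixs) : decodeRecords n rest
      [] -> [] -- unreachable, since recLen >= 8
  where
    recLen = 8 * (n + 1)
    (record, rest) = B.splitAt recLen bs
    chunksOf8 b
      | B.null b = []
      | otherwise = let (h, t) = B.splitAt 8 b in h : chunksOf8 t
    -- The first byte is the least significant one.
    readLE64 :: B.ByteString -> Int64
    readLE64 = B.foldr (\w acc -> acc `shiftL` 8 .|. fromIntegral w) 0
```

With two index names, each record occupies 8 × 3 = 24 bytes, so the i-th record starts at byte 24 × i.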
franzd
franzd is a read-only server which follows franz files and serves them over the network.
The directories to search for streams are specified as command-line arguments, separately for live streams and for squashfs images.
franzd --live /path/to/live --archive /path/to/archive
Why not Kafka
- None of us wants to debug or contribute to Kafka.
- Trying to read from a stream creates the stream (this is a problem due to the way we name our streams and rely on latest)
- A stream cannot be deleted while a reader exists
- We lack understanding of it (though there is a lot of good documentation out there, which is recommended reading)
- Kafka takes a long time to start up after an abnormal shutdown on the server side
- It supports clustering, but clustering sometimes makes the whole system less reliable
Client API
Database.Franz.Client exposes the client API.
You can obtain a Connection to a remote franz file with withConnection.
It tries to mount a squashfs image at the given path. The mount is shared between connections and is unmounted when the last client closes its connection.
toFranzPath :: String -> Either String FranzPath
withConnection :: (MonadIO m, MonadMask m)
  => FranzPath
  -> (Connection -> m r) -> m r
data RequestType = AllItems | LastItem deriving (Show, Generic)
data ItemRef = BySeqNum !Int -- ^ sequential number
  | ByIndex !IndexName Int -- ^ index name and value
data Query = Query
  { reqStream :: !StreamName
  , reqFrom :: !ItemRef -- ^ start of the requested range
  , reqTo :: !ItemRef -- ^ end of the requested range
  , reqType :: !RequestType
  } deriving (Show, Generic)
-- | When it is 'Right', running the STM action blocks until the content is available on the server.
type Response = Either Contents (STM Contents)
fetch :: Connection
  -> Query
  -> (STM Response -> IO r)
  -- ^ running the STM action blocks until the response arrives
  -> IO r
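Because a Response is either the contents themselves or an STM action that yields them later, a continuation passed to fetch typically has to wait twice. A minimal sketch of such a continuation, written against GHC.Conc from base (which provides the same STM type that the stm package re-exports); awaitResponse is a hypothetical name, not part of franz:

```haskell
import GHC.Conc (STM, atomically)

-- | Hypothetical helper (not part of franz): wait for the server's reply,
-- then, if the contents were not available yet ('Right'), wait for them too.
-- The two waits run as two separate transactions.
awaitResponse :: STM (Either a (STM a)) -> IO a
awaitResponse reply = do
  response <- atomically reply -- blocks until the response arrives
  either pure atomically response -- a 'Right' blocks until the contents exist
```

Since Response is Either Contents (STM Contents), passing it to fetch as in fetch conn query awaitResponse should yield an IO Contents.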
Contents is a datatype containing triples of sequential numbers, indices and payloads. It is recommended to import Database.Franz.Contents qualified.
data Contents
data Item = Item
  { seqNo :: !Int
  , indices :: !(U.Vector Int64)
  , payload :: !B.ByteString
  } deriving (Show, Eq)
toList :: Contents -> [Item]
Writer API
Database.Franz.Writer provides the writer interface.
withWriter :: Foldable f
  => f String
  -> FilePath
  -> (WriterHandle f -> IO a)
  -> IO a
withWriter acquires a handle. The f String parameter represents a list of index names.
write :: Foldable f
  => WriterHandle f
  -> f Int64 -- ^ index values
  -> Builder -- ^ payload
  -> IO Int
flush :: WriterHandle f -> IO ()
write appends a payload to the stream. f Int64 is the list of index values, and it must have the same length as the list of index names you passed to withWriter. Changes are written to disk whenever the buffer gets full or you call flush.
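The rules for write can be illustrated with a toy, in-memory model. Everything here is hypothetical and is not how Database.Franz.Writer is implemented: Model, newModel, writeModel and flushModel are illustrative names, and the buffer capacity is assumed to be measured in payload bytes. The model rejects index lists of the wrong length and moves buffered payloads to the "disk" list when the buffer fills up or flushModel is called.

```haskell
import qualified Data.ByteString as B
import Data.Int (Int64)

-- | Toy model of a stream writer. Hypothetical: this is NOT how
-- Database.Franz.Writer is implemented; it only mirrors the documented rules.
data Model = Model
  { indexCount :: Int                    -- number of index names
  , capacity :: Int                      -- assumed buffer size, in payload bytes
  , pending :: [(B.ByteString, [Int64])] -- buffered writes, newest first
  , pendingLen :: Int                    -- total bytes currently buffered
  , onDisk :: [(B.ByteString, [Int64])]  -- flushed writes, oldest first
  }

newModel :: Int -> Int -> Model
newModel n cap = Model n cap [] 0 []

-- | Buffer one payload. Index lists of the wrong length are rejected, and
-- the buffer is flushed automatically once it reaches its capacity.
writeModel :: [Int64] -> B.ByteString -> Model -> Either String Model
writeModel ixs bs m
  | length ixs /= indexCount m =
      Left ("expected " ++ show (indexCount m) ++ " index values, got " ++ show (length ixs))
  | pendingLen m' >= capacity m' = Right (flushModel m')
  | otherwise = Right m'
  where
    m' = m { pending = (bs, ixs) : pending m, pendingLen = pendingLen m + B.length bs }

-- | Move every buffered write to the "disk", preserving write order.
flushModel :: Model -> Model
flushModel m = m { pending = [], pendingLen = 0, onDisk = onDisk m ++ reverse (pending m) }
```

In the real API the index names and index values share the same Foldable f, so the length check here stands in for the requirement that both have the same length.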
If you don't need the index mechanism, you can use Database.Franz.Writer.Simple instead.