module Database.MySQL.BinLog
(
SlaveID
, BinLogTracker(..)
, registerPesudoSlave
, dumpBinLog
, RowBinLogEvent(..)
, decodeRowBinLogEvent
, getLastBinLogTracker
, isCheckSumEnabled
, isSemiSyncEnabled
, module Database.MySQL.BinLogProtocol.BinLogEvent
, module Database.MySQL.BinLogProtocol.BinLogValue
, module Database.MySQL.BinLogProtocol.BinLogMeta
) where
import Control.Exception (throwIO)
import Control.Monad
import Data.Binary.Put
import Data.ByteString (ByteString)
import Data.IORef (IORef, newIORef,
readIORef,
writeIORef)
import Data.Text.Encoding (encodeUtf8)
import Data.Word
import Database.MySQL.Base
import Database.MySQL.BinLogProtocol.BinLogEvent
import Database.MySQL.BinLogProtocol.BinLogMeta
import Database.MySQL.BinLogProtocol.BinLogValue
import Database.MySQL.Connection
import GHC.Generics (Generic)
import System.IO.Streams (InputStream)
import qualified System.IO.Streams as Stream
type SlaveID = Word32
data BinLogTracker = BinLogTracker
{ BinLogTracker -> ByteString
btFileName :: {-# UNPACK #-} !ByteString
, BinLogTracker -> Word32
btNextPos :: {-# UNPACK #-} !Word32
} deriving (Int -> BinLogTracker -> ShowS
[BinLogTracker] -> ShowS
BinLogTracker -> String
(Int -> BinLogTracker -> ShowS)
-> (BinLogTracker -> String)
-> ([BinLogTracker] -> ShowS)
-> Show BinLogTracker
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> BinLogTracker -> ShowS
showsPrec :: Int -> BinLogTracker -> ShowS
$cshow :: BinLogTracker -> String
show :: BinLogTracker -> String
$cshowList :: [BinLogTracker] -> ShowS
showList :: [BinLogTracker] -> ShowS
Show, BinLogTracker -> BinLogTracker -> Bool
(BinLogTracker -> BinLogTracker -> Bool)
-> (BinLogTracker -> BinLogTracker -> Bool) -> Eq BinLogTracker
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: BinLogTracker -> BinLogTracker -> Bool
== :: BinLogTracker -> BinLogTracker -> Bool
$c/= :: BinLogTracker -> BinLogTracker -> Bool
/= :: BinLogTracker -> BinLogTracker -> Bool
Eq, (forall x. BinLogTracker -> Rep BinLogTracker x)
-> (forall x. Rep BinLogTracker x -> BinLogTracker)
-> Generic BinLogTracker
forall x. Rep BinLogTracker x -> BinLogTracker
forall x. BinLogTracker -> Rep BinLogTracker x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. BinLogTracker -> Rep BinLogTracker x
from :: forall x. BinLogTracker -> Rep BinLogTracker x
$cto :: forall x. Rep BinLogTracker x -> BinLogTracker
to :: forall x. Rep BinLogTracker x -> BinLogTracker
Generic)
registerPesudoSlave :: MySQLConn -> SlaveID -> IO OK
registerPesudoSlave :: MySQLConn -> Word32 -> IO OK
registerPesudoSlave MySQLConn
conn Word32
sid = MySQLConn -> Command -> IO OK
command MySQLConn
conn (Word32
-> ByteString
-> ByteString
-> ByteString
-> Word16
-> Word32
-> Word32
-> Command
COM_REGISTER_SLAVE Word32
sid ByteString
"" ByteString
"" ByteString
"" Word16
0 Word32
0 Word32
0)
dumpBinLog :: MySQLConn
-> SlaveID
-> BinLogTracker
-> Bool
-> IO (FormatDescription, IORef ByteString, InputStream BinLogPacket)
dumpBinLog :: MySQLConn
-> Word32
-> BinLogTracker
-> Bool
-> IO
(FormatDescription, IORef ByteString, InputStream BinLogPacket)
dumpBinLog conn :: MySQLConn
conn@(MySQLConn InputStream Packet
is Packet -> IO ()
wp IO ()
_ IORef Bool
consumed) Word32
sid (BinLogTracker ByteString
initfn Word32
initpos) Bool
wantAck = do
MySQLConn -> IO ()
guardUnconsumed MySQLConn
conn
Bool
checksum <- MySQLConn -> IO Bool
isCheckSumEnabled MySQLConn
conn
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
checksum (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO OK -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO OK -> IO ()) -> IO OK -> IO ()
forall a b. (a -> b) -> a -> b
$ MySQLConn -> Query -> IO OK
execute_ MySQLConn
conn Query
"SET @master_binlog_checksum = @@global.binlog_checksum"
Bool
semiAck <- MySQLConn -> IO Bool
isSemiSyncEnabled MySQLConn
conn
let needAck :: Bool
needAck = Bool
semiAck Bool -> Bool -> Bool
&& Bool
wantAck
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
needAck (IO () -> IO ()) -> (IO OK -> IO ()) -> IO OK -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO OK -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO OK -> IO ()) -> IO OK -> IO ()
forall a b. (a -> b) -> a -> b
$ MySQLConn -> Query -> IO OK
execute_ MySQLConn
conn Query
"SET @rpl_semi_sync_slave = 1"
Command -> (Packet -> IO ()) -> IO ()
writeCommand (Word32 -> Word16 -> Word32 -> ByteString -> Command
COM_BINLOG_DUMP Word32
initpos Word16
0x00 Word32
sid ByteString
initfn) Packet -> IO ()
wp
IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
consumed Bool
False
BinLogPacket
rp <- IO (Maybe BinLogPacket) -> BinLogEventType -> IO BinLogPacket
skipToPacketT (Bool -> Bool -> InputStream Packet -> IO (Maybe BinLogPacket)
readBinLogPacket Bool
checksum Bool
needAck InputStream Packet
is) BinLogEventType
BINLOG_ROTATE_EVENT
RotateEvent
re <- Get RotateEvent -> BinLogPacket -> IO RotateEvent
forall a. Get a -> BinLogPacket -> IO a
getFromBinLogPacket Get RotateEvent
getRotateEvent BinLogPacket
rp
IORef ByteString
fref <- ByteString -> IO (IORef ByteString)
forall a. a -> IO (IORef a)
newIORef (RotateEvent -> ByteString
rFileName RotateEvent
re)
BinLogPacket
p <- IO (Maybe BinLogPacket) -> BinLogEventType -> IO BinLogPacket
skipToPacketT (Bool -> Bool -> InputStream Packet -> IO (Maybe BinLogPacket)
readBinLogPacket Bool
checksum Bool
needAck InputStream Packet
is) BinLogEventType
BINLOG_FORMAT_DESCRIPTION_EVENT
Bool
-> BinLogPacket -> IORef ByteString -> (Packet -> IO ()) -> IO ()
replyAck Bool
needAck BinLogPacket
p IORef ByteString
fref Packet -> IO ()
wp
FormatDescription
fmt <- Get FormatDescription -> BinLogPacket -> IO FormatDescription
forall a. Get a -> BinLogPacket -> IO a
getFromBinLogPacket Get FormatDescription
getFormatDescription BinLogPacket
p
InputStream BinLogPacket
es <- IO (Maybe BinLogPacket) -> IO (InputStream BinLogPacket)
forall a. IO (Maybe a) -> IO (InputStream a)
Stream.makeInputStream (IO (Maybe BinLogPacket) -> IO (InputStream BinLogPacket))
-> IO (Maybe BinLogPacket) -> IO (InputStream BinLogPacket)
forall a b. (a -> b) -> a -> b
$ do
Maybe BinLogPacket
q <- Bool -> Bool -> InputStream Packet -> IO (Maybe BinLogPacket)
readBinLogPacket Bool
checksum Bool
needAck InputStream Packet
is
case Maybe BinLogPacket
q of
Maybe BinLogPacket
Nothing -> IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
consumed Bool
True IO () -> IO (Maybe BinLogPacket) -> IO (Maybe BinLogPacket)
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe BinLogPacket -> IO (Maybe BinLogPacket)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe BinLogPacket
forall a. Maybe a
Nothing
Just BinLogPacket
q' -> do Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (BinLogPacket -> BinLogEventType
blEventType BinLogPacket
q' BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_ROTATE_EVENT) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
RotateEvent
e <- Get RotateEvent -> BinLogPacket -> IO RotateEvent
forall a. Get a -> BinLogPacket -> IO a
getFromBinLogPacket Get RotateEvent
getRotateEvent BinLogPacket
q'
IORef ByteString -> ByteString -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef' IORef ByteString
fref (RotateEvent -> ByteString
rFileName RotateEvent
e)
Bool
-> BinLogPacket -> IORef ByteString -> (Packet -> IO ()) -> IO ()
replyAck Bool
needAck BinLogPacket
q' IORef ByteString
fref Packet -> IO ()
wp
Maybe BinLogPacket -> IO (Maybe BinLogPacket)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe BinLogPacket
q
(FormatDescription, IORef ByteString, InputStream BinLogPacket)
-> IO
(FormatDescription, IORef ByteString, InputStream BinLogPacket)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (FormatDescription
fmt, IORef ByteString
fref, InputStream BinLogPacket
es)
where
skipToPacketT :: IO (Maybe BinLogPacket) -> BinLogEventType -> IO BinLogPacket
skipToPacketT IO (Maybe BinLogPacket)
iop BinLogEventType
typ = do
Maybe BinLogPacket
p <- IO (Maybe BinLogPacket)
iop
case Maybe BinLogPacket
p of
Just BinLogPacket
p' -> do
if BinLogPacket -> BinLogEventType
blEventType BinLogPacket
p' BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
typ then BinLogPacket -> IO BinLogPacket
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return BinLogPacket
p' else IO (Maybe BinLogPacket) -> BinLogEventType -> IO BinLogPacket
skipToPacketT IO (Maybe BinLogPacket)
iop BinLogEventType
typ
Maybe BinLogPacket
Nothing -> NetworkException -> IO BinLogPacket
forall e a. Exception e => e -> IO a
throwIO NetworkException
NetworkException
replyAck :: Bool
-> BinLogPacket -> IORef ByteString -> (Packet -> IO ()) -> IO ()
replyAck Bool
needAck BinLogPacket
p IORef ByteString
fref Packet -> IO ()
wp' = Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
needAck Bool -> Bool -> Bool
&& BinLogPacket -> Bool
blSemiAck BinLogPacket
p) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
ByteString
fn <- IORef ByteString -> IO ByteString
forall a. IORef a -> IO a
readIORef IORef ByteString
fref
Packet -> IO ()
wp' (Word64 -> ByteString -> Packet
makeSemiAckPacket (BinLogPacket -> Word64
blLogPos BinLogPacket
p) ByteString
fn)
makeSemiAckPacket :: Word64 -> ByteString -> Packet
makeSemiAckPacket Word64
pos ByteString
fn = Word8 -> Put -> Packet
putToPacket Word8
0 (Put -> Packet) -> Put -> Packet
forall a b. (a -> b) -> a -> b
$ do
Word8 -> Put
putWord8 Word8
0xEF
Word64 -> Put
putWord64le Word64
pos
ByteString -> Put
putByteString ByteString
fn
readBinLogPacket :: Bool -> Bool -> InputStream Packet -> IO (Maybe BinLogPacket)
readBinLogPacket Bool
checksum Bool
needAck InputStream Packet
is' = do
Packet
p <- InputStream Packet -> IO Packet
readPacket InputStream Packet
is'
if | Packet -> Bool
isOK Packet
p -> BinLogPacket -> Maybe BinLogPacket
forall a. a -> Maybe a
Just (BinLogPacket -> Maybe BinLogPacket)
-> IO BinLogPacket -> IO (Maybe BinLogPacket)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get BinLogPacket -> Packet -> IO BinLogPacket
forall a. Get a -> Packet -> IO a
getFromPacket (Bool -> Bool -> Get BinLogPacket
getBinLogPacket Bool
checksum Bool
needAck) Packet
p
| Packet -> Bool
isEOF Packet
p -> Maybe BinLogPacket -> IO (Maybe BinLogPacket)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe BinLogPacket
forall a. Maybe a
Nothing
| Packet -> Bool
isERR Packet
p -> Packet -> IO ERR
forall a. Binary a => Packet -> IO a
decodeFromPacket Packet
p IO ERR
-> (ERR -> IO (Maybe BinLogPacket)) -> IO (Maybe BinLogPacket)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ERRException -> IO (Maybe BinLogPacket)
forall e a. Exception e => e -> IO a
throwIO (ERRException -> IO (Maybe BinLogPacket))
-> (ERR -> ERRException) -> ERR -> IO (Maybe BinLogPacket)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ERR -> ERRException
ERRException
| Bool
otherwise -> String -> IO (Maybe BinLogPacket)
forall a. HasCallStack => String -> a
error (String
"unkown package" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Packet -> String
forall a. Show a => a -> String
show Packet
p)
data RowBinLogEvent
= RowQueryEvent {-# UNPACK #-} !Word32 !BinLogTracker !QueryEvent'
| RowDeleteEvent {-# UNPACK #-} !Word32 !BinLogTracker !TableMapEvent !DeleteRowsEvent
| RowWriteEvent {-# UNPACK #-} !Word32 !BinLogTracker !TableMapEvent !WriteRowsEvent
| RowUpdateEvent {-# UNPACK #-} !Word32 !BinLogTracker !TableMapEvent !UpdateRowsEvent
deriving (Int -> RowBinLogEvent -> ShowS
[RowBinLogEvent] -> ShowS
RowBinLogEvent -> String
(Int -> RowBinLogEvent -> ShowS)
-> (RowBinLogEvent -> String)
-> ([RowBinLogEvent] -> ShowS)
-> Show RowBinLogEvent
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RowBinLogEvent -> ShowS
showsPrec :: Int -> RowBinLogEvent -> ShowS
$cshow :: RowBinLogEvent -> String
show :: RowBinLogEvent -> String
$cshowList :: [RowBinLogEvent] -> ShowS
showList :: [RowBinLogEvent] -> ShowS
Show, RowBinLogEvent -> RowBinLogEvent -> Bool
(RowBinLogEvent -> RowBinLogEvent -> Bool)
-> (RowBinLogEvent -> RowBinLogEvent -> Bool) -> Eq RowBinLogEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RowBinLogEvent -> RowBinLogEvent -> Bool
== :: RowBinLogEvent -> RowBinLogEvent -> Bool
$c/= :: RowBinLogEvent -> RowBinLogEvent -> Bool
/= :: RowBinLogEvent -> RowBinLogEvent -> Bool
Eq, (forall x. RowBinLogEvent -> Rep RowBinLogEvent x)
-> (forall x. Rep RowBinLogEvent x -> RowBinLogEvent)
-> Generic RowBinLogEvent
forall x. Rep RowBinLogEvent x -> RowBinLogEvent
forall x. RowBinLogEvent -> Rep RowBinLogEvent x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. RowBinLogEvent -> Rep RowBinLogEvent x
from :: forall x. RowBinLogEvent -> Rep RowBinLogEvent x
$cto :: forall x. Rep RowBinLogEvent x -> RowBinLogEvent
to :: forall x. Rep RowBinLogEvent x -> RowBinLogEvent
Generic)
decodeRowBinLogEvent :: (FormatDescription, IORef ByteString, InputStream BinLogPacket)
-> IO (InputStream RowBinLogEvent)
decodeRowBinLogEvent :: (FormatDescription, IORef ByteString, InputStream BinLogPacket)
-> IO (InputStream RowBinLogEvent)
decodeRowBinLogEvent (FormatDescription
fd', IORef ByteString
fref', InputStream BinLogPacket
is') = IO (Maybe RowBinLogEvent) -> IO (InputStream RowBinLogEvent)
forall a. IO (Maybe a) -> IO (InputStream a)
Stream.makeInputStream (FormatDescription
-> IORef ByteString
-> InputStream BinLogPacket
-> IO (Maybe RowBinLogEvent)
loop FormatDescription
fd' IORef ByteString
fref' InputStream BinLogPacket
is')
where
loop :: FormatDescription
-> IORef ByteString
-> InputStream BinLogPacket
-> IO (Maybe RowBinLogEvent)
loop FormatDescription
fd IORef ByteString
fref InputStream BinLogPacket
is = do
Maybe BinLogPacket
p <- InputStream BinLogPacket -> IO (Maybe BinLogPacket)
forall a. InputStream a -> IO (Maybe a)
Stream.read InputStream BinLogPacket
is
case Maybe BinLogPacket
p of
Maybe BinLogPacket
Nothing -> Maybe RowBinLogEvent -> IO (Maybe RowBinLogEvent)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe RowBinLogEvent
forall a. Maybe a
Nothing
Just BinLogPacket
p' -> do
let t :: BinLogEventType
t = BinLogPacket -> BinLogEventType
blEventType BinLogPacket
p'
if | BinLogEventType
t BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_ROWS_QUERY_EVENT -> do
BinLogTracker
tr <- BinLogPacket -> IORef ByteString -> IO BinLogTracker
track BinLogPacket
p' IORef ByteString
fref
QueryEvent'
e <- Get QueryEvent' -> BinLogPacket -> IO QueryEvent'
forall a. Get a -> BinLogPacket -> IO a
getFromBinLogPacket Get QueryEvent'
getQueryEvent' BinLogPacket
p'
Maybe RowBinLogEvent -> IO (Maybe RowBinLogEvent)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RowBinLogEvent -> Maybe RowBinLogEvent
forall a. a -> Maybe a
Just (Word32 -> BinLogTracker -> QueryEvent' -> RowBinLogEvent
RowQueryEvent (BinLogPacket -> Word32
blTimestamp BinLogPacket
p') BinLogTracker
tr QueryEvent'
e))
| BinLogEventType
t BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_TABLE_MAP_EVENT -> do
TableMapEvent
tme <- Get TableMapEvent -> BinLogPacket -> IO TableMapEvent
forall a. Get a -> BinLogPacket -> IO a
getFromBinLogPacket (FormatDescription -> Get TableMapEvent
getTableMapEvent FormatDescription
fd) BinLogPacket
p'
Maybe BinLogPacket
q <- InputStream BinLogPacket -> IO (Maybe BinLogPacket)
forall a. InputStream a -> IO (Maybe a)
Stream.read InputStream BinLogPacket
is
case Maybe BinLogPacket
q of
Maybe BinLogPacket
Nothing -> Maybe RowBinLogEvent -> IO (Maybe RowBinLogEvent)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe RowBinLogEvent
forall a. Maybe a
Nothing
Just BinLogPacket
q' -> do
let u :: BinLogEventType
u = BinLogPacket -> BinLogEventType
blEventType BinLogPacket
q'
if | BinLogEventType
u BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_WRITE_ROWS_EVENTv1 Bool -> Bool -> Bool
|| BinLogEventType
u BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_WRITE_ROWS_EVENTv2 -> do
BinLogTracker
tr <- BinLogPacket -> IORef ByteString -> IO BinLogTracker
track BinLogPacket
q' IORef ByteString
fref
WriteRowsEvent
e <- (BinLogEventType -> Get WriteRowsEvent)
-> BinLogPacket -> IO WriteRowsEvent
forall a. (BinLogEventType -> Get a) -> BinLogPacket -> IO a
getFromBinLogPacket' (FormatDescription
-> TableMapEvent -> BinLogEventType -> Get WriteRowsEvent
getWriteRowEvent FormatDescription
fd TableMapEvent
tme) BinLogPacket
q'
Maybe RowBinLogEvent -> IO (Maybe RowBinLogEvent)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RowBinLogEvent -> Maybe RowBinLogEvent
forall a. a -> Maybe a
Just (Word32
-> BinLogTracker
-> TableMapEvent
-> WriteRowsEvent
-> RowBinLogEvent
RowWriteEvent (BinLogPacket -> Word32
blTimestamp BinLogPacket
q') BinLogTracker
tr TableMapEvent
tme WriteRowsEvent
e))
| BinLogEventType
u BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_DELETE_ROWS_EVENTv1 Bool -> Bool -> Bool
|| BinLogEventType
u BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_DELETE_ROWS_EVENTv2 -> do
BinLogTracker
tr <- BinLogPacket -> IORef ByteString -> IO BinLogTracker
track BinLogPacket
q' IORef ByteString
fref
DeleteRowsEvent
e <- (BinLogEventType -> Get DeleteRowsEvent)
-> BinLogPacket -> IO DeleteRowsEvent
forall a. (BinLogEventType -> Get a) -> BinLogPacket -> IO a
getFromBinLogPacket' (FormatDescription
-> TableMapEvent -> BinLogEventType -> Get DeleteRowsEvent
getDeleteRowEvent FormatDescription
fd TableMapEvent
tme) BinLogPacket
q'
Maybe RowBinLogEvent -> IO (Maybe RowBinLogEvent)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RowBinLogEvent -> Maybe RowBinLogEvent
forall a. a -> Maybe a
Just (Word32
-> BinLogTracker
-> TableMapEvent
-> DeleteRowsEvent
-> RowBinLogEvent
RowDeleteEvent (BinLogPacket -> Word32
blTimestamp BinLogPacket
q') BinLogTracker
tr TableMapEvent
tme DeleteRowsEvent
e))
| BinLogEventType
u BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_UPDATE_ROWS_EVENTv1 Bool -> Bool -> Bool
|| BinLogEventType
u BinLogEventType -> BinLogEventType -> Bool
forall a. Eq a => a -> a -> Bool
== BinLogEventType
BINLOG_UPDATE_ROWS_EVENTv2 -> do
BinLogTracker
tr <- BinLogPacket -> IORef ByteString -> IO BinLogTracker
track BinLogPacket
q' IORef ByteString
fref
UpdateRowsEvent
e <- (BinLogEventType -> Get UpdateRowsEvent)
-> BinLogPacket -> IO UpdateRowsEvent
forall a. (BinLogEventType -> Get a) -> BinLogPacket -> IO a
getFromBinLogPacket' (FormatDescription
-> TableMapEvent -> BinLogEventType -> Get UpdateRowsEvent
getUpdateRowEvent FormatDescription
fd TableMapEvent
tme) BinLogPacket
q'
Maybe RowBinLogEvent -> IO (Maybe RowBinLogEvent)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (RowBinLogEvent -> Maybe RowBinLogEvent
forall a. a -> Maybe a
Just (Word32
-> BinLogTracker
-> TableMapEvent
-> UpdateRowsEvent
-> RowBinLogEvent
RowUpdateEvent (BinLogPacket -> Word32
blTimestamp BinLogPacket
q') BinLogTracker
tr TableMapEvent
tme UpdateRowsEvent
e))
| Bool
otherwise -> FormatDescription
-> IORef ByteString
-> InputStream BinLogPacket
-> IO (Maybe RowBinLogEvent)
loop FormatDescription
fd IORef ByteString
fref InputStream BinLogPacket
is
| Bool
otherwise -> FormatDescription
-> IORef ByteString
-> InputStream BinLogPacket
-> IO (Maybe RowBinLogEvent)
loop FormatDescription
fd IORef ByteString
fref InputStream BinLogPacket
is
track :: BinLogPacket -> IORef ByteString -> IO BinLogTracker
track BinLogPacket
p IORef ByteString
fref = ByteString -> Word32 -> BinLogTracker
BinLogTracker (ByteString -> Word32 -> BinLogTracker)
-> IO ByteString -> IO (Word32 -> BinLogTracker)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef ByteString -> IO ByteString
forall a. IORef a -> IO a
readIORef IORef ByteString
fref IO (Word32 -> BinLogTracker) -> IO Word32 -> IO BinLogTracker
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (Word32 -> IO Word32
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Word32 -> IO Word32)
-> (BinLogPacket -> Word32) -> BinLogPacket -> IO Word32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word64 -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64 -> Word32)
-> (BinLogPacket -> Word64) -> BinLogPacket -> Word32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BinLogPacket -> Word64
blLogPos) BinLogPacket
p
getLastBinLogTracker :: MySQLConn -> IO (Maybe BinLogTracker)
getLastBinLogTracker :: MySQLConn -> IO (Maybe BinLogTracker)
getLastBinLogTracker MySQLConn
conn = do
([ColumnDef]
_, InputStream [MySQLValue]
is) <- MySQLConn -> Query -> IO ([ColumnDef], InputStream [MySQLValue])
query_ MySQLConn
conn Query
"SHOW MASTER STATUS"
Maybe [MySQLValue]
row <- InputStream [MySQLValue] -> IO (Maybe [MySQLValue])
forall a. InputStream a -> IO (Maybe a)
Stream.read InputStream [MySQLValue]
is
InputStream [MySQLValue] -> IO ()
forall a. InputStream a -> IO ()
Stream.skipToEof InputStream [MySQLValue]
is
case Maybe [MySQLValue]
row of
Just (MySQLText Text
fn : MySQLInt64U Word64
pos : [MySQLValue]
_) -> Maybe BinLogTracker -> IO (Maybe BinLogTracker)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe BinLogTracker -> IO (Maybe BinLogTracker))
-> (BinLogTracker -> Maybe BinLogTracker)
-> BinLogTracker
-> IO (Maybe BinLogTracker)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BinLogTracker -> Maybe BinLogTracker
forall a. a -> Maybe a
Just (BinLogTracker -> IO (Maybe BinLogTracker))
-> BinLogTracker -> IO (Maybe BinLogTracker)
forall a b. (a -> b) -> a -> b
$ ByteString -> Word32 -> BinLogTracker
BinLogTracker (Text -> ByteString
encodeUtf8 Text
fn) (Word64 -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
pos)
Maybe [MySQLValue]
_ -> Maybe BinLogTracker -> IO (Maybe BinLogTracker)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe BinLogTracker
forall a. Maybe a
Nothing
isCheckSumEnabled :: MySQLConn -> IO Bool
isCheckSumEnabled :: MySQLConn -> IO Bool
isCheckSumEnabled MySQLConn
conn = do
([ColumnDef]
_, InputStream [MySQLValue]
is) <- MySQLConn -> Query -> IO ([ColumnDef], InputStream [MySQLValue])
query_ MySQLConn
conn Query
"SHOW GLOBAL VARIABLES LIKE 'binlog_checksum'"
Maybe [MySQLValue]
row <- InputStream [MySQLValue] -> IO (Maybe [MySQLValue])
forall a. InputStream a -> IO (Maybe a)
Stream.read InputStream [MySQLValue]
is
InputStream [MySQLValue] -> IO ()
forall a. InputStream a -> IO ()
Stream.skipToEof InputStream [MySQLValue]
is
case Maybe [MySQLValue]
row of
Just [MySQLValue
_, MySQLText Text
"CRC32"] -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Maybe [MySQLValue]
_ -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
isSemiSyncEnabled :: MySQLConn -> IO Bool
isSemiSyncEnabled :: MySQLConn -> IO Bool
isSemiSyncEnabled MySQLConn
conn = do
([ColumnDef]
_, InputStream [MySQLValue]
is) <- MySQLConn -> Query -> IO ([ColumnDef], InputStream [MySQLValue])
query_ MySQLConn
conn Query
"SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"
Maybe [MySQLValue]
row <- InputStream [MySQLValue] -> IO (Maybe [MySQLValue])
forall a. InputStream a -> IO (Maybe a)
Stream.read InputStream [MySQLValue]
is
InputStream [MySQLValue] -> IO ()
forall a. InputStream a -> IO ()
Stream.skipToEof InputStream [MySQLValue]
is
case Maybe [MySQLValue]
row of
Just [MySQLValue
_, MySQLText Text
"ON"] -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Maybe [MySQLValue]
_ -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False