Skip to content

Commit

Permalink
Introduce BinlogStreamRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
blackbeam committed Oct 23, 2023
1 parent 02e47d9 commit 439fec4
Showing 1 changed file with 109 additions and 21 deletions.
130 changes: 109 additions & 21 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use mysql_common::{
crypto,
io::ParseBuf,
packets::{
binlog_request::BinlogRequest, AuthPlugin, AuthSwitchRequest, CommonOkPacket, ErrPacket,
HandshakePacket, HandshakeResponse, OkPacket, OkPacketDeserializer, OldAuthSwitchRequest,
OldEofPacket, ResultSetTerminator, SslRequest,
binlog_request::BinlogRequest, AuthPlugin, AuthSwitchRequest, BinlogDumpFlags,
ComRegisterSlave, CommonOkPacket, ErrPacket, HandshakePacket, HandshakeResponse, OkPacket,
OkPacketDeserializer, OldAuthSwitchRequest, OldEofPacket, ResultSetTerminator, Sid,
SslRequest,
},
proto::MySerialize,
row::Row,
Expand Down Expand Up @@ -1260,32 +1261,119 @@ impl Conn {
Ok(self)
}

async fn register_as_slave(&mut self, server_id: u32) -> Result<()> {
use mysql_common::packets::ComRegisterSlave;

async fn register_as_slave(&mut self, com_register_slave: ComRegisterSlave<'_>) -> Result<()> {
self.query_drop("SET @master_binlog_checksum='ALL'").await?;
self.write_command(&ComRegisterSlave::new(server_id))
.await?;
self.write_command(&com_register_slave).await?;

// Server will respond with OK.
self.read_packet().await?;

Ok(())
}

async fn request_binlog(&mut self, request: BinlogRequest<'_>) -> Result<()> {
self.register_as_slave(request.server_id()).await?;
self.write_command(&request.as_cmd()).await?;
async fn request_binlog(&mut self, request: BinlogStreamRequest<'_>) -> Result<()> {
self.register_as_slave(request.register_slave).await?;
self.write_command(&request.binlog_request.as_cmd()).await?;
Ok(())
}

pub async fn get_binlog_stream(mut self, request: BinlogRequest<'_>) -> Result<BinlogStream> {
/// Turns this connection into a binlog stream.
///
/// You can use SHOW BINARY LOGS to get the current logfile and position from the master.
/// If the request’s filename is empty, the server will send the binlog-stream of the first known binlog.
pub async fn get_binlog_stream(
mut self,
request: BinlogStreamRequest<'_>,
) -> Result<BinlogStream> {
self.request_binlog(request).await?;

Ok(BinlogStream::new(self))
}
}

/// Binlog stream request builder.
pub struct BinlogStreamRequest<'a> {
binlog_request: BinlogRequest<'a>,
register_slave: ComRegisterSlave<'a>,
}

impl<'a> BinlogStreamRequest<'a> {
/// Creates a new request with the given slave server id.
pub fn new(server_id: u32) -> Self {
Self {
binlog_request: BinlogRequest::new(server_id),
register_slave: ComRegisterSlave::new(server_id),
}
}

/// Enables GTID-based replication (disabled by default).
pub fn with_gtid(mut self) -> Self {
self.binlog_request = self.binlog_request.with_use_gtid(true);
self
}

/// Enables `NON_BLOCK` flag. Stream will be terminated as soon as there are no events.
pub fn with_non_blocking(mut self) -> Self {
self.binlog_request = self
.binlog_request
.with_flags(BinlogDumpFlags::BINLOG_DUMP_NON_BLOCK);
self
}

/// Sets the filename of the binlog on the master (try `SHOW BINARY LOGS`).
pub fn with_filename(mut self, filename: &'a [u8]) -> Self {
self.binlog_request = self.binlog_request.with_filename(filename);
self
}

/// Sets the start position (defaults to `4`).
pub fn with_pos(mut self, position: u64) -> Self {
self.binlog_request = self.binlog_request.with_pos(position);
self
}

/// Adds the given set of GTIDs to the request (ignored if not GTID-based).
pub fn with_gtid_set<T>(mut self, set: T) -> Self
where
T: IntoIterator<Item = Sid<'a>>,
{
self.binlog_request = self.binlog_request.with_sids(set);
self
}

/// This hostname will be reported to the server (max len 255, default to an empty string).
///
/// Usually left default.
pub fn with_hostname(mut self, hostname: &'a [u8]) -> Self {
self.register_slave = self.register_slave.with_hostname(hostname);
self
}

/// This username will be reported to the server (max len 255, default to an empty string).
///
/// Usually left default.
pub fn with_user(mut self, user: &'a [u8]) -> Self {
self.register_slave = self.register_slave.with_user(user);
self
}

/// This password will be reported to the server (max len 255, default to an empty string).
///
/// Usually left default.
pub fn with_password(mut self, password: &'a [u8]) -> Self {
self.register_slave = self.register_slave.with_password(password);
self
}

/// This port number will be reported to the server (defaults to `0`).
///
/// Usually left default.
pub fn with_port(mut self, port: u16) -> Self {
self.register_slave = self.register_slave.with_port(port);
self
}
}

#[cfg(test)]
mod test {
use bytes::Bytes;
Expand All @@ -1297,7 +1385,7 @@ mod test {
use std::time::Duration;

use crate::{
from_row, params, prelude::*, test_misc::get_opts, BinlogDumpFlags, BinlogRequest,
conn::BinlogStreamRequest, from_row, params, prelude::*, test_misc::get_opts,
ChangeUserOpts, Conn, Error, OptsBuilder, Pool, Value, WhiteListFsHandler,
};

Expand Down Expand Up @@ -1429,8 +1517,8 @@ mod test {

let mut binlog_stream = conn
.get_binlog_stream(
BinlogRequest::new(binlog_server_ids.0)
.with_filename(filename)
BinlogStreamRequest::new(binlog_server_ids.0)
.with_filename(&filename)
.with_pos(pos),
)
.await
Expand Down Expand Up @@ -1467,9 +1555,9 @@ mod test {

let mut binlog_stream = conn
.get_binlog_stream(
BinlogRequest::new(binlog_server_ids.1)
.with_use_gtid(true)
.with_filename(filename)
BinlogStreamRequest::new(binlog_server_ids.1)
.with_gtid()
.with_filename(&filename)
.with_pos(pos),
)
.await
Expand Down Expand Up @@ -1507,10 +1595,10 @@ mod test {

let mut binlog_stream = conn
.get_binlog_stream(
BinlogRequest::new(binlog_server_ids.2)
.with_filename(filename)
BinlogStreamRequest::new(binlog_server_ids.2)
.with_filename(&filename)
.with_pos(pos)
.with_flags(BinlogDumpFlags::BINLOG_DUMP_NON_BLOCK),
.with_non_blocking(),
)
.await
.unwrap();
Expand Down

0 comments on commit 439fec4

Please sign in to comment.