Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Strings;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -275,7 +276,7 @@ public boolean isGtidModeEnabled() {
public String knownGtidSet() {
try {
return queryAndMap(
"SHOW MASTER STATUS",
DebeziumUtils.getBinlogStatusCommand(this),
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
return rs.getString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Predicate;

/** Utilities related to Debezium. */
public class DebeziumUtils {
private static final String QUOTED_CHARACTER = "`";
private static final String SQL_SELECT_VERSION = "SELECT VERSION()";
private static final String SQL_SHOW_MASTER_STATUS = "SHOW MASTER STATUS";
private static final String SQL_SHOW_BINARY_LOG_STATUS = "SHOW BINARY LOG STATUS";

private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);

Expand Down Expand Up @@ -118,7 +122,7 @@ public static MySqlDatabaseSchema createMySqlDatabaseSchema(

/** Fetch current binlog offsets in MySql Server. */
public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
final String showMasterStmt = "SHOW MASTER STATUS";
final String showMasterStmt = getBinlogStatusCommand(jdbc);
try {
return jdbc.queryAndMap(
showMasterStmt,
Expand Down Expand Up @@ -336,4 +340,44 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile
}
return binlogTimestamps.take();
}

/**
* Get the appropriate SHOW BINLOG STATUS command based on MySQL version.
* MySQL 8.4+ uses SHOW BINARY LOG STATUS, earlier versions use SHOW MASTER STATUS.
*
* @param jdbc the JDBC connection
* @return the appropriate command string
*/
public static String getBinlogStatusCommand(JdbcConnection jdbc) {
try {
String version = jdbc.queryAndMap(
SQL_SELECT_VERSION,
rs -> rs.next() ? rs.getString(1) : "");

// Parse version numbers (major and minor)
Integer[] versionNumbers = Arrays.stream(version.split("\\."))
.limit(2)
.map(s -> {
try {
// Handle version strings like "8.4.0-1.el8"
String numStr = s.split("-")[0];
return Integer.parseInt(numStr);
} catch (NumberFormatException e) {
// If version number is not numeric, treat as 0 (fallback to old command)
LOG.warn("Failed to parse MySQL version component '{}', treating as 0", s);
return 0;
}
})
.toArray(Integer[]::new);

// MySQL 8.4+ removed SHOW MASTER STATUS, use SHOW BINARY LOG STATUS instead
boolean useBinaryLogStatus = (versionNumbers[0] == 8 && versionNumbers[1] >= 4)
|| versionNumbers[0] > 8;

return useBinaryLogStatus ? SQL_SHOW_BINARY_LOG_STATUS : SQL_SHOW_MASTER_STATUS;
} catch (Exception e) {
LOG.warn("Unexpected error while determining MySQL version, using SHOW MASTER STATUS", e);
return SQL_SHOW_MASTER_STATUS;
}
}
}