Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error code when there is an error retrieving show_replica_status #658

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,21 @@ public static void startRestApi(Properties props, Injector injector,
Properties finalProps1 = props;
app.get("/status", ctx -> {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1));

ctx.result(debeziumChangeEventCapture.getDebeziumStorageStatus(config, finalProps1));
String response = "";

try {
response = debeziumChangeEventCapture.getDebeziumStorageStatus(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error getting status", e);
// Create JSON response
JSONObject jsonObject = new JSONObject();
jsonObject.put("error", e.toString());
response = jsonObject.toJSONString();
ctx.result(response);
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
}
ctx.result(response);

});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -341,29 +342,56 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf
}

/**
* Function to get the status of Debezium storage.
*
* @param props
* @return
*/
public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws SQLException {
String response = "";
private Pair<String, String> getDebeziumStorageDatabaseName(Properties props) {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
// if tablename is dbname.tablename and contains a dot.
String databaseName = "system";
// split tablename with dot.
if(tableName.contains(".")) {
String[] dbTableNameArray = tableName.split("\\.");
if(dbTableNameArray.length >= 2) {
databaseName = dbTableNameArray[0].replace("\"", "");
tableName = dbTableNameArray[1].replace("\"", "");
}
}

return Pair.of(tableName, databaseName);
}

/**
* Function to get the status of Debezium storage.
* @param props
* @return
*/
public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws Exception {
String response = "";

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

DBCredentials dbCredentials = parseDBConfiguration(config);

if (writer == null || writer.getConnection().isClosed() == true) {
// Json error string
log.error("**** Connection to ClickHouse is not established, re-initiating ****");
String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase());
databaseName);
ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
dbCredentials.getUserName(), dbCredentials.getPassword(), config);
writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);
}
//DBCredentials dbCredentials = parseDBConfiguration(config);
String debeziumStorageStatusQuery = String.format("select * from %s limit 1", tableName);
String debeziumStorageStatusQuery = String.format("select * from %s limit 1", databaseName + "." + tableName);
ResultSet resultSet = writer.executeQueryWithResultSet(debeziumStorageStatusQuery);

if(resultSet != null) {
Expand Down Expand Up @@ -419,8 +447,12 @@ public long getLatestRecordTimestamp(ClickHouseSinkConnectorConfig config, Prope
long result = -1;
DBCredentials dbCredentials = parseDBConfiguration(config);

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

String latestRecordTs = new DebeziumOffsetStorage().getDebeziumLatestRecordTimestamp(props, writer);
Expand Down Expand Up @@ -453,12 +485,14 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String binlogFile, String binLogPosition, String gtid) throws SQLException, ParseException {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

DBCredentials dbCredentials = parseDBConfiguration(config);

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer);

Expand All @@ -484,12 +518,13 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String lsn) throws SQLException, ParseException {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();
DBCredentials dbCredentials = parseDBConfiguration(config);

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer);

Expand Down Expand Up @@ -714,6 +749,7 @@ DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) {
dbCredentials.setPort(config.getInt(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_PORT.toString()));
dbCredentials.setUserName(config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_USER.toString()));
dbCredentials.setPassword(config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_PASS.toString()));
dbCredentials.setDatabase("system");

return dbCredentials;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public String getDebeziumStorageStatusQuery(
*/
public String updateBinLogInformation(String record, String binLogFile, String binLogPosition, String gtids) throws ParseException {
JSONObject jsonObject = new JSONObject();
if(record != null || !record.isEmpty()) {
if(record != null && !record.isEmpty()) {
jsonObject = (JSONObject) new JSONParser().parse(record);
} else {
jsonObject.put("ts_sec", System.currentTimeMillis() / 1000);
Expand Down
Loading