Skip to content

Commit

Permalink
Merge pull request #658 from Altinity/handle_sink_connector_client_st…
Browse files Browse the repository at this point in the history
…atus_errors

Return error code when there is an error retrieving show_replica_status
  • Loading branch information
subkanthi authored Jun 28, 2024
2 parents 1b8bb34 + 6922690 commit 346ca12
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,21 @@ public static void startRestApi(Properties props, Injector injector,
Properties finalProps1 = props;
app.get("/status", ctx -> {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1));

ctx.result(debeziumChangeEventCapture.getDebeziumStorageStatus(config, finalProps1));
String response = "";

try {
response = debeziumChangeEventCapture.getDebeziumStorageStatus(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error getting status", e);
// Create JSON response
JSONObject jsonObject = new JSONObject();
jsonObject.put("error", e.toString());
response = jsonObject.toJSONString();
ctx.result(response);
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
}
ctx.result(response);

});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -341,29 +342,56 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf
}

/**
* Function to get the status of Debezium storage.
*
* @param props
* @return
*/
public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws SQLException {
String response = "";
private Pair<String, String> getDebeziumStorageDatabaseName(Properties props) {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
// if tablename is dbname.tablename and contains a dot.
String databaseName = "system";
// split tablename with dot.
if(tableName.contains(".")) {
String[] dbTableNameArray = tableName.split("\\.");
if(dbTableNameArray.length >= 2) {
databaseName = dbTableNameArray[0].replace("\"", "");
tableName = dbTableNameArray[1].replace("\"", "");
}
}

return Pair.of(tableName, databaseName);
}

/**
* Function to get the status of Debezium storage.
* @param props
* @return
*/
public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws Exception {
String response = "";

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

DBCredentials dbCredentials = parseDBConfiguration(config);

if (writer == null || writer.getConnection().isClosed() == true) {
// Json error string
log.error("**** Connection to ClickHouse is not established, re-initiating ****");
String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase());
databaseName);
ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
dbCredentials.getUserName(), dbCredentials.getPassword(), config);
writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);
}
//DBCredentials dbCredentials = parseDBConfiguration(config);
String debeziumStorageStatusQuery = String.format("select * from %s limit 1", tableName);
String debeziumStorageStatusQuery = String.format("select * from %s limit 1", databaseName + "." + tableName);
ResultSet resultSet = writer.executeQueryWithResultSet(debeziumStorageStatusQuery);

if(resultSet != null) {
Expand Down Expand Up @@ -419,8 +447,12 @@ public long getLatestRecordTimestamp(ClickHouseSinkConnectorConfig config, Prope
long result = -1;
DBCredentials dbCredentials = parseDBConfiguration(config);

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

String latestRecordTs = new DebeziumOffsetStorage().getDebeziumLatestRecordTimestamp(props, writer);
Expand Down Expand Up @@ -453,12 +485,14 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String binlogFile, String binLogPosition, String gtid) throws SQLException, ParseException {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

DBCredentials dbCredentials = parseDBConfiguration(config);

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer);

Expand All @@ -484,12 +518,13 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String lsn) throws SQLException, ParseException {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();
DBCredentials dbCredentials = parseDBConfiguration(config);

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer);

Expand Down Expand Up @@ -714,6 +749,7 @@ DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) {
dbCredentials.setPort(config.getInt(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_PORT.toString()));
dbCredentials.setUserName(config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_USER.toString()));
dbCredentials.setPassword(config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_PASS.toString()));
dbCredentials.setDatabase("system");

return dbCredentials;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public String getDebeziumStorageStatusQuery(
*/
public String updateBinLogInformation(String record, String binLogFile, String binLogPosition, String gtids) throws ParseException {
JSONObject jsonObject = new JSONObject();
if(record != null || !record.isEmpty()) {
if(record != null && !record.isEmpty()) {
jsonObject = (JSONObject) new JSONParser().parse(record);
} else {
jsonObject.put("ts_sec", System.currentTimeMillis() / 1000);
Expand Down

0 comments on commit 346ca12

Please sign in to comment.