diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java index 85d5033e9..168883029 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/api/DebeziumEmbeddedRestApi.java @@ -42,8 +42,21 @@ public static void startRestApi(Properties props, Injector injector, Properties finalProps1 = props; app.get("/status", ctx -> { ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1)); - - ctx.result(debeziumChangeEventCapture.getDebeziumStorageStatus(config, finalProps1)); + String response = ""; + + try { + response = debeziumChangeEventCapture.getDebeziumStorageStatus(config, finalProps1); + } catch (Exception e) { + log.error("Client - Error getting status", e); + // Create JSON response + JSONObject jsonObject = new JSONObject(); + jsonObject.put("error", e.toString()); + response = jsonObject.toJSONString(); + ctx.result(response); + ctx.status(HttpStatus.INTERNAL_SERVER_ERROR); + return; + } + ctx.result(response); }); diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 03e5bcb5e..4374a405b 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -23,6 +23,7 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -341,29 +342,56 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf } /** - * Function to get the status of Debezium storage. + * * @param props * @return */ - public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws SQLException { - String response = ""; + private Pair getDebeziumStorageDatabaseName(Properties props) { + + String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); + // if tablename is dbname.tablename and contains a dot. + String databaseName = "system"; + // split tablename with dot. + if(tableName.contains(".")) { + String[] dbTableNameArray = tableName.split("\\."); + if(dbTableNameArray.length >= 2) { + databaseName = dbTableNameArray[0].replace("\"", ""); + tableName = dbTableNameArray[1].replace("\"", ""); + } + } + + return Pair.of(tableName, databaseName); + } + + /** + * Function to get the status of Debezium storage. + * @param props + * @return + */ + public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws Exception { + String response = ""; + + Pair tableNameDatabaseName = getDebeziumStorageDatabaseName(props); + String tableName = tableNameDatabaseName.getLeft(); + String databaseName = tableNameDatabaseName.getRight(); + DBCredentials dbCredentials = parseDBConfiguration(config); if (writer == null || writer.getConnection().isClosed() == true) { // Json error string log.error("**** Connection to ClickHouse is not established, re-initiating ****"); String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), - dbCredentials.getDatabase()); + databaseName); ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", dbCredentials.getUserName(), dbCredentials.getPassword(), config); writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), - dbCredentials.getDatabase(), dbCredentials.getUserName(), + databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn); } //DBCredentials dbCredentials = parseDBConfiguration(config); - String debeziumStorageStatusQuery = String.format("select * from %s limit 1", tableName); + String debeziumStorageStatusQuery = String.format("select * from %s limit 1", databaseName + "." + tableName); ResultSet resultSet = writer.executeQueryWithResultSet(debeziumStorageStatusQuery); if(resultSet != null) { @@ -419,8 +447,12 @@ public long getLatestRecordTimestamp(ClickHouseSinkConnectorConfig config, Prope long result = -1; DBCredentials dbCredentials = parseDBConfiguration(config); + Pair tableNameDatabaseName = getDebeziumStorageDatabaseName(props); + String tableName = tableNameDatabaseName.getLeft(); + String databaseName = tableNameDatabaseName.getRight(); + BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), - dbCredentials.getDatabase(), dbCredentials.getUserName(), + databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, this.conn); String latestRecordTs = new DebeziumOffsetStorage().getDebeziumLatestRecordTimestamp(props, writer); @@ -453,12 +485,14 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr String binlogFile, String binLogPosition, String gtid) throws SQLException, ParseException { - String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + - JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); + Pair tableNameDatabaseName = getDebeziumStorageDatabaseName(props); + String tableName = tableNameDatabaseName.getLeft(); + String databaseName = tableNameDatabaseName.getRight(); + DBCredentials dbCredentials = parseDBConfiguration(config); BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), - dbCredentials.getDatabase(), dbCredentials.getUserName(), + databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, this.conn); String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer); @@ -484,12 +518,13 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr String lsn) throws SQLException, ParseException { - String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + - JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name()); + Pair tableNameDatabaseName = getDebeziumStorageDatabaseName(props); + String tableName = tableNameDatabaseName.getLeft(); + String databaseName = tableNameDatabaseName.getRight(); DBCredentials dbCredentials = parseDBConfiguration(config); BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), - dbCredentials.getDatabase(), dbCredentials.getUserName(), + databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, this.conn); String offsetValue = new DebeziumOffsetStorage().getDebeziumStorageStatusQuery(props, writer); @@ -714,6 +749,7 @@ DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) { dbCredentials.setPort(config.getInt(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_PORT.toString())); dbCredentials.setUserName(config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_USER.toString())); dbCredentials.setPassword(config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_PASS.toString())); + dbCredentials.setDatabase("system"); return dbCredentials; } diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java index eef531303..0fb0f61c4 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumOffsetStorage.java @@ -86,7 +86,7 @@ public String getDebeziumStorageStatusQuery( */ public String updateBinLogInformation(String record, String binLogFile, String binLogPosition, String gtids) throws ParseException { JSONObject jsonObject = new JSONObject(); - if(record != null || !record.isEmpty()) { + if(record != null && !record.isEmpty()) { jsonObject = (JSONObject) new JSONParser().parse(record); } else { jsonObject.put("ts_sec", System.currentTimeMillis() / 1000);