Skip to content

Commit

Permalink
Merge pull request #779 from Altinity/773-add-sink-connector-client-s…
Browse files Browse the repository at this point in the history
…tep-to-purge-schema-history-so-that-newer-tables-can-be-added

773 add sink connector client step to purge schema history so that newer tables can be added
  • Loading branch information
subkanthi authored Oct 16, 2024
2 parents ad4eebe + 671ac16 commit e67e180
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 9 deletions.
37 changes: 37 additions & 0 deletions sink-connector-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
DELETE_OFFSETS_COMMAND = "delete_offsets"
DELETE_SCHEMA_HISTORY_COMMAND = "delete_schema_history"
)

const (
Expand All @@ -44,6 +45,7 @@ const (
UPDATE_BINLOG = "binlog"
UPDATE_LSN = "lsn"
DELETE_OFFSETS = "offsets"
DELETE_SCHEMA_HISTORY = "schema-history"
)

// Fetches the repos for the given Github users
Expand Down Expand Up @@ -234,6 +236,14 @@ func main() {
return nil
},
},
{
Name: DELETE_SCHEMA_HISTORY_COMMAND,
Usage: "Delete schema history from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteSchemaHistory(c)
return nil
},
},
}
app.Version = "1.0"
app.Run(os.Args)
Expand Down Expand Up @@ -264,6 +274,33 @@ func handleDeleteOffsets(c *cli.Context) bool {
return false
}
}

func handleDeleteSchemaHistory(c *cli.Context) bool {
log.Println("***** Delete schema history from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Schema history deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting schema history")
return false
}
}

func handleUpdateLsn(c *cli.Context) bool {
var lsnPosition = c.String("lsn")
log.Println("***** lsn position:", lsnPosition+" *****")
Expand Down
Binary file modified sink-connector-client/sink-connector-client
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ public static void startRestApi(Properties props, Injector injector,
gtid);
log.info("Received update-binlog request: " + body);
});
//Delete offsets
app.delete("/schema-history", ctx -> {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1));
String response = "";

try {
debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error deleting offsets", e);
ctx.result(e.toString());
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
}
ctx.result(response);

});

app.post("/lsn", ctx -> {
String body = ctx.body();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.clickhouse.jdbc.ClickHouseConnection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
Expand Down Expand Up @@ -345,7 +346,7 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf
"system", dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String databaseName = tableNameDatabaseName.getRight();

String createDbQuery = String.format("create database if not exists %s", databaseName);
Expand Down Expand Up @@ -376,7 +377,7 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
"system", dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);
Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);

String tableName = tableNameDatabaseName.getLeft();
String dbName = tableNameDatabaseName.getRight();
Expand All @@ -392,15 +393,30 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config
}

/**
*
* Function to get the database name and table name for the offset storage table.
* @param props
* @return
*/
private Pair<String, String> getDebeziumStorageDatabaseName(Properties props) {
private Pair<String, String> getDebeziumOffsetStorageDatabaseName(Properties props) {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

/**
*
* @param props
* @return
*/
private Pair<String, String> getDebeziumSchemaHistoryDatabaseName(Properties props) {
String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

private Pair<String, String> splitTableName(String tableName) {
// if tablename is dbname.tablename and contains a dot.
String databaseName = "system";
// split tablename with dot.
Expand All @@ -414,7 +430,6 @@ private Pair<String, String> getDebeziumStorageDatabaseName(Properties props) {

return Pair.of(tableName, databaseName);
}

/**
* Function to delete offsets from Debezium storage.
* @param props
Expand All @@ -437,7 +452,7 @@ public void deleteOffsets(Properties props) throws SQLException {
public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws Exception {
String response = "";

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

Expand Down Expand Up @@ -515,7 +530,7 @@ public long getLatestRecordTimestamp(ClickHouseSinkConnectorConfig config, Prope
long result = -1;
DBCredentials dbCredentials = parseDBConfiguration(config);

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

Expand Down Expand Up @@ -553,7 +568,7 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String binlogFile, String binLogPosition, String gtid) throws SQLException, ParseException {


Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

Expand All @@ -574,6 +589,28 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr

}

/**
*
* @param config
* @param props
* @throws SQLException
*/
public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties props) throws SQLException {
DBCredentials dbCredentials = parseDBConfiguration(config);
Pair<String, String> tableNameDatabaseName = getDebeziumSchemaHistoryDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

// Get topic.prefix from config
String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "."
+ tableNameDatabaseName.getLeft(),writer);

}
/**
* Function to update the status of Debezium storage (LSN).
* @param config
Expand All @@ -586,7 +623,7 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String lsn) throws SQLException, ParseException {


Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();
DBCredentials dbCredentials = parseDBConfiguration(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ public void deleteOffsetStorageRow(String offsetKey,
writer.executeQuery(debeziumStorageStatusQuery);
}

/**
* Function to truncate the schema history table
* @param offsetKey
* @param writer
* @throws SQLException
*/
public void deleteSchemaHistoryTable(String offsetKey,
String tableName,
BaseDbWriter writer) throws SQLException {


String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey);
writer.executeQuery(debeziumStorageStatusQuery);
}
/**
* Function to get the latest timestamp of the record in the table
* @param props
Expand Down

0 comments on commit e67e180

Please sign in to comment.