Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

773 add sink connector client step to purge schema history so that newer tables can be added #779

37 changes: 37 additions & 0 deletions sink-connector-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
DELETE_OFFSETS_COMMAND = "delete_offsets"
DELETE_SCHEMA_HISTORY_COMMAND = "delete_schema_history"
)

const (
Expand All @@ -44,6 +45,7 @@ const (
UPDATE_BINLOG = "binlog"
UPDATE_LSN = "lsn"
DELETE_OFFSETS = "offsets"
DELETE_SCHEMA_HISTORY = "schema-history"
)

// Fetches the repos for the given Github users
Expand Down Expand Up @@ -234,6 +236,14 @@ func main() {
return nil
},
},
{
Name: DELETE_SCHEMA_HISTORY_COMMAND,
Usage: "Delete schema history from the sink connector",
Action: func(c *cli.Context) error {
handleDeleteSchemaHistory(c)
return nil
},
},
}
app.Version = "1.0"
app.Run(os.Args)
Expand Down Expand Up @@ -264,6 +274,33 @@ func handleDeleteOffsets(c *cli.Context) bool {
return false
}
}

func handleDeleteSchemaHistory(c *cli.Context) bool {
log.Println("***** Delete schema history from the sink connector *****")
log.Println("Are you sure you want to continue? (y/n): ")
var userInput string
fmt.Scanln(&userInput)
if userInput != "y" {
log.Println("Exiting...")
return false
} else {
log.Println("Continuing...")
}
// Call a REST DELETE API to delete offsets from the sink connector
var deleteOffsetsUrl = getServerUrl(DELETE_SCHEMA_HISTORY, c)
log.Println("Sending request to URL: " + deleteOffsetsUrl)
resp := getHTTPDeleteCall(deleteOffsetsUrl)
time.Sleep(5 * time.Second)
if resp.StatusCode == 200 {
log.Println("Schema history deleted successfully")
return true
} else {
log.Println("Response Status Code:", resp.StatusCode)
log.Println("Error deleting schema history")
return false
}
}

func handleUpdateLsn(c *cli.Context) bool {
var lsnPosition = c.String("lsn")
log.Println("***** lsn position:", lsnPosition+" *****")
Expand Down
Binary file modified sink-connector-client/sink-connector-client
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ public static void startRestApi(Properties props, Injector injector,
gtid);
log.info("Received update-binlog request: " + body);
});
//Delete offsets
app.delete("/schema-history", ctx -> {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1));
String response = "";

try {
debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error deleting offsets", e);
ctx.result(e.toString());
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
}
ctx.result(response);

});

app.post("/lsn", ctx -> {
String body = ctx.body();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.clickhouse.jdbc.ClickHouseConnection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
Expand All @@ -30,7 +31,6 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.units.qual.C;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
Expand Down Expand Up @@ -317,7 +317,7 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf
"system", dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String databaseName = tableNameDatabaseName.getRight();

String createDbQuery = String.format("create database if not exists %s", databaseName);
Expand Down Expand Up @@ -348,7 +348,7 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
"system", dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);
Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);

String tableName = tableNameDatabaseName.getLeft();
String dbName = tableNameDatabaseName.getRight();
Expand All @@ -364,15 +364,30 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config
}

/**
*
* Function to get the database name and table name for the offset storage table.
* @param props
* @return
*/
private Pair<String, String> getDebeziumStorageDatabaseName(Properties props) {
private Pair<String, String> getDebeziumOffsetStorageDatabaseName(Properties props) {


String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

/**
*
* @param props
* @return
*/
private Pair<String, String> getDebeziumSchemaHistoryDatabaseName(Properties props) {
String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

private Pair<String, String> splitTableName(String tableName) {
// if tablename is dbname.tablename and contains a dot.
String databaseName = "system";
// split tablename with dot.
Expand All @@ -386,7 +401,6 @@ private Pair<String, String> getDebeziumStorageDatabaseName(Properties props) {

return Pair.of(tableName, databaseName);
}

/**
* Function to delete offsets from Debezium storage.
* @param props
Expand All @@ -409,7 +423,7 @@ public void deleteOffsets(Properties props) throws SQLException {
public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props) throws Exception {
String response = "";

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

Expand Down Expand Up @@ -487,7 +501,7 @@ public long getLatestRecordTimestamp(ClickHouseSinkConnectorConfig config, Prope
long result = -1;
DBCredentials dbCredentials = parseDBConfiguration(config);

Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

Expand Down Expand Up @@ -525,7 +539,7 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String binlogFile, String binLogPosition, String gtid) throws SQLException, ParseException {


Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

Expand All @@ -546,6 +560,28 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr

}

/**
*
* @param config
* @param props
* @throws SQLException
*/
public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties props) throws SQLException {
DBCredentials dbCredentials = parseDBConfiguration(config);
Pair<String, String> tableNameDatabaseName = getDebeziumSchemaHistoryDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();

BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

// Get topic.prefix from config
String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "."
+ tableNameDatabaseName.getLeft(),writer);

}
/**
* Function to update the status of Debezium storage (LSN).
* @param config
Expand All @@ -558,7 +594,7 @@ public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pr
String lsn) throws SQLException, ParseException {


Pair<String, String> tableNameDatabaseName = getDebeziumStorageDatabaseName(props);
Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String tableName = tableNameDatabaseName.getLeft();
String databaseName = tableNameDatabaseName.getRight();
DBCredentials dbCredentials = parseDBConfiguration(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ public void deleteOffsetStorageRow(String offsetKey,
writer.executeQuery(debeziumStorageStatusQuery);
}

/**
* Function to truncate the schema history table
* @param offsetKey
* @param writer
* @throws SQLException
*/
public void deleteSchemaHistoryTable(String offsetKey,
String tableName,
BaseDbWriter writer) throws SQLException {


String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey);
writer.executeQuery(debeziumStorageStatusQuery);
}
/**
* Function to get the latest timestamp of the record in the table
* @param props
Expand Down
Loading