Skip to content

Commit

Permalink
Merge pull request #549 from Altinity/528-show_replica_status-view
Browse files Browse the repository at this point in the history
[528] Added logic to create view for replica_source_info table
  • Loading branch information
subkanthi authored Apr 23, 2024
2 parents e6db8f2 + ceb2ec0 commit e6b4f4e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,26 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf
String createDbQuery = String.format("create database if not exists %s", dbName);
log.info("CREATING DEBEZIUM STORAGE Database: " + createDbQuery);
writer.executeQuery(createDbQuery);


// Also create view.
String view = " CREATE VIEW %s.show_replica_status\n" +
" AS\n" +
" SELECT\n" +
" now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source,\n" +
" toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time,\n" +
" fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time,\n" +
" *\n" +
" FROM %s\n" +
" FINAL";
String formattedView = String.format(view, dbName, tableName);
try {
writer.executeQuery(formattedView);
} catch(Exception e) {
log.error("**** Error creating VIEW **** " + formattedView);
}
}
}

} catch(Exception e) {
log.error("Error creating Debezium storage database", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,47 @@ public void testClickHouseDelayedStart() throws Exception {
clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop();
}

@Test
@DisplayName("Validates that the debezium storage view is created successfully")
public void debeziumStorageView() throws Exception {

Injector injector = Guice.createInjector(new AppInjector());

Properties props = getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer);
ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture()
, new Properties());
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(10000);

// Connect to clickhouse and validate that the view was created successfully.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"altinity_sink_connector");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"altinity_sink_connector", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);


// Check if the view altinity_sink_connector.show_replica_status was created successfully.
ResultSet resultSet = writer.executeQueryWithResultSet("show create view altinity_sink_connector.show_replica_status");

boolean viewCheck = false;
while (resultSet.next()) {
viewCheck = true;
Assert.assertTrue(resultSet.getString("statement").contains("CREATE VIEW altinity_sink_connector.show_replica_status"));
break;
}
Assert.assertTrue(viewCheck);
}
}

0 comments on commit e6b4f4e

Please sign in to comment.