diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java index 1770f2fbb..58a9526d5 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumChangeEventCapture.java @@ -337,9 +337,26 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf String createDbQuery = String.format("create database if not exists %s", dbName); log.info("CREATING DEBEZIUM STORAGE Database: " + createDbQuery); writer.executeQuery(createDbQuery); + + + // Also create view. + String view = " CREATE VIEW %s.show_replica_status\n" + + " AS\n" + + " SELECT\n" + + " now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source,\n" + + " toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time,\n" + + " fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time,\n" + + " *\n" + + " FROM %s\n" + + " FINAL"; + String formattedView = String.format(view, dbName, tableName); + try { + writer.executeQuery(formattedView); + } catch(Exception e) { + log.error("**** Error creating VIEW **** " + formattedView); + } } } - } catch(Exception e) { log.error("Error creating Debezium storage database", e); } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java index 58d1e02f6..3fd472ce2 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/cdc/ClickHouseDelayedStartIT.java @@ -124,4 +124,47 @@ public void testClickHouseDelayedStart() throws Exception { clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture().stop(); } + @Test + @DisplayName("Validates that the debezium storage view is created successfully") + public void debeziumStorageView() throws Exception { + + Injector injector = Guice.createInjector(new AppInjector()); + + Properties props = getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer); + ClickHouseDebeziumEmbeddedApplication clickHouseDebeziumEmbeddedApplication = new ClickHouseDebeziumEmbeddedApplication(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + clickHouseDebeziumEmbeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), + injector.getInstance(DDLParserService.class), props, false); + DebeziumEmbeddedRestApi.startRestApi(props, injector, clickHouseDebeziumEmbeddedApplication.getDebeziumEventCapture() + , new Properties()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(10000); + + // Connect to clickhouse and validate that the view was created successfully. + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "altinity_sink_connector"); + ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", + clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>())); + BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "altinity_sink_connector", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn); + + + // Check if the view altinity_sink_connector.show_replica_status was created successfully. + ResultSet resultSet = writer.executeQueryWithResultSet("show create view altinity_sink_connector.show_replica_status"); + + boolean viewCheck = false; + while (resultSet.next()) { + viewCheck = true; + Assert.assertTrue(resultSet.getString("statement").contains("CREATE VIEW altinity_sink_connector.show_replica_status")); + break; + } + Assert.assertTrue(viewCheck); + } }