Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added test to replicate generated columns in postgreSQL #777

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@
</profile>
</profiles>
<dependencies>
<!-- Javalin test tooling, used only by the test sources (scope=test). -->
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin-testtools</artifactId>
<!-- NOTE(review): version is hard-coded here while the main pom declares a
javalin-version property of the same value; this file is the generated
dependency-reduced-pom, so the duplication is presumably expected — confirm. -->
<version>5.5.0</version>
<scope>test</scope>
<exclusions>
<!-- NOTE(review): okhttp is excluded from the test-tools transitive tree,
presumably to avoid clashing with another okhttp version already on the
classpath — verify against the dependency tree. -->
<exclusion>
<artifactId>okhttp</artifactId>
<groupId>com.squareup.okhttp3</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
Expand Down Expand Up @@ -308,6 +320,7 @@
<surefire-plugin.version>3.0.0-M7</surefire-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sink-connector-library-version>0.0.9</sink-connector-library-version>
<javalin-version>5.5.0</javalin-version>
<version.junit>5.9.1</version.junit>
<maven.compiler.source>17</maven.compiler.source>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ public Properties getProperties() throws Exception {
properties.put("slot.retry.delay.ms", "5000" );
properties.put("database.allowPublicKeyRetrieval", "true" );
properties.put("schema.include.list", "public,public2");
properties.put("table.include.list", "public.tm,public2.tm2" );
properties.put("table.include.list", "public.tm,public2.tm2,public.people" );
properties.put("column.exclude.list", "public.people.full_name_mat");
return properties;
}

@Test
@DisplayName("Integration Test - Validates postgresql replication works with multiple schemas")
@DisplayName("Integration Test - Validates postgresql replication works with multiple schemas and ignoring ALIAS columns in ClickHouse")
public void testMultipleSchemaReplication() throws Exception {
Network network = Network.newNetwork();

Expand Down Expand Up @@ -135,6 +136,32 @@ public void testMultipleSchemaReplication() throws Exception {
}
Assert.assertTrue(tm2Count == 1);

// Create a connection to postgresql and create a new table.
Connection postgresConn2 = ITCommon.connectToPostgreSQL(postgreSQLContainer);
postgresConn2.createStatement().execute("CREATE TABLE public.people( height_cm numeric PRIMARY KEY, height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED)");

Thread.sleep(10000);
// insert new records into the new table.
postgresConn2.createStatement().execute("insert into public.people (height_cm) values (180)");
Thread.sleep(10000);

// ClickHouse, add ALIAS column to public.people
conn.createStatement().execute("ALTER TABLE public.people ADD COLUMN full_name String ALIAS concat('John', ' ', 'Doe');");
Thread.sleep(10000);

// Add MATERIALIZED column to public.people
conn.createStatement().execute("ALTER TABLE public.people ADD COLUMN full_name_mat String MATERIALIZED toString(height_cm)");
postgresConn2.createStatement().execute("insert into public.people (height_cm) values (200)");
Thread.sleep(20000);

// Check if public.people has 2 records.
int peopleCount = 0;
ResultSet chRs3 = writer.getConnection().prepareStatement("select count(*) from public.people").executeQuery();
while(chRs3.next()) {
peopleCount = chRs3.getInt(1);
}
Assert.assertTrue(peopleCount == 2);

if(engine.get() != null) {
engine.get().stop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.altinity.clickhouse.sink.connector.db;

import static com.altinity.clickhouse.sink.connector.db.ClickHouseDbConstants.CHECK_DB_EXISTS_SQL;

import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
Expand All @@ -12,9 +14,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.*;

public class DBMetadata {

Expand Down Expand Up @@ -271,17 +271,21 @@ public boolean checkIfNewReplacingMergeTree(String currentClickHouseVersion) thr
return result;
}




/**
* Function that uses the DatabaseMetaData JDBC functionality
* to get the column name and column data type as key/value pair.
*/
public Map<String, String> getColumnsDataTypesForTable(String tableName,
ClickHouseConnection conn,
String database) {
String database,
ClickHouseSinkConnectorConfig config) {

Set<String> aliasColumns = new HashSet<>();
try {
aliasColumns = new DBMetadata().getAliasAndMaterializedColumnsForTableAndDatabase(tableName, database, conn);
} catch(Exception e) {
log.error("Error getting alias columns", e);
}
LinkedHashMap<String, String> result = new LinkedHashMap<>();
try {
if (conn == null) {
Expand All @@ -308,6 +312,10 @@ public Map<String, String> getColumnsDataTypesForTable(String tableName,
if(isGeneratedColumn != null && isGeneratedColumn.equalsIgnoreCase("YES")) {
continue;
}
if(aliasColumns.contains(columnName)) {
log.debug("Skipping alias column: " + columnName);
continue;
}
result.put(columnName, typeName);
}
} catch (SQLException sq) {
Expand All @@ -329,4 +337,29 @@ public ZoneId getServerTimeZone(ClickHouseConnection conn) {

return result;
}

/**
 * Returns the names of the columns of the given ClickHouse table whose
 * default_kind is ALIAS or MATERIALIZED. Such columns cannot be written
 * to directly, so callers use this set to skip them when building
 * INSERT statements.
 *
 * @param tableName    table to inspect (matched against system.columns.table)
 * @param databaseName database the table belongs to
 * @param conn         open ClickHouse connection
 * @return set of ALIAS/MATERIALIZED column names; empty if the table has none
 * @throws SQLException if querying system.columns fails
 */
public Set<String> getAliasAndMaterializedColumnsForTableAndDatabase(String tableName, String databaseName,
                                                                     ClickHouseConnection conn) throws SQLException {

    Set<String> aliasColumns = new HashSet<>();
    // NOTE(review): table/database names are interpolated into the SQL text.
    // They come from connector configuration, not end-user input, but a
    // parameterized query would be safer — confirm and consider switching.
    String query = "SELECT name FROM system.columns WHERE (table = '%s') AND (database = '%s') and " +
            "(default_kind='ALIAS' or default_kind='MATERIALIZED')";
    String formattedQuery = String.format(query, tableName, databaseName);

    // try-with-resources ensures the Statement and ResultSet are closed even
    // when iteration throws; the original leaked both.
    try (Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(formattedQuery)) {
        while (rs.next()) {
            aliasColumns.add(rs.getString(1));
        }
    }
    return aliasColumns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public DbKafkaOffsetWriter(

createOffsetTable();
this.columnNamesToDataTypesMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.getConnection(),
database);
database, config);
this.query = new QueryFormatter().getInsertQueryUsingInputFunction(tableName, columnNamesToDataTypesMap);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public DbWriter(
try {
if (this.conn != null) {
// Order of the column names and the data type has to match.
this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database);
this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn,
database, config);
}
DBMetadata metadata = new DBMetadata();

Expand Down Expand Up @@ -125,7 +126,7 @@ public DbWriter(
log.error("********* AUTO CREATE DISABLED, Table does not exist, please enable it by setting auto.create.tables=true");
}

this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database);
this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database, config);
response = metadata.getTableEngine(this.conn, database, tableName);
this.engine = response.getLeft();
}
Expand Down Expand Up @@ -188,7 +189,7 @@ public void createOffsetSchemaHistoryDatabase() {
}

public void updateColumnNameToDataTypeMap() throws SQLException {
this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database);
this.columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, this.conn, database, config);
MutablePair<DBMetadata.TABLE_ENGINE, String> response = new DBMetadata().getTableEngine(this.conn, database, tableName);
this.engine = response.getLeft();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ public boolean groupQueryWithRecords(List<ClickHouseStruct> records,
if(enableSchemaEvolution) {
try {
new ClickHouseAlterTable().alterTable(record.getAfterStruct().schema().fields(), tableName, connection, columnNameToDataTypeMap);
columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, connection, databaseName);
columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, connection, databaseName, config);

} catch(Exception e) {
log.error("**** ERROR ALTER TABLE: " + tableName, e);
}
}

columnNameToDataTypeMap = new DBMetadata().getColumnsDataTypesForTable(tableName, connection, databaseName, config );
result = updateQueryToRecordsMap(record, record.getAfterModifiedFields(), queryToRecordsMap, tableName, config, columnNameToDataTypeMap);
} else if(CdcRecordState.CDC_RECORD_STATE_BOTH == getCdcSectionBasedOnOperation(record.getCdcOperation())) {
if(record.getBeforeModifiedFields() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Set;

@Testcontainers

Expand Down Expand Up @@ -149,4 +150,28 @@ public void getTestGetServerTimeZone() {
Assert.assertTrue(serverTimeZone.toString().equalsIgnoreCase("America/Chicago"));

}

@Test
public void getAliasAndMaterializedColumnsList() throws SQLException {
    // Connect to the test ClickHouse container started for this suite.
    String dbHostName = clickHouseContainer.getHost();
    Integer port = clickHouseContainer.getFirstMappedPort();
    String database = "default";
    String userName = clickHouseContainer.getUsername();
    String password = clickHouseContainer.getPassword();

    String jdbcUrl = BaseDbWriter.getConnectionString(dbHostName, port, database);
    ClickHouseConnection conn = DbWriter.createConnection(jdbcUrl, "client_1", userName, password,
            new ClickHouseSinkConnectorConfig(new HashMap<>()));

    DBMetadata metadata = new DBMetadata();

    // employees2.people (init_clickhouse.sql) defines two ALIAS columns.
    Set<String> aliasColumns =
            metadata.getAliasAndMaterializedColumnsForTableAndDatabase("people", "employees2", conn);
    Assert.assertEquals(2, aliasColumns.size());

    // public.tm has neither ALIAS nor MATERIALIZED columns.
    Set<String> tmAliasColumns =
            metadata.getAliasAndMaterializedColumnsForTableAndDatabase("tm", "public", conn);
    Assert.assertTrue(tmAliasColumns.isEmpty());

    // employees2.employee_materialized defines one MATERIALIZED column.
    Set<String> employeeMaterializedColumns =
            metadata.getAliasAndMaterializedColumnsForTableAndDatabase("employee_materialized", "employees2", conn);
    Assert.assertEquals(1, employeeMaterializedColumns.size());
}
}
30 changes: 29 additions & 1 deletion sink-connector/src/test/resources/init_clickhouse.sql
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,32 @@ create table employees2.ma_users
)
engine = MergeTree()
ORDER BY date
SETTINGS index_granularity = 8192;
SETTINGS index_granularity = 8192;


-- Table with ClickHouse ALIAS columns; the sink connector must skip
-- full_name/full_name2 when generating INSERTs (ALIAS columns are computed,
-- not writable). _version/_sign drive ReplacingMergeTree deduplication.
CREATE TABLE employees2.people
(
    `height_cm` Decimal(64, 18),
    `_version` UInt64,
    `_sign` UInt8,
    `full_name` String ALIAS concat('John', ' ', 'Doe'),
    `full_name2` String ALIAS concat('Alice', ' ', 'W')
)
ENGINE = ReplacingMergeTree(_version, _sign)
PRIMARY KEY height_cm
ORDER BY height_cm
SETTINGS index_granularity = 8192;

-- CREATE TABLE with ClickHouse Materialized columns.
-- full_name is MATERIALIZED: computed and stored on insert, never writable
-- directly, so the sink connector must exclude it from generated INSERTs.
CREATE TABLE employees2.employee_materialized (
    `id` UInt64,
    `name` String,
    `age` Nullable(UInt8),
    `salary` Nullable(UInt32),
    `_sign` UInt8,
    `full_name` String MATERIALIZED concat('John', ' ', 'Doe')
)
ENGINE = MergeTree()
PRIMARY KEY (id)
ORDER BY (id)
SETTINGS index_granularity = 8192;
Loading