Skip to content

Commit

Permalink
Added new class to store column overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Nov 23, 2023
1 parent 492f007 commit 63bb852
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,37 +107,47 @@ public void testCreateTable() throws Exception {
Assert.assertTrue(timestampTable.get("Null_Value").equalsIgnoreCase("Nullable(DateTime64(3))"));

writer.getConnection().close();
//Thread.sleep(10000);
Thread.sleep(10000);

writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
// Validate temporal_types_DATE data.
ResultSet dateResult = writer.executeQueryWithResultSet("select * from temporal_types_DATE");

boolean dateResultValueChecked = false;
while(dateResult.next()) {
dateResultValueChecked = true;
Assert.assertTrue(dateResult.getDate("Minimum_Value").toString().equalsIgnoreCase("1925-01-01"));
Assert.assertTrue(dateResult.getDate("Mid_Value").toString().equalsIgnoreCase("2022-09-29"));
Assert.assertTrue(dateResult.getDate("Maximum_Value").toString().equalsIgnoreCase("2283-11-11"));
}
Assert.assertTrue(dateResultValueChecked);

// Validate temporal_types_DATETIME data.
ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME");
boolean dateTimeResultValueChecked = false;

while(dateTimeResult.next()) {
System.out.println("DATE TIME");
dateTimeResultValueChecked = true;
Assert.assertTrue(dateTimeResult.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));
Assert.assertTrue(dateTimeResult.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:47:46.0"));
Assert.assertTrue(dateTimeResult.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
}
Assert.assertTrue(dateTimeResultValueChecked);

// DATETIME1
boolean dateTimeResult1ValueChecked = false;

ResultSet dateTimeResult1 = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME1");
while(dateTimeResult1.next()) {
System.out.println("DATE TIME 1");
dateTimeResult1ValueChecked = true;

Assert.assertTrue(dateTimeResult1.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));
Assert.assertTrue(dateTimeResult1.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:48:25.1"));
Assert.assertTrue(dateTimeResult1.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
}
Assert.assertTrue(dateTimeResult1ValueChecked);

// DATETIME2
ResultSet dateTimeResult2 = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME2");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.altinity.clickhouse.sink.connector.db;

import java.util.HashMap;
import java.util.Map;

/**
* Class that maps overrides of column data types.
*/
public class ColumnOverrides {

static Map<String, String> columnOverridesMap = new HashMap<>();

static {
columnOverridesMap.put("DateTime", "String");
columnOverridesMap.put("Nullable(DateTime", "Nullable(String)");
}
public ColumnOverrides() {

}

public static String getColumnOverride(String dataType) {
for(String key: columnOverridesMap.keySet()){
if(dataType.contains(key)) {
return columnOverridesMap.get(key);
}
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ public MutablePair<String, Map<String, Integer>> getInsertQueryUsingInputFunctio
// Get Field Name and lookup in the Clickhouse column to datatype map.
String dataType = ClickHouseUtils.escape(columnNameToDataTypeMap.get(entry.getKey()), '\'');

if(dataType.contains("DateTime")) {
dataType = "String";
if(ColumnOverrides.getColumnOverride(dataType) != null) {
dataType = ColumnOverrides.getColumnOverride(dataType);
}

if(dataType != null) {
// Is the column a kafka metadata column.
if(isKafkaMetaDataColumn(sourceColumnName)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.altinity.clickhouse.sink.connector.db;

import com.clickhouse.data.ClickHouseDataType;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

public class ColumnOverridesTest {

@Test
public void testMapping() {
String dateTime64Type = "DateTime64(3)";
String dataTime64OverrideType = ColumnOverrides.getColumnOverride(dateTime64Type);
Assert.assertTrue(dataTime64OverrideType.equalsIgnoreCase("String"));

String nullableDateTime64Type = "Nullable(DateTime64)";
String nullableDataTime64OverrideType = ColumnOverrides.getColumnOverride(nullableDateTime64Type);
Assert.assertTrue(nullableDataTime64OverrideType.equalsIgnoreCase("Nullable(String)"));

Assert.assertTrue(ColumnOverrides.getColumnOverride(ClickHouseDataType.DateTime.name()).equalsIgnoreCase("String"));
Assert.assertNull(ColumnOverrides.getColumnOverride(ClickHouseDataType.Decimal.name()));


Assert.assertNull(ColumnOverrides.getColumnOverride(ClickHouseDataType.Int16.name()));
Assert.assertTrue(ColumnOverrides.getColumnOverride(ClickHouseDataType.DateTime32.name()).equalsIgnoreCase(ClickHouseDataType.String.name()));
}
}

0 comments on commit 63bb852

Please sign in to comment.