Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/**
Expand Down Expand Up @@ -98,7 +99,9 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {

// 1 - Add referenced columns
RecordData.FieldGetter[] fieldGetters = tableInfo.getPreTransformedFieldGetters();
for (String originalColumnName : projectionColumn.getOriginalColumnNames()) {
LinkedHashSet<String> originalColumnNames =
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
case TransformParser.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
Expand Down Expand Up @@ -142,7 +145,8 @@ private TransformExpressionKey generateTransformExpressionKey() {
List<Class<?>> paramTypes = new ArrayList<>();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
String scriptExpression = projectionColumn.getScriptExpression();
List<String> originalColumnNames = projectionColumn.getOriginalColumnNames();
LinkedHashSet<String> originalColumnNames =
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
for (Column column : columns) {
if (column.getName().equals(originalColumnName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Stream;

Expand Down Expand Up @@ -91,14 +92,12 @@ private Tuple2<List<String>, List<Class<?>>> generateArguments() {
List<Class<?>> argTypes = new ArrayList<>();
String scriptExpression = transformFilter.getScriptExpression();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();
List<String> columnNames = transformFilter.getColumnNames();
LinkedHashSet<String> columnNames = new LinkedHashSet<>(transformFilter.getColumnNames());
for (String columnName : columnNames) {
for (Column column : columns) {
if (column.getName().equals(columnName)) {
if (!argNames.contains(columnName)) {
argNames.add(columnName);
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
}
argNames.add(columnName);
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ public class PostTransformOperatorTest {
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();

private static final TableId COLUMN_SQUARE_TABLE =
TableId.tableId("my_company", "my_branch", "column_square");
private static final Schema COLUMN_SQUARE_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.INT())
.physicalColumn("col2", DataTypes.INT())
.physicalColumn("square_col2", DataTypes.INT())
.primaryKey("col1")
.build();

@Test
void testDataChangeEventTransform() throws Exception {
PostTransformOperator transform =
Expand Down Expand Up @@ -560,6 +570,67 @@ void testMetadataASTransform() throws Exception {
.isEqualTo(new StreamRecord<>(insertEventExpect));
}

@Test
void testDataChangeEventTransformWithDuplicateColumns() throws Exception {
PostTransformOperator transform =
PostTransformOperator.newBuilder()
.addTransform(
COLUMN_SQUARE_TABLE.identifier(),
"col1, col2, col2 * col2 as square_col2",
"col2 < 3 OR col2 > 5")
.build();
EventOperatorTestHarness<PostTransformOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
transformFunctionEventEventOperatorTestHarness.open();
// Create table
CreateTableEvent createTableEvent =
new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA);
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) COLUMN_SQUARE_SCHEMA.toRowDataType()));
// Insert
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE,
recordDataGenerator.generate(new Object[] {1, 1, null}));
DataChangeEvent insertEventExpect =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {1, 1, 1}));

DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE,
recordDataGenerator.generate(new Object[] {6, 6, null}));
DataChangeEvent insertEventExpect2 =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {6, 6, 36}));

DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
COLUMN_SQUARE_TABLE,
recordDataGenerator.generate(new Object[] {4, 4, null}));

transform.processElement(new StreamRecord<>(createTableEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA)));
transform.processElement(new StreamRecord<>(insertEvent));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect));
transform.processElement(new StreamRecord<>(insertEvent2));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(insertEventExpect2));
transform.processElement(new StreamRecord<>(insertEvent3));
Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isNull();
}

@Test
void testTimestampTransform() throws Exception {
PostTransformOperator transform =
Expand Down