Skip to content

Commit b2db57d

Browse files
committed
Use a flat List, indexed by column position, for projection column lookup instead of a HashMap keyed by column name (should be faster)
1 parent 11ab3ae commit b2db57d

File tree

2 files changed (+28 / -21 lines changed)

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,6 @@
3131
import javax.annotation.Nullable;
3232

3333
import java.util.ArrayList;
34-
import java.util.HashMap;
3534
import java.util.List;
3635
import java.util.Map;
3736
import java.util.concurrent.ConcurrentHashMap;
@@ -57,7 +56,7 @@ public class PostTransformProcessor {
5756
private @Nullable TransformFilter transformFilter;
5857
private String timezone;
5958
private Map<String, ProjectionColumnProcessor> projectionColumnProcessorMap;
60-
private Map<String, ProjectionColumn> cachedProjectionColumns;
59+
private List<ProjectionColumn> cachedProjectionColumns;
6160

6261
public PostTransformProcessor(
6362
TableInfo tableInfo,
@@ -117,9 +116,11 @@ public Schema processSchemaChangeEvent(Schema schema) {
117116

118117
public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
119118
List<Object> valueList = new ArrayList<>();
120-
for (Column column : tableInfo.getSchema().getColumns()) {
121-
ProjectionColumn projectionColumn = cachedProjectionColumns.get(column.getName());
122-
if (cachedProjectionColumns.containsKey(column.getName())) {
119+
List<Column> columns = tableInfo.getSchema().getColumns();
120+
121+
for (int i = 0; i < columns.size(); i++) {
122+
if (cachedProjectionColumns.get(i) != null) {
123+
ProjectionColumn projectionColumn = cachedProjectionColumns.get(i);
123124
projectionColumnProcessorMap.putIfAbsent(
124125
projectionColumn.getColumnName(),
125126
ProjectionColumnProcessor.of(tableInfo, projectionColumn, timezone));
@@ -130,6 +131,7 @@ public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
130131
projectionColumnProcessor.evaluate(payload, epochTime),
131132
projectionColumn.getDataType()));
132133
} else {
134+
Column column = columns.get(i);
133135
valueList.add(
134136
getValueFromBinaryRecordData(
135137
column.getName(),
@@ -139,6 +141,7 @@ public BinaryRecordData processData(BinaryRecordData payload, long epochTime) {
139141
tableInfo.getOriginalFieldGetters()));
140142
}
141143
}
144+
142145
return tableInfo.getRecordDataGenerator().generate(valueList.toArray(new Object[0]));
143146
}
144147

@@ -157,23 +160,25 @@ private Object getValueFromBinaryRecordData(
157160
return null;
158161
}
159162

160-
private Map<String, ProjectionColumn> cacheProjectionColumnMap(
163+
private List<ProjectionColumn> cacheProjectionColumnMap(
161164
TableInfo tableInfo, TransformProjection transformProjection) {
162-
Map<String, ProjectionColumn> cachedMap = new HashMap<>();
165+
List<ProjectionColumn> cachedProjectionColumns = new ArrayList<>();
163166
if (!hasTableInfo()) {
164-
return cachedMap;
167+
return cachedProjectionColumns;
165168
}
166169

167170
for (Column column : tableInfo.getSchema().getColumns()) {
171+
ProjectionColumn matchedProjectionColumn = null;
168172
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
169173
if (column.getName().equals(projectionColumn.getColumnName())
170174
&& projectionColumn.isValidTransformedProjectionColumn()) {
171-
cachedMap.put(column.getName(), projectionColumn);
175+
matchedProjectionColumn = projectionColumn;
172176
break;
173177
}
174178
}
179+
cachedProjectionColumns.add(matchedProjectionColumn);
175180
}
176181

177-
return cachedMap;
182+
return cachedProjectionColumns;
178183
}
179184
}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import javax.annotation.Nullable;
2929

3030
import java.util.ArrayList;
31-
import java.util.HashSet;
3231
import java.util.List;
33-
import java.util.Set;
3432

3533
/**
3634
* The processor of pre-transform projection in {@link PreTransformOperator}.
@@ -47,7 +45,7 @@ public class PreTransformProcessor {
4745
private TableChangeInfo tableChangeInfo;
4846
private TransformProjection transformProjection;
4947
private @Nullable TransformFilter transformFilter;
50-
private Set<String> cachedProjectionColumns;
48+
private List<Boolean> cachedProjectionColumnsState;
5149

5250
public PreTransformProcessor(
5351
TableChangeInfo tableChangeInfo,
@@ -56,7 +54,7 @@ public PreTransformProcessor(
5654
this.tableChangeInfo = tableChangeInfo;
5755
this.transformProjection = transformProjection;
5856
this.transformFilter = transformFilter;
59-
this.cachedProjectionColumns =
57+
this.cachedProjectionColumnsState =
6058
cacheIsProjectionColumnMap(tableChangeInfo, transformProjection);
6159
}
6260

@@ -82,19 +80,21 @@ public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTabl
8280

8381
public BinaryRecordData processFillDataField(BinaryRecordData data) {
8482
List<Object> valueList = new ArrayList<>();
85-
for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) {
86-
if (cachedProjectionColumns.contains(column.getName())) {
83+
List<Column> columns = tableChangeInfo.getTransformedSchema().getColumns();
84+
85+
for (int i = 0; i < columns.size(); i++) {
86+
if (cachedProjectionColumnsState.get(i)) {
8787
valueList.add(null);
88-
break;
8988
} else {
9089
valueList.add(
9190
getValueFromBinaryRecordData(
92-
column.getName(),
91+
columns.get(i).getName(),
9392
data,
9493
tableChangeInfo.getOriginalSchema().getColumns(),
9594
tableChangeInfo.getFieldGetters()));
9695
}
9796
}
97+
9898
return tableChangeInfo.getRecordDataGenerator().generate(valueList.toArray(new Object[0]));
9999
}
100100

@@ -112,21 +112,23 @@ private Object getValueFromBinaryRecordData(
112112
return null;
113113
}
114114

115-
private Set<String> cacheIsProjectionColumnMap(
115+
private List<Boolean> cacheIsProjectionColumnMap(
116116
TableChangeInfo tableChangeInfo, TransformProjection transformProjection) {
117-
Set<String> cachedMap = new HashSet<>();
117+
List<Boolean> cachedMap = new ArrayList<>();
118118
if (!hasTableChangeInfo()) {
119119
return cachedMap;
120120
}
121121

122122
for (Column column : tableChangeInfo.getTransformedSchema().getColumns()) {
123+
boolean isProjectionColumn = false;
123124
for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
124125
if (column.getName().equals(projectionColumn.getColumnName())
125126
&& projectionColumn.isValidTransformedProjectionColumn()) {
126-
cachedMap.add(column.getName());
127+
isProjectionColumn = true;
127128
break;
128129
}
129130
}
131+
cachedMap.add(isProjectionColumn);
130132
}
131133

132134
return cachedMap;

0 commit comments

Comments
 (0)