Commit 26ff6d2

[FLINK-35237][cdc-common] Improve the interfaces and reorganize the directory.
1 parent 9a4dede commit 26ff6d2

File tree: 8 files changed (+68 −47 lines)

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunction.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunction.java

Lines changed: 10 additions & 7 deletions

@@ -15,14 +15,17 @@
  * limitations under the License.
  */

-package org.apache.flink.cdc.common.sink;
+package org.apache.flink.cdc.common.function;

-import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.annotation.Internal;

-import java.util.function.Function;
+/**
+ * The hash function used to calculate the hash code from a given event.
+ *
+ * @param <T> the type of given event.
+ */
+@Internal
+public interface HashFunction<T> {

-/** use for {@code PrePartitionOperator} when calculating hash code of primary key. */
-public interface HashFunction extends Function<DataChangeEvent, Integer> {
-    @Override
-    Integer apply(DataChangeEvent event);
+    int hashcode(T event);
 }
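
The practical effect for implementers: instead of extending Function<DataChangeEvent, Integer> and overriding apply, a hash function now implements one generic hashcode method. A minimal sketch of a custom implementation against the new interface; the class name and the table-identity routing are illustrative, not part of this commit:

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.function.HashFunction;

/** Illustrative only: shuffles events by table identity instead of primary key. */
public class TableIdHashFunction implements HashFunction<DataChangeEvent> {

    @Override
    public int hashcode(DataChangeEvent event) {
        // Mask off the sign bit so the caller's "% downstreamParallelism"
        // always yields a non-negative subtask index, mirroring the
        // default implementation's convention.
        return event.tableId().hashCode() & 0x7FFFFFFF;
    }
}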

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunctionProvider.java

Lines changed: 13 additions & 7 deletions

@@ -15,21 +15,27 @@
  * limitations under the License.
  */

-package org.apache.flink.cdc.common.sink;
+package org.apache.flink.cdc.common.function;

+import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+
+import javax.annotation.Nullable;

 import java.io.Serializable;

 /**
- * Provide {@link HashFunction} to help {@code PrePartitionOperator} to shuffle {@link
- * DataChangeEvent} to designated subtask. This is usually beneficial for load balancing, when
- * writing to different partitions/buckets in {@link DataSink}, add custom implementation to further
- * improve efficiency.
+ * Provider that provides {@link HashFunction} to help {@code PrePartitionOperator} to shuffle event
+ * to designated subtask. This is usually beneficial for load balancing, when writing to different
+ * partitions/buckets in {@link DataSink}, add custom implementation to further improve efficiency.
+ *
+ * @param <T> the type of given element
  */
-public interface HashFunctionProvider extends Serializable {
+@Internal
+public interface HashFunctionProvider<T> extends Serializable {

     /**
      * Gets a hash function based on the given table ID and schema, to help {@code
@@ -39,5 +45,5 @@ public interface HashFunctionProvider extends Serializable {
      * @param schema flink table schema
      * @return hash function based on the given table ID and schema
      */
-    HashFunction getHashFunction(TableId tableId, Schema schema);
+    HashFunction<T> getHashFunction(@Nullable TableId tableId, Schema schema);
 }
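
Because the new HashFunction<T> has a single abstract method, it can be supplied as a lambda from a custom provider. A hedged sketch of such a provider; note that tableId is now explicitly @Nullable, so implementations should not assume it is present:

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;

import javax.annotation.Nullable;

/** Illustrative only: a provider that ignores the schema and routes by table identity. */
public class TableIdHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {

    private static final long serialVersionUID = 1L;

    @Override
    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
        // The event carries its own table ID, so a null tableId argument is harmless here.
        return event -> event.tableId().hashCode() & 0x7FFFFFFF;
    }
}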

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java

Lines changed: 6 additions & 4 deletions

@@ -18,6 +18,8 @@
 package org.apache.flink.cdc.common.sink;

 import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;

 /**
  * {@code DataSink} is used to write change data to external system and apply metadata changes to
@@ -33,10 +35,10 @@ public interface DataSink {
     MetadataApplier getMetadataApplier();

     /**
-     * Get the {@link HashFunctionProvider} for calculating hash value when partition by primary
-     * ley.
+     * Get the {@code HashFunctionProvider<DataChangeEvent>} for calculating hash value if you need
+     * to partition by data change event before Sink.
      */
-    default HashFunctionProvider getHashFunctionProvider() {
-        return new DefaultHashFunctionProvider();
+    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
+        return new DefaultDataChangeEventHashFunctionProvider();
     }
 }
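
A connector opts in to custom partitioning by overriding the renamed default method on DataSink. A sketch, declared abstract so it compiles without the connector-specific members this diff does not show; the class name and the provider it returns are the illustrative ones from above:

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.sink.DataSink;

/** Illustrative only: swaps the primary-key default for table-identity routing. */
public abstract class CustomPartitioningDataSink implements DataSink {

    @Override
    public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
        // The returned provider is shipped to PrePartitionOperator,
        // which is why HashFunctionProvider extends Serializable.
        return new TableIdHashFunctionProvider();
    }
}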

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultHashFunctionProvider.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultDataChangeEventHashFunctionProvider.java

Lines changed: 20 additions & 12 deletions

@@ -18,34 +18,43 @@
 package org.apache.flink.cdc.common.sink;

 import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.RecordData.FieldGetter;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.OperationType;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.schema.Schema;

+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;

-/** the default implementation of hash function. */
-public class DefaultHashFunctionProvider implements HashFunctionProvider {
+/** The default {@link HashFunctionProvider} implementation for data change event. */
+public class DefaultDataChangeEventHashFunctionProvider
+        implements HashFunctionProvider<DataChangeEvent> {
+
     private static final long serialVersionUID = 1L;

     @Override
-    public HashFunction getHashFunction(TableId tableId, Schema schema) {
-        return new DefaultHashFunction(schema);
+    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
+        return new DefaultDataChangeEventHashFunction(schema);
     }

-    static class DefaultHashFunction implements HashFunction {
-        private final List<RecordData.FieldGetter> primaryKeyGetters;
+    /** The default {@link HashFunction} implementation for data change event. */
+    static class DefaultDataChangeEventHashFunction implements HashFunction<DataChangeEvent> {
+
+        private final List<FieldGetter> primaryKeyGetters;

-        public DefaultHashFunction(Schema schema) {
+        public DefaultDataChangeEventHashFunction(Schema schema) {
             primaryKeyGetters = createFieldGetters(schema);
         }

         @Override
-        public Integer apply(DataChangeEvent event) {
+        public int hashcode(DataChangeEvent event) {
             List<Object> objectsToHash = new ArrayList<>();
             // Table ID
             TableId tableId = event.tableId();
@@ -56,17 +65,16 @@ public Integer apply(DataChangeEvent event) {
             // Primary key
             RecordData data =
                     event.op().equals(OperationType.DELETE) ? event.before() : event.after();
-            for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
+            for (FieldGetter primaryKeyGetter : primaryKeyGetters) {
                 objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
             }

             // Calculate hash
             return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
         }

-        private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
-            List<RecordData.FieldGetter> fieldGetters =
-                    new ArrayList<>(schema.primaryKeys().size());
+        private List<FieldGetter> createFieldGetters(Schema schema) {
+            List<FieldGetter> fieldGetters = new ArrayList<>(schema.primaryKeys().size());
             schema.primaryKeys().stream()
                     .mapToInt(
                             pk -> {
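
The default function combines the table ID with the primary-key field values via Objects.hash, then masks the sign bit so the later modulo by parallelism cannot go negative. A standalone check of that combine-then-mask step; the input values are arbitrary:

import java.util.Objects;

public class HashMaskDemo {
    public static void main(String[] args) {
        // Same combination step as DefaultDataChangeEventHashFunction.
        int raw = Objects.hash("mydb", "mytable", 42L) * 31;
        int masked = raw & 0x7FFFFFFF; // clears the sign bit, keeping 31 bits
        int downstreamParallelism = 4;
        // Always lands in [0, downstreamParallelism), even when raw is negative.
        System.out.println(masked % downstreamParallelism);
    }
}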

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 1 addition & 1 deletion

@@ -140,7 +140,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         parallelism,
                         parallelism,
                         schemaOperatorIDGenerator.generate(),
-                        dataSink.getHashFunctionProvider());
+                        dataSink.getDataChangeEventHashFunctionProvider());

         // Build Sink Operator
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java

Lines changed: 3 additions & 2 deletions

@@ -18,8 +18,9 @@
 package org.apache.flink.cdc.composer.flink.translator;

 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.sink.HashFunctionProvider;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.runtime.partitioning.EventPartitioner;
 import org.apache.flink.cdc.runtime.partitioning.PartitioningEventKeySelector;
 import org.apache.flink.cdc.runtime.partitioning.PostPartitionProcessor;
@@ -41,7 +42,7 @@ public DataStream<Event> translate(
             int upstreamParallelism,
             int downstreamParallelism,
             OperatorID schemaOperatorID,
-            HashFunctionProvider hashFunctionProvider) {
+            HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
         return input.transform(
                 "PrePartition",
                 new PartitioningEventTypeInfo(),

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java

Lines changed: 11 additions & 10 deletions

@@ -23,9 +23,9 @@
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.sink.HashFunction;
-import org.apache.flink.cdc.common.sink.HashFunctionProvider;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -47,19 +47,20 @@
 public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
         implements OneInputStreamOperator<Event, PartitioningEvent> {

+    private static final long serialVersionUID = 1L;
     private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);

     private final OperatorID schemaOperatorId;
     private final int downstreamParallelism;
-    private final HashFunctionProvider hashFunctionProvider;
+    private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;

     private transient SchemaEvolutionClient schemaEvolutionClient;
-    private transient LoadingCache<TableId, HashFunction> cachedHashFunctions;
+    private transient LoadingCache<TableId, HashFunction<DataChangeEvent>> cachedHashFunctions;

     public PrePartitionOperator(
             OperatorID schemaOperatorId,
             int downstreamParallelism,
-            HashFunctionProvider hashFunctionProvider) {
+            HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.schemaOperatorId = schemaOperatorId;
         this.downstreamParallelism = downstreamParallelism;
@@ -100,7 +101,7 @@ private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
                         dataChangeEvent,
                         cachedHashFunctions
                                 .get(dataChangeEvent.tableId())
-                                .apply(dataChangeEvent)
+                                .hashcode(dataChangeEvent)
                                 % downstreamParallelism)));
     }

@@ -126,17 +127,17 @@ private Schema loadLatestSchemaFromRegistry(TableId tableId) {
         return schema.get();
     }

-    private HashFunction recreateHashFunction(TableId tableId) {
+    private HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
         return hashFunctionProvider.getHashFunction(tableId, loadLatestSchemaFromRegistry(tableId));
     }

-    private LoadingCache<TableId, HashFunction> createCache() {
+    private LoadingCache<TableId, HashFunction<DataChangeEvent>> createCache() {
         return CacheBuilder.newBuilder()
                 .expireAfterAccess(CACHE_EXPIRE_DURATION)
                 .build(
-                        new CacheLoader<TableId, HashFunction>() {
+                        new CacheLoader<TableId, HashFunction<DataChangeEvent>>() {
                             @Override
-                            public HashFunction load(TableId key) {
+                            public HashFunction<DataChangeEvent> load(TableId key) {
                                 return recreateHashFunction(key);
                             }
                         });
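
PrePartitionOperator keeps one hash function per table in a Guava LoadingCache: an entry is built on first access against the latest schema from the registry and dropped after a day without use. A self-contained sketch of the same lazy-loading pattern, using plain Guava coordinates (an assumption; Flink itself bundles a shaded copy) and string keys standing in for TableId:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.time.Duration;

public class PerTableCacheDemo {

    interface Hash { int hashcode(String value); }

    public static void main(String[] args) throws Exception {
        // Mirrors the operator's cache: build a per-table function on
        // first access, expire it after a day without use.
        LoadingCache<String, Hash> cachedHashFunctions =
                CacheBuilder.newBuilder()
                        .expireAfterAccess(Duration.ofDays(1))
                        .build(
                                new CacheLoader<String, Hash>() {
                                    @Override
                                    public Hash load(String tableId) {
                                        return value -> value.hashCode() & 0x7FFFFFFF;
                                    }
                                });
        System.out.println(cachedHashFunctions.get("mydb.mytable").hashcode("pk-1") % 4);
    }
}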

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java

Lines changed: 4 additions & 4 deletions

@@ -23,7 +23,7 @@
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.sink.DefaultHashFunctionProvider;
+import org.apache.flink.cdc.common.sink.DefaultDataChangeEventHashFunctionProvider;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
@@ -128,9 +128,9 @@ void testPartitioningDataChangeEvent() throws Exception {
     }

     private int getPartitioningTarget(Schema schema, DataChangeEvent dataChangeEvent) {
-        return new DefaultHashFunctionProvider()
+        return new DefaultDataChangeEventHashFunctionProvider()
                 .getHashFunction(null, schema)
-                .apply(dataChangeEvent)
+                .hashcode(dataChangeEvent)
                 % DOWNSTREAM_PARALLELISM;
     }

@@ -139,7 +139,7 @@ private EventOperatorTestHarness<PrePartitionOperator, PartitioningEvent> create
                 new PrePartitionOperator(
                         TestingSchemaRegistryGateway.SCHEMA_OPERATOR_ID,
                         DOWNSTREAM_PARALLELISM,
-                        new DefaultHashFunctionProvider());
+                        new DefaultDataChangeEventHashFunctionProvider());
         return new EventOperatorTestHarness<>(operator, DOWNSTREAM_PARALLELISM);
     }
 }
