
Conversation

@voonhous (Member) commented Oct 24, 2025

This PR adds Hudi CDC sink support to Flink CDC.

As of now the following features are supported:

  1. Simple bucket index
  2. Non-partitioned tables
  3. MOR tables
  4. Compaction plan generation (Compaction execution will require a separate process as of now)

Future improvements will bring support for other native Hudi features gradually/iteratively, as we are trying to keep this PR small and manageable for review.

@voonhous (Member Author)

@danny0405 @cshuo FYI

@voonhous (Member Author)

Changes here will require Hudi 1.1.0 to be released first.

@cshuo left a comment

@voonhous thanks for the PR. Can you also describe the scope of the PR for the Hudi CDC sink, e.g., which index types and table service (compaction) modes are supported?

*/
private void processFlushForTableFunction(
        EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
    try {

No need to use reflection now? Call tableFunction.flushRemaining(false) directly.

@voonhous (Member Author)

Done

}

// Extract record key from event data using cached field getters
String recordKey = extractRecordKeyFromEvent(dataChangeEvent);

The record key can be obtained from HoodieFlinkInternalRow directly by calling HoodieFlinkInternalRow#getRecordKey(), so extractRecordKeyFromEvent is unnecessary and primaryKeyFieldGetters can be removed.

@voonhous (Member Author)

Done


/** Base infrastructures for streaming writer function to handle Events. */
public abstract class EventStreamWriteFunction extends AbstractStreamWriteFunction<Event>
        implements EventProcessorFunction {
@cshuo (Oct 29, 2025)

we should make minimal changes to StreamWriteFunction and BucketStreamWriteFunction; the generic type should be kept as HoodieFlinkInternalRow. We can confine operations on Event to MultiTableEventStreamWriteFunction, and StreamWriteFunction only needs to provide the following operations (a rough sketch follows this list):

  • processData(HoodieFlinkInternalRow): DataChangeEvent can be converted to HoodieFlinkInternalRow in MultiTableEventStreamWriteFunction.
  • flushRemaining(): called when a flush event is received.
  • updateSchema()?: called when a schema change event is received and the inner schema or related fields, like index fields, need updating.
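A rough sketch of that shape; the method names follow this comment and are assumptions, not a final API:

import org.apache.flink.cdc.common.schema.Schema;
import org.apache.hudi.client.model.HoodieFlinkInternalRow; // package per Hudi's Flink module

/** Sketch only: the per-table operations StreamWriteFunction would expose. */
public interface PerTableWriteOperations {

    /** Buffers one record already converted from a DataChangeEvent. */
    void processData(HoodieFlinkInternalRow record);

    /** Flushes buffered records; called when a flush event is received. */
    void flushRemaining(boolean endInput);

    /** Applies a schema change, e.g. refreshing the inner schema and index fields. */
    void updateSchema(Schema newSchema);
}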

@cshuo (Oct 29, 2025)

Seems there is no need to implement EventProcessorFunction; processSchemaChange and processFlush of EventStreamWriteFunction will actually never be called.

@voonhous (Member Author)

Okay, I tried this, and I remember what the problem was:

Event-to-HoodieFlinkInternalRow conversion in MultiTableEventStreamWriteFunction:

  1. The HoodieFlinkInternalRow constructor requires fileId and instantTime upfront
  2. These values come from defineRecordLocation(), which needs the bucket number
  3. A HoodieFlinkInternalRow therefore cannot be created before calling defineRecordLocation()


fileId and instantTime are not required to construct HoodieFlinkInternalRow, these two fields are later set in defineRecordLocation().
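A hedged sketch of that flow; the constructor and setter shapes here are assumptions inferred from this thread, so verify them against the Hudi 1.1.0 API:

import org.apache.flink.table.data.RowData;
import org.apache.hudi.client.model.HoodieFlinkInternalRow; // package assumed

/** Sketch only; constructor/setter shapes are assumptions from this discussion. */
final class RecordLocationSketch {

    // Assumed: a constructor without fileId/instantTime exists.
    static HoodieFlinkInternalRow toInternalRow(
            String recordKey, String partitionPath, String operationType, RowData rowData) {
        return new HoodieFlinkInternalRow(recordKey, partitionPath, operationType, rowData);
    }

    // Assumed: the location is attached later, inside defineRecordLocation(),
    // once the bucket is known.
    static void defineRecordLocation(
            HoodieFlinkInternalRow row, String fileId, String instantTime) {
        row.setFileId(fileId);
        row.setInstantTime(instantTime);
    }
}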

@voonhous (Member Author)

> Seems there is no need to implement EventProcessorFunction; processSchemaChange and processFlush of EventStreamWriteFunction will actually never be called.

Caused by: java.lang.RuntimeException: Failed to process schema event for table: hudi_inventory_bptbsn.products
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:296)
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:167)
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:72)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.cdc.connectors.hudi.sink.bucket.FlushEventAlignmentOperator.processElement(FlushEventAlignmentOperator.java:94)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsupportedOperationException: #processSchemaChange should not be called
	at org.apache.flink.cdc.connectors.hudi.sink.function.EventBucketStreamWriteFunction.processSchemaChange(EventBucketStreamWriteFunction.java:158)
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:293)
	... 24 more

It is being invoked.

@voonhous (Member Author)

Done

* <p>Assumes that CreateTableEvent will always arrive before DataChangeEvent for each table,
* following the standard CDC pipeline startup sequence.
*/
public class HudiRecordEventSerializer implements HudiRecordSerializer<Event> {
@cshuo (Oct 29, 2025)

Seems HudiRecordEventSerializer is designed to handle serialization for multiple tables. As with the comments on EventStreamWriteFunction, could HudiRecordEventSerializer be a field of MultiTableStreamWriteOperatorCoordinator, serializing data change events to HoodieFlinkInternalRow, which are then dispatched to the corresponding table write functions?

@voonhous (Member Author)

Done

// - Data events go to their specific bucket's task
DataStream<BucketWrapper> partitionedStream =
        bucketAssignedStream.partitionCustom(
                (key, numPartitions) -> key % numPartitions,

Maybe we should also consider the data skew problem, since there are records from multiple tables & partitions. You can refer to BucketIndexUtil#getPartitionIndexFunc.

@voonhous (Member Author)

Done

DataChangeEvent dataChangeEvent, Schema schema) {
    List<String> partitionKeys = schema.partitionKeys();
    if (partitionKeys == null || partitionKeys.isEmpty()) {
        return "default";

should be "" here?

@voonhous (Member Author)

Yep, good catch, fixed.


}

/**
 * Calculate bucket from HoodieFlinkInternalRow using the record key. The record key is already

Are we going to support bucketing by hoodie.bucket.index.hash.field?

@voonhous (Member Author)

Not yet; I was planning on standardising everything to use record keys first. Since there is an orthogonal discussion on config, I wanted to leave this for a separate exercise.

String instantTime) {

// Extract record key from primary key fields
String recordKey = extractRecordKeyFromDataChangeEvent(dataChangeEvent, schema);

can we use RowDataKeyGen to get record key and partition path directly?
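For reference, a minimal sketch of that suggestion; the RowDataKeyGen factory and method names are taken from Hudi's Flink module, but double-check the package and signatures for your Hudi version:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.sink.transform.RowDataKeyGen; // verify package for your Hudi version

/** Sketch: derive key and partition via Hudi's key generator instead of by hand. */
final class KeyGenSketch {

    static String[] recordKeyAndPartition(Configuration conf, RowType rowType, RowData row) {
        // RowDataKeyGen reads the configured key/partition fields from conf,
        // replacing hand-rolled extraction from the change event.
        RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
        return new String[] {keyGen.getRecordKey(row), keyGen.getPartitionPath(row)};
    }
}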

@voonhous (Member Author)

Done

@voonhous force-pushed the hudi-connector-rework-push-to-origin branch from 9f52239 to 6254c7f (November 5, 2025 06:53)
@cshuo left a comment

Thanks @voonhous for updating, overall LGTM now. Left some minor comments.


public void endInput() {
    super.endInput();
    flushRemaining(true);

flushRemaining(true); is not needed here?

* <p>Implements {@link EventProcessorFunction#processDataChange(DataChangeEvent)}.
*/
@Override
public void processDataChange(DataChangeEvent event) throws Exception {

This method seems useless. Add the processDataChange method at line 356 as an API in EventProcessorFunction?

});
// Ensure tableFunction is initialized
getOrCreateTableFunction(tableId);
} else if (event instanceof SchemaChangeEvent) {

event is always a SchemaChangeEvent

}
}

public static void createHudiTablePath(Configuration config, TableId tableId)

tableId is not used.

throw new RuntimeException(
        "Failed during first-time initialization for table: " + tId,
        e);
}

initializedTables.put(tableId, true); should be put here instead of inside getOrCreateTableFunction?

@voonhous (Member Author)

initializedTables.computeIfAbsent(
        tableId,
        tId -> {
            return true; // this value gets inserted, i.e. initializedTables.put(tableId, true)
        });

computeIfAbsent inserts true here, so there is no need to call put again.


I have fixed the relevant calls and lifecycle management of this map.

}

private MultiTableWriteOperator(
        Configuration config,

config is not used.

    };
    break;
case TINYINT:
    fieldGetter = row -> row.getBoolean(fieldPos);

getByte(fieldPos)?
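i.e. a sketch of the suggested change:

case TINYINT:
    // read the byte directly instead of going through getBoolean
    fieldGetter = row -> row.getByte(fieldPos);
    break;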

@voonhous (Member Author)

Same underlying implementation with more validation; let's keep it.

int bucketNumber = BucketIdentifier.getBucketId(recordKey, tableIndexKeyFields, numBuckets);

// Use partition function to map bucket to task index for balanced distribution
int taskIndex = partitionIndexFunc.apply(numBuckets, partition, bucketNumber);

partitionIndexFunc from the hudi repo is designed for a single table; here records may come from different tables, so maybe we can use tableId + "_" + partition instead of partition here?
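The fix would look roughly like the following; using tableId.identifier() as the qualifier is illustrative:

// Sketch: qualify the partition with the table id so Hudi's single-table
// partition-index function also balances records across tables.
String tableAwarePartition = tableId.identifier() + "_" + partition;
int taskIndex = partitionIndexFunc.apply(numBuckets, tableAwarePartition, bucketNumber);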

@voonhous (Member Author)

Makes sense, good catch!

.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);

FactoryHelper.DefaultContext factoryContext = (FactoryHelper.DefaultContext) context;
Configuration config = factoryContext.getFactoryConfiguration();

Can we add some validity checks here, like an index type check?

@voonhous (Member Author)

Added a validation to ensure that the index type is BUCKET, as that is the only index we are supporting.
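A minimal sketch of such a guard; the wiring around it (where the index type string comes from) is illustrative:

/** Sketch: fail fast in the factory when the configured index type is not BUCKET. */
static void validateIndexType(String indexType) {
    if (!"BUCKET".equalsIgnoreCase(indexType)) {
        throw new IllegalArgumentException(
                "Hudi CDC sink currently supports only the BUCKET index type, but got: "
                        + indexType);
    }
}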


<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.20.x</artifactId>

use ${flink.major.version}?
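i.e. something like the following; the hudi.version property name is an assumption:

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink${flink.major.version}.x</artifactId>
    <version>${hudi.version}</version> <!-- property name assumed -->
</dependency>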


public MultiTableStreamWriteOperatorCoordinator(Configuration conf, Context context) {
    super(conf, context);
    conf.setString("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");

what's this config used for?

@voonhous (Member Author)

Oh, I was trying to fix some URI issue, but the root cause of the problem was actually the classloader; will remove.

// Ensure the table's filesystem structure exists before creating a client.
StreamerUtil.initTableIfNotExists(tableConfig);
HoodieFlinkWriteClient<?> writeClient =
        FlinkWriteClients.createWriteClient(tableConfig);

FlinkWriteClients.createWriteClient(tableConfig) is used for the driver; the embedded timeline service is hard-coded to true. Is that as expected?

@voonhous (Member Author, Nov 6, 2025)

My chain of thought is that it doesn't really matter. I have already created a coordinator write client which is running an embedded timeline server that all the TMs will connect to.

For the other tableConfigs, I wanted to set the embedded timeline service to false so that they do not start an embedded timeline server. FWIU, these write clients will mainly be used for making commits to the timeline and do not need any coordination, so they can just use a MEMORY-based timeline server/filesystem view, where they will refresh the file system view before commit, which might be safer.
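A sketch of that intent, assuming Hudi's standard hoodie.embed.timeline.server option is the switch in question:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;

/** Sketch: per-table write clients skip starting their own embedded timeline server. */
final class TimelineServerSketch {

    static void disableEmbeddedTimelineServer(Configuration tableConfig) {
        // Writers rely on the coordinator's embedded timeline server instead.
        tableConfig.setBoolean(HoodieWriteConfig.EMBED_TIMELINE_SERVER.key(), false);
    }
}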

// The baseConfig points to the dummy coordinator path.
// A .hoodie directory is required for the timeline server to start.
StreamerUtil.initTableIfNotExists(this.baseConfig);
this.timelineServerClient = FlinkWriteClients.createWriteClient(this.baseConfig);

The write client for each table on the writer task will load the view storage conf to get the config for the remote timeline service, and the view storage conf is located in table_base_path/.hoodie/.aux/view_storage_conf. Not sure whether the view storage conf file for each table is properly created.

@voonhous (Member Author)

This is the "coordinator" writeclient, it will start an embedded timeline server and all the other tables will use this "coordintor"'s timeline server for FileSystemView requests.


I mean the write clients in the writers get the ip/port conf of the timeline server through FileSystemViewStorageConfig, so each table should save its view storage properties properly in the coordinator.
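Concretely, each table's config would carry remote view-storage properties along these lines; the keys are Hudi's standard FileSystemViewStorageConfig keys, and the host/port would come from the coordinator's embedded server:

import org.apache.flink.configuration.Configuration;

/** Sketch: point a table's filesystem view at the coordinator's timeline server. */
final class ViewStorageSketch {

    static void pointToCoordinatorTimeline(Configuration tableConfig, String host, int port) {
        // REMOTE_FIRST falls back to a local view if the remote server is unreachable.
        tableConfig.setString("hoodie.filesystem.view.type", "REMOTE_FIRST");
        tableConfig.setString("hoodie.filesystem.view.remote.host", host);
        tableConfig.setInteger("hoodie.filesystem.view.remote.port", port);
    }
}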

@voonhous (Member Author)

Addressed the comments and manually verified that the view_storage_conf files are both pointing to the same timeline instance.


@cshuo left a comment

@voonhous Thanks for updating, LGTM. Left some minor comments. cc @danny0405, please also take a look.

LOG.error("Update table for CDC failed.", e);
throw e;
}


unnecessary change.

recordsInSnapshotPhase =
recordsInSnapshotPhase.stream().sorted().collect(Collectors.toList());
validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase);
Thread.sleep(3600000L);

unnecessary change.


if (tableId == null) {
    LOG.warn("No tableId found for path: {}. Cannot process event.", tablePath);
    return;
@cshuo (Nov 7, 2025)

This is an unexpected case; should we also fail the job here?
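i.e., replacing the warn-and-return above with something along these lines:

if (tableId == null) {
    // Sketch: surface the inconsistency instead of silently dropping the event.
    throw new IllegalStateException("No tableId found for path: " + tablePath);
}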

JobStatus jobStatus = message.getJobState();
if (!expectedStatus.isTerminalState() && jobStatus.isTerminalState()) {
    try {
        Thread.sleep(50000);

is this necessary?

@voonhous (Member Author)

Nope, it was for debugging.

"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
"execution.checkpointing.interval: 300",
"execution.checkpointing.interval: 30s",

is this necessary?

@voonhous (Member Author)

Nope, it was for debugging.

@voonhous force-pushed the hudi-connector-rework-push-to-origin branch 2 times, most recently from ad7a2e7 to 317439f (November 7, 2025 10:08)
@voonhous force-pushed the hudi-connector-rework-push-to-origin branch from 317439f to 13a2921 (November 7, 2025 10:10)
@voonhous force-pushed the hudi-connector-rework-push-to-origin branch 7 times, most recently from 249fa2b to ffce04e (November 21, 2025 08:10)
@voonhous force-pushed the hudi-connector-rework-push-to-origin branch from ffce04e to db61821 (November 21, 2025 08:36)
@voonhous (Member Author)

@lvyanquan @yuxiqian Bumped the version, but I am not sure why the CI is failing; the E2E tests are passing locally.

Can you please advise and help to review? Thank you!

@lvyanquan (Contributor)

We've re-triggered the CI tests, and a checkstyle issue was reported. You can fix it and trigger a new test run once it's resolved.

@cshuo commented Nov 24, 2025

@lvyanquan style issue is fixed, thanks for helping.

