Add Hudi sink connector support #4164
base: master
Conversation
@danny0405 @cshuo FYI

Changes here will require Hudi 1.1.0 to be released first.
cshuo
left a comment
@voonhous thanks for the PR. Can you also describe the scope of the PR for the Hudi CDC sink, e.g., which index types and table service (compaction) modes are supported?
 */
private void processFlushForTableFunction(
        EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
    try {
No need to use reflection now? Call tableFunction.flushRemaining(false) directly.
Done
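For reference, a minimal sketch of the suggested simplification (method and class names are taken from the diff above; the surrounding class is assumed):

```java
// Sketch only: replaces the earlier reflection-based invocation with a direct call,
// assuming EventBucketStreamWriteFunction exposes flushRemaining(boolean) as discussed.
private void processFlushForTableFunction(
        EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
    // false = flush the currently buffered records without treating this as end-of-input.
    tableFunction.flushRemaining(false);
}
```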
}

// Extract record key from event data using cached field getters
String recordKey = extractRecordKeyFromEvent(dataChangeEvent);
The record key can be obtained from HoodieFlinkInternalRow directly by calling HoodieFlinkInternalRow#getRecordKey(), so extractRecordKeyFromEvent is unnecessary and primaryKeyFieldGetters can be removed.
Done
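A hedged sketch of what the simplification looks like; HoodieFlinkInternalRow#getRecordKey() is the accessor named in the comment, and the helper below is illustrative rather than the exact PR code:

```java
// The record key is already carried on the HoodieFlinkInternalRow produced upstream,
// so no per-event extraction (and no cached primaryKeyFieldGetters) is needed.
private String recordKeyOf(HoodieFlinkInternalRow internalRow) {
    return internalRow.getRecordKey();
}
```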
/** Base infrastructures for streaming writer function to handle Events. */
public abstract class EventStreamWriteFunction extends AbstractStreamWriteFunction<Event>
        implements EventProcessorFunction {
We should make minimal changes to StreamWriteFunction and BucketStreamWriteFunction; the generic type should be kept as HoodieFlinkInternalRow. We can confine Event handling to MultiTableEventStreamWriteFunction, and StreamWriteFunction only needs to provide the following operations (see the sketch after this list):
- processData(HoodieFlinkInternalRow): DataChangeEvent can be converted to HoodieFlinkInternalRow in MultiTableEventStreamWriteFunction.
- flushRemaining(): called when a flush event is received.
- updateSchema()?: called when a schema change event is received and the inner schema or related fields (like index fields) need to be updated.
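A rough sketch of the narrow surface described above; the interface name is hypothetical, and Schema refers to Flink CDC's schema class:

```java
// Hypothetical shape of the per-table write function API suggested in the review.
// MultiTableEventStreamWriteFunction would own all Event handling and delegate to
// per-table functions through these three operations only.
public interface TableScopedWriteFunction {

    /** Buffer one record that has already been converted from a DataChangeEvent. */
    void processData(HoodieFlinkInternalRow record) throws Exception;

    /** Flush buffered records; invoked when a flush event is received. */
    void flushRemaining(boolean endInput);

    /** Apply a schema change, e.g. refresh the inner schema and index/key fields. */
    void updateSchema(Schema newSchema);
}
```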
It seems there is no need to implement EventProcessorFunction; actually, processSchemaChange and processFlush of EventStreamWriteFunction will never be called.
Okay, I tried this, and I remember what the problem was: the Event to HoodieFlinkInternalRow conversion in MultiTableEventStreamWriteFunction.
- The HoodieFlinkInternalRow constructor requires fileId and instantTime upfront.
- These values come from defineRecordLocation(), which needs the bucket number.
- So a HoodieFlinkInternalRow cannot be created before calling defineRecordLocation().
fileId and instantTime are not required to construct HoodieFlinkInternalRow; these two fields are set later in defineRecordLocation().
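A heavily hedged sketch of the flow this implies; the HoodieFlinkInternalRow constructor shown here is an assumption for illustration, not copied from the hudi-flink API:

```java
// Assumed shape only: build the row with key/partition/payload, leave the record
// location empty, and let defineRecordLocation() fill in fileId and instantTime
// once the bucket is known.
private HoodieFlinkInternalRow toInternalRow(
        String recordKey, String partitionPath, RowData rowData) {
    // Constructor signature is an assumption for illustration purposes.
    HoodieFlinkInternalRow row = new HoodieFlinkInternalRow(recordKey, partitionPath, rowData);
    // fileId / instantTime intentionally not set here; defineRecordLocation() sets them later.
    return row;
}
```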
> Seems no need to implement EventProcessorFunction; actually, processSchemaChange and processFlush of EventStreamWriteFunction will never be called.
Caused by: java.lang.RuntimeException: Failed to process schema event for table: hudi_inventory_bptbsn.products
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:296)
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:167)
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:72)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.cdc.connectors.hudi.sink.bucket.FlushEventAlignmentOperator.processElement(FlushEventAlignmentOperator.java:94)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsupportedOperationException: #processSchemaChange should not be called
at org.apache.flink.cdc.connectors.hudi.sink.function.EventBucketStreamWriteFunction.processSchemaChange(EventBucketStreamWriteFunction.java:158)
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:293)
... 24 more
It is being invoked.
Done
 * <p>Assumes that CreateTableEvent will always arrive before DataChangeEvent for each table,
 * following the standard CDC pipeline startup sequence.
 */
public class HudiRecordEventSerializer implements HudiRecordSerializer<Event> {
It seems HudiRecordEventSerializer is designed to handle serialization for multiple tables. As with the comments on EventStreamWriteFunction, could HudiRecordEventSerializer be a field of MultiTableStreamWriteOperatorCoordinator, serializing data change events to HoodieFlinkInternalRow, which are then dispatched to the corresponding table write functions?
Done
// - Data events go to their specific bucket's task
DataStream<BucketWrapper> partitionedStream =
        bucketAssignedStream.partitionCustom(
                (key, numPartitions) -> key % numPartitions,
Maybe we should also consider the data skew problem, since there are records from multiple tables and partitions. You can refer to BucketIndexUtil#getPartitionIndexFunc.
Done
        DataChangeEvent dataChangeEvent, Schema schema) {
    List<String> partitionKeys = schema.partitionKeys();
    if (partitionKeys == null || partitionKeys.isEmpty()) {
        return "default";
Should this be "" here?
Yep, good catch, fixed.
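A small sketch of the agreed fix, using the method shape visible in the diff above (the partition-path builder is a hypothetical helper):

```java
// Non-partitioned tables map to the empty partition path rather than a literal "default".
private String extractPartitionPath(DataChangeEvent dataChangeEvent, Schema schema) {
    List<String> partitionKeys = schema.partitionKeys();
    if (partitionKeys == null || partitionKeys.isEmpty()) {
        return "";
    }
    // Hypothetical helper: builds a "k1=v1/k2=v2"-style path from the partition key values.
    return buildPartitionPath(dataChangeEvent, partitionKeys);
}
```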
}

/**
 * Calculate bucket from HoodieFlinkInternalRow using the record key. The record key is already
Are we going to support bucketing by hoodie.bucket.index.hash.field?
Not yet; I was planning on standardising everything to use record keys first. Since there is an orthogonal discussion on config, I wanted to leave this out for a separate exercise.
        String instantTime) {

    // Extract record key from primary key fields
    String recordKey = extractRecordKeyFromDataChangeEvent(dataChangeEvent, schema);
Can we use RowDataKeyGen to get the record key and partition path directly?
Done
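A hedged sketch of the RowDataKeyGen-based approach the reviewer suggested; the instance(...) factory and accessor names follow typical hudi-flink usage and may differ across Hudi versions:

```java
// Derive the record key and partition path from Hudi's key generator instead of
// hand-rolled extraction from the DataChangeEvent.
private HoodieKey keyOf(RowData rowData, Configuration hudiConf, RowType rowType) {
    RowDataKeyGen keyGen = RowDataKeyGen.instance(hudiConf, rowType);
    return new HoodieKey(keyGen.getRecordKey(rowData), keyGen.getPartitionPath(rowData));
}
```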
cshuo
left a comment
Thanks @voonhous for updating; overall LGTM now. Left some minor comments.
public void endInput() {
    super.endInput();
    flushRemaining(true);
flushRemaining(true); is not needed here?
 * <p>Implements {@link EventProcessorFunction#processDataChange(DataChangeEvent)}.
 */
@Override
public void processDataChange(DataChangeEvent event) throws Exception {
This method seems useless. Add the processDataChange method from line 356 as an API in EventProcessorFunction instead?
});
// Ensure tableFunction is initialized
getOrCreateTableFunction(tableId);
} else if (event instanceof SchemaChangeEvent) {
event is always a SchemaChangeEvent here.
    }
}

public static void createHudiTablePath(Configuration config, TableId tableId)
tableId is not used.
    throw new RuntimeException(
            "Failed during first-time initialization for table: " + tId,
            e);
}
initializedTables.put(tableId, true); should be put here instead of inside getOrCreateTableFunction?
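A sketch of the suggested placement; initializeTable is a hypothetical stand-in for the first-time setup done in the PR:

```java
// Mark the table as initialized only after first-time initialization succeeds,
// rather than inside getOrCreateTableFunction(...).
try {
    initializeTable(tId);               // hypothetical first-time setup
    initializedTables.put(tId, true);   // record success here
} catch (Exception e) {
    throw new RuntimeException(
            "Failed during first-time initialization for table: " + tId, e);
}
```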
}

private MultiTableWriteOperator(
        Configuration config,
config is not used.
    };
    break;
case TINYINT:
    fieldGetter = row -> row.getBoolean(fieldPos);
getByte(fieldPos)?
Same underlying implementation with more validation, let's keep it.
int bucketNumber = BucketIdentifier.getBucketId(recordKey, tableIndexKeyFields, numBuckets);

// Use partition function to map bucket to task index for balanced distribution
int taskIndex = partitionIndexFunc.apply(numBuckets, partition, bucketNumber);
partitionIndexFunc from the Hudi repo is designed for a single table; here records may come from different tables, so maybe we can use tableId + "_" + partition instead of partition here?
Makes sense, good catch!
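A sketch of the agreed adjustment, reusing the apply(...) call shape visible in the diff above:

```java
// Records from multiple tables share one partitioner, so prefix the partition with the
// table id; otherwise identical partition values from different tables would collide
// and skew onto the same task.
String tableAwarePartition = tableId + "_" + partition;
int taskIndex = partitionIndexFunc.apply(numBuckets, tableAwarePartition, bucketNumber);
```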
        .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);

FactoryHelper.DefaultContext factoryContext = (FactoryHelper.DefaultContext) context;
Configuration config = factoryContext.getFactoryConfiguration();
Can we add some validity checks here, like an index type check?
Added a validation to ensure that the index type is BUCKET, as that is the only index type we are supporting.
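A hedged sketch of what such a check might look like; the config accessor and default handling are assumptions, though hoodie.index.type is the standard Hudi key:

```java
// Fail fast at factory validation time: only Hudi's BUCKET index is supported by this sink.
String indexType = config.getString("hoodie.index.type", "BUCKET");
if (!"BUCKET".equalsIgnoreCase(indexType)) {
    throw new IllegalArgumentException(
            "The Hudi pipeline sink currently supports only the BUCKET index type, but got: "
                    + indexType);
}
```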
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.20.x</artifactId>
Use ${flink.major.version}?
public MultiTableStreamWriteOperatorCoordinator(Configuration conf, Context context) {
    super(conf, context);
    conf.setString("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
What is this config used for?
Oh, I was trying to fix a URI issue, but the root cause of the problem was actually the classloader; I will remove it.
// Ensure the table's filesystem structure exists before creating a client.
StreamerUtil.initTableIfNotExists(tableConfig);
HoodieFlinkWriteClient<?> writeClient =
        FlinkWriteClients.createWriteClient(tableConfig);
FlinkWriteClients.createWriteClient(tableConfig) is used for the driver, and the embedded timeline service is hard-coded as true there. Is that as expected?
My chain of thought is that it doesn't really matter. I have already created a coordinator write client which runs an embedded timeline server that all the TMs will connect to.
For the other tableConfigs, I wanted to set the embedded timeline service to false so that they do not start an embedded timeline server. From what I understand, these write clients will mainly be used for making commits to the timeline and do not need any coordination, so they can just use a MEMORY-based timeline server/filesystem view, in which they refresh the file system view before commit, which might be safer.
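A sketch of that intent, not the exact PR code; hoodie.embed.timeline.server is the standard Hudi key, and note the reviewer's point that the driver-oriented createWriteClient(conf) overload forces the embedded timeline service on, so a different overload or an explicit override may be needed:

```java
// Only the coordinator's write client runs an embedded timeline server; per-table
// clients are configured not to start one and just commit to the timeline.
Configuration tableConf = new Configuration(coordinatorConf); // copy, then override
tableConf.setBoolean("hoodie.embed.timeline.server", false);
HoodieFlinkWriteClient<?> tableWriteClient = FlinkWriteClients.createWriteClient(tableConf);
```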
// The baseConfig points to the dummy coordinator path.
// A .hoodie directory is required for the timeline server to start.
StreamerUtil.initTableIfNotExists(this.baseConfig);
this.timelineServerClient = FlinkWriteClients.createWriteClient(this.baseConfig);
The write client for each table on the writer task will load the view storage conf to get the config for the remote timeline service, and the view storage conf is located in table_base_path/.hoodie/.aux/view_storage_conf. I'm not sure whether the view storage conf file for each table is properly created.
This is the "coordinator" write client; it will start an embedded timeline server, and all the other tables will use the coordinator's timeline server for FileSystemView requests.
I mean the write clients in the writers get the ip/port conf of the timeline server through FileSystemViewStorageConfig, so each table should save its view storage properties properly in the coordinator.
cshuo
left a comment
@voonhous Thanks for updating, LGTM. Left some minor comments. cc @danny0405, please also take a look.
    LOG.error("Update table for CDC failed.", e);
    throw e;
}
unnecessary change.
recordsInSnapshotPhase =
        recordsInSnapshotPhase.stream().sorted().collect(Collectors.toList());
validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase);
Thread.sleep(3600000L);
unnecessary change.
if (tableId == null) {
    LOG.warn("No tableId found for path: {}. Cannot process event.", tablePath);
    return;
This is an unexpected case; should we also fail the job here?
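A sketch of the suggested hard failure in place of the warn-and-return shown above:

```java
// Treat a missing tableId as a bug rather than silently dropping the event.
if (tableId == null) {
    throw new IllegalStateException(
            "No tableId found for path: " + tablePath + ". Cannot process event.");
}
```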
JobStatus jobStatus = message.getJobState();
if (!expectedStatus.isTerminalState() && jobStatus.isTerminalState()) {
    try {
        Thread.sleep(50000);
is this necessary?
Nope, it was for debugging.
| "taskmanager.numberOfTaskSlots: 10", | ||
| "parallelism.default: 4", | ||
| "execution.checkpointing.interval: 300", | ||
| "execution.checkpointing.interval: 30s", |
is this necessary?
Nope, it was for debugging.
@lvyanquan @yuxiqian Bumped the version, but I am not sure why the CI is failing; the E2E tests are passing locally.
Can you please advise and help to review? Thank you!

We've re-triggered the CI tests, and a checkstyle issue was reported. You can fix it and trigger a new test run once it's resolved.

@lvyanquan The style issue is fixed, thanks for helping.



This PR adds Hudi CDC sink support to Flink CDC.
As of now the following features are supported:
Future improvements will gradually and iteratively bring support for other native Hudi features, as we are trying to keep this PR small and manageable for review.