-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-8401] Support KeepValues partial merge mode #13540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
ae85c2b to
c5464f7
Compare
| this.readerContext = readerContext; | ||
| this.partialUpdateMode = partialUpdateMode; | ||
| this.mergeProperties = parseMergeProperties(props); | ||
| if (partialUpdateMode == PartialUpdateMode.KEEP_VALUES) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like the PartialUpdateMode.KEEP_VALUES mode is kind of orthoganal to existing partial merging logic, is it possible we add another two partial update BufferedRecordMergers and also a new #partialMerge method here so make the logic more clear.
| * @param mergedSchema The merged schema for the merged record. | ||
| * @return whether the Avro schema is partial compared to the merged schema. | ||
| */ | ||
| public static boolean isPartial(Schema schema, Schema mergedSchema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could be very costly, we need to ensure both of these two schemas are cached by AvroSchemaCache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides cached by AvroSchemaCache, maybe we can also add some short-circuit logic, such as checking by comparing fields number firstly?
cshuo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#13498 is landed, this pr can be rebased on master.
| boolean enablePartialMerging) { | ||
| // Note that: When either newRecord or oldRecord is a delete record, | ||
| // skip partial update since delete records do not have meaningful columns. | ||
| if (partialUpdateMode == PartialUpdateMode.NONE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partialUpdateMode can not be PartialUpdateMode.NONE now, maybe we can add validating check for update mode in the constructor, and remove the check here.
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
Show resolved
Hide resolved
| * @param mergedSchema The merged schema for the merged record. | ||
| * @return whether the Avro schema is partial compared to the merged schema. | ||
| */ | ||
| public static boolean isPartial(Schema schema, Schema mergedSchema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides cached by AvroSchemaCache, maybe we can also add some short-circuit logic, such as checking by comparing fields number firstly?
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergingUtils.java
Show resolved
Hide resolved
| this.partialUpdateMode = partialUpdateMode; | ||
| this.mergeProperties = parseMergeProperties(props); | ||
| if (partialUpdateMode == PartialUpdateMode.KEEP_VALUES) { | ||
| partialMergingUtils = new PartialMergingUtils<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All methods in PartialMergingUtils can be static? Seems don't need to instantiate PartialMergingUtils object.
| // The merged schema contains fields that only appear in either older and/or newer record. | ||
| Schema mergedSchema = | ||
| getCachedMergedSchema(oldSchema, newSchema, readerSchema); | ||
| Map<String, Integer> fieldNameToIdFromNewRecordSchema = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since ids of fields are not used at all, we can use Schema.Field field = newSchema.getField(fieldName); to check existence instead, which is also a map#get(String) operation.
| Schema schema2 = schemaPair.getLeft().getRight(); | ||
| Schema refSchema = schemaPair.getRight(); | ||
| Map<String, Integer> nameToIdMapping1 = | ||
| getCachedFieldNameToIdMapping(schema1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use schema1.getFields().stream().map(Schema.Field::name).collect(Collectors.toSet()); to get the fields name set instead? Since this operation is not at record-level, and only occurs during cache miss for MERGED_SCHEMA_CACHE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems FIELD_NAME_TO_ID_MAPPING_CACHE can be removed then.
Change Logs
As title.
Impact
Simplify existing code path.
Risk level (write none, low medium or high below)
Medium.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist