Fix instant file name path retrieval in Hudi Active Timeline #18213
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Attila Tóth.
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
plugin/trino-hudi/pom.xml
@@ -374,6 +374,13 @@
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
We basically don't use mockito in tests. Please fix `BaseHudiConnectorTest` or `TestHudiSparkCompatibility` (or both).
OK, I'll fix that and extend the existing suites.
The test in `trino-hudi` is ready. I have created a simple copy of the existing test table `stock_ticks_cow` whose Hudi timeline contains `replacecommit` actions; it is included in the patchset. I have confirmed that without the code change the test fails with exactly the exception mentioned above. Please let me know what you think about this solution/direction.
I still need some time to familiarize myself with the product tests and add a test case there as well for this fix.
I extended `TestHudiSparkCompatibility` in the latest patchset, verifying that it also fails with the `No scheme for file system location:` exception above. I also modified the setup method to use a default value for `S3_BUCKET` if it is not specified externally. Please let me know what you think about this solution overall.
Signing of the CLA is underway; I will send it in the coming days.
@ebyhr Are you OK with the proposed changes? Thanks for taking a look again at this!
Hey @ebyhr! Thanks for looking at this change! I'm ready to work on this more if needed. What do you think about the latest state of the PR? I already sent the signed CLA on the 13th of July.
createNonPartitionedTableWithReplaceCommit(tableName, COW_TABLE_TYPE);

try {
    onTrino().executeQuery("SELECT id, name FROM hudi.default." + tableName);
Please verify the SELECT result.
Done. What I want to see here, though, is that the query passes. If it does, then replace commits were parsed in the timeline successfully. Please let me know if you had some additional verification in mind.
Please verify the rows using `containsOnly` instead of `hasAnyRows`.
@ebyhr Thanks for all the comments and suggestions! I tried to answer them accordingly, please let me know what you think. I have also verified that tests were failing if the code change in
- return Location.of(fileName.contains(SCHEMA_COMMIT_ACTION) ? metaClient.getSchemaFolderName() : metaClient.getMetaPath().path()).appendPath(fileName);
+ Location metaPath = metaClient.getMetaPath();
+ if (fileName.contains(SCHEMA_COMMIT_ACTION)) {
+     return metaPath.appendPath(HudiTableMetaClient.SCHEMA_FOLDER_NAME);
Is `.appendPath(fileName)` missing? Do existing tests cover this condition?
I have added the `appendPath(fileName)` call to the schema commit branch of the condition; it was clearly missing there.
In order to test this, we need a second table with at least one `schemacommit` action in the Hudi timeline. I will work on preparing that as well in this patchset.
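For readers following the thread, the intended path resolution can be sketched roughly as below. This is a minimal, self-contained illustration and not the connector's actual code: `SimpleLocation` is a hypothetical stand-in for `io.trino.filesystem.Location`, and the constant values `schemacommit` and `.schema` are assumptions made for the example. The point is that both branches start from the meta path (which carries the scheme) and both append the file name.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for io.trino.filesystem.Location.
// It only checks for a "scheme://" prefix and supports appending path segments.
final class SimpleLocation
{
    private final String value;

    SimpleLocation(String value)
    {
        this.value = Objects.requireNonNull(value, "value is null");
        if (!value.matches("^[a-zA-Z][a-zA-Z0-9+.-]*://.*")) {
            throw new IllegalArgumentException("No scheme for file system location: " + value);
        }
    }

    String value()
    {
        return value;
    }

    SimpleLocation appendPath(String segment)
    {
        return new SimpleLocation(value.endsWith("/") ? value + segment : value + "/" + segment);
    }
}

final class InstantFileNames
{
    // Assumed values for illustration only.
    static final String SCHEMA_COMMIT_ACTION = "schemacommit";
    static final String SCHEMA_FOLDER_NAME = ".schema";

    private InstantFileNames() {}

    // Resolve an instant file name against the table's meta path, keeping the scheme intact.
    static SimpleLocation instantFileNamePath(SimpleLocation metaPath, String fileName)
    {
        if (fileName.contains(SCHEMA_COMMIT_ACTION)) {
            // Schema commits live in a sub-folder; the file name must still be appended.
            return metaPath.appendPath(SCHEMA_FOLDER_NAME).appendPath(fileName);
        }
        return metaPath.appendPath(fileName);
    }

    public static void main(String[] args)
    {
        SimpleLocation metaPath = new SimpleLocation("s3://bucket/table/.hoodie");
        System.out.println(instantFileNamePath(metaPath, "20230712105030944.replacecommit").value());
        System.out.println(instantFileNamePath(metaPath, "20230815074720567.schemacommit").value());
    }
}
```

Building the result from a plain path string instead of the meta path would drop the scheme, which is what the stand-in constructor rejects here.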
Please note my comment below regarding this.
@@ -0,0 +1,74 @@
# Context
Thanks for adding this README file! Is it possible to create the table with SQL? We usually use SQL when creating resources such as those in `plugin/trino-delta-lake/src/test/resources`.
Do you mean Spark SQL instead of this minimal Spark pipeline?
Yes.
Changed the `README.md` file accordingly. When the `schemacommit` test is ready, I will also add a `README.md` there.
Please note my comment below regarding this.
Regarding the comment
The code has been changed accordingly, but the test table contains 4 rows instead of 2. Will analyze the root cause of that as well. For now the test has this but I would like to understand the root cause of this. |
I merged the two test tables to one (now that one has During the creation of the testcases, I also noticed a different fault when handling If you agree with having the Otherwise, I can also work on restoring the test suite to contain just the Thanks in advance for looking at this once again! |
One more addition: it bugged me so I quickly looked at the option of altering the needed configuration for the |
After spending some time on analyzing the code further about the schema commit support in In theory future users of the Hudi timeline interface might benefit from this feature (get proper filenames for schema commits as well, which reside under the Given this, I would suggest to remove the schema tests altogether, and fall back to just the replace commit tests, which clearly reproduced the original issue. That would make the test system simpler, we can just use the product tests if needed to test this (so that a large number of files do not need to be added to |
// for ser/deser
public HudiReplaceCommitMetadata()
@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public HudiReplaceCommitMetadata(@JsonProperty("partitionToReplaceFileIds") final Map<String, List<String>> partitionToReplaceFileIds,
Move the first argument to the next line and remove `final`:
public HudiReplaceCommitMetadata(
        @JsonProperty("partitionToReplaceFileIds") Map<String, List<String>> partitionToReplaceFileIds,
        @JsonProperty("compacted") Boolean compacted)
{
Could you add a commit body to explain why this change is needed to "Fix replace commit handling in Hudi Active Timeline"?
The commit message body now has a brief reasoning about this change.
this.partitionToReplaceFileIds = partitionToReplaceFileIds;
this.compacted = compacted;
Are these fields nullable?
They are not. If, e.g., `partitionToReplaceFileIds` becomes null, this code will throw an NPE (from `HudiActiveTimeline::resetFileGroupsReplaced`):
// get replace instant mapping for each partition, fileId
return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream()
        .flatMap(entry -> entry.getValue().stream().map(fileId ->
                Map.entry(new HudiFileGroupId(entry.getKey(), fileId), instant)));
Then it would be better to add `ImmutableMap.copyOf` and `requireNonNull`, and change `Boolean` to `boolean`:
this.partitionToReplaceFileIds = ImmutableMap.copyOf(requireNonNull(partitionToReplaceFileIds, "partitionToReplaceFileIds is null"));
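As an illustration of the suggestion above, here is a minimal sketch of a null-checked, defensively copied constructor. It is not the PR's actual class: `ReplaceCommitMetadataSketch` is a hypothetical name, the Jackson annotations are omitted, and the JDK's `Map.copyOf` is swapped in for Guava's `ImmutableMap.copyOf` so that the example compiles without extra dependencies.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;

// Hypothetical, simplified model class; the real one also carries @JsonCreator / @JsonProperty annotations.
final class ReplaceCommitMetadataSketch
{
    private final Map<String, List<String>> partitionToReplaceFileIds;
    private final boolean compacted;

    ReplaceCommitMetadataSketch(Map<String, List<String>> partitionToReplaceFileIds, boolean compacted)
    {
        // Fail fast with a clear message instead of a late NPE while streaming over the map.
        // Map.copyOf stands in for Guava's ImmutableMap.copyOf (an assumption of this sketch).
        this.partitionToReplaceFileIds = Map.copyOf(requireNonNull(partitionToReplaceFileIds, "partitionToReplaceFileIds is null"));
        // A primitive boolean cannot be null, unlike the boxed Boolean.
        this.compacted = compacted;
    }

    Map<String, List<String>> getPartitionToReplaceFileIds()
    {
        return partitionToReplaceFileIds;
    }

    boolean isCompacted()
    {
        return compacted;
    }

    public static void main(String[] args)
    {
        Map<String, List<String>> source = new HashMap<>();
        source.put("partition=1", List.of("file-a"));
        ReplaceCommitMetadataSketch metadata = new ReplaceCommitMetadataSketch(source, false);
        // Later changes to the caller's map do not leak into the metadata object.
        source.put("partition=2", List.of("file-b"));
        System.out.println(metadata.getPartitionToReplaceFileIds().keySet());
    }
}
```

The copy is shallow: the map itself becomes unmodifiable, while the list values are kept as passed in.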
@@ -155,6 +155,7 @@ private static void copyDir(Path srcDir, Path dstDir)
public enum TestingTable
{
    HUDI_NON_PART_COW(nonPartitionRegularColumns()),
    HUDI_NON_PART_COW_CL_SC(schemaChangedNonPartitionedRegularColumns()),
What's the meaning of CL and SC?
CL means "clustered", while SC means "schema change" in this context.
I don't think code readers can understand the meaning. Please avoid abbreviations.
https://github.com/trinodb/trino/blob/master/.github/DEVELOPMENT.md#avoid-abbreviations
OK, I will remove the schema change tests altogether as requested anyway, and will change this as well in the next commit.
I have removed the abbreviation; the table is now called `HUDI_NON_PART_COW_CLUSTERING` (where `COW` is a well-known Hudi concept).
@@ -0,0 +1 @@ | |||
{"schemas":[{"max_column_id":13,"version_id":20230815074720567,"type":"record","fields":[{"id":0,"name":"_hoodie_commit_time","optional":true,"type":"string","doc":""},{"id":1,"name":"_hoodie_commit_seqno","optional":true,"type":"string","doc":""},{"id":2,"name":"_hoodie_record_key","optional":true,"type":"string","doc":""},{"id":3,"name":"_hoodie_partition_path","optional":true,"type":"string","doc":""},{"id":4,"name":"_hoodie_file_name","optional":true,"type":"string","doc":""},{"id":5,"name":"rowId","optional":true,"type":"string"},{"id":6,"name":"partitionId","optional":true,"type":"string"},{"id":7,"name":"preComb","optional":true,"type":"long"},{"id":8,"name":"name","optional":true,"type":"string"},{"id":9,"name":"versionId","optional":true,"type":"string"},{"id":10,"name":"toBeDeletedStr","optional":true,"type":"string"},{"id":11,"name":"intToLong","optional":true,"type":"int"},{"id":12,"name":"longToInt","optional":true,"type":"long"},{"id":13,"name":"description","optional":true,"type":"string"}]},{"max_column_id":12,"version_id":0,"type":"record","fields":[{"id":0,"name":"_hoodie_commit_time","optional":true,"type":"string","doc":""},{"id":1,"name":"_hoodie_commit_seqno","optional":true,"type":"string","doc":""},{"id":2,"name":"_hoodie_record_key","optional":true,"type":"string","doc":""},{"id":3,"name":"_hoodie_partition_path","optional":true,"type":"string","doc":""},{"id":4,"name":"_hoodie_file_name","optional":true,"type":"string","doc":""},{"id":5,"name":"rowId","optional":true,"type":"string"},{"id":6,"name":"partitionId","optional":true,"type":"string"},{"id":7,"name":"preComb","optional":true,"type":"long"},{"id":8,"name":"name","optional":true,"type":"string"},{"id":9,"name":"versionId","optional":true,"type":"string"},{"id":10,"name":"toBeDeletedStr","optional":true,"type":"string"},{"id":11,"name":"intToLong","optional":true,"type":"int"},{"id":12,"name":"longToInt","optional":true,"type":"long"}]}]} |
Let's remove these files and use product tests instead.
I have removed the tests for the schema commits.
try {
    assertThat(onTrino().executeQuery("SELECT id, name FROM hudi.default." + tableName + ";"))
            .containsOnly(ImmutableList.of(
nit: `ImmutableList.of` is redundant.
Done
Please let me know if anything else needs to be done here. We are using a patched Trino image, but we would like to upgrade soon to an official Trino release that includes this fix. Thanks in advance!
The changes look good. Thanks @atezs82 for fixing the issue.
Please also include me as a reviewer for any change in the Hudi connector.
CREATE TABLE hudi_non_part_cow
USING hudi
OPTIONS (
    hoodie.table.name = 'hudi_non_part_cow',
    hoodie.datasource.write.recordkey.field = 'rowId',
    hoodie.datasource.write.precombine.field = 'preComb',
    hoodie.table.type = 'COPY_ON_WRITE'
)
LOCATION 'file:///<<< EXISTING TABLE LOCATION >>>'

CREATE TABLE hudi_non_part_cow_clustered
USING hudi
OPTIONS (
    hoodie.table.name = 'hudi_non_part_cow_clustered',
    hoodie.datasource.write.recordkey.field = 'rowId',
    hoodie.datasource.write.precombine.field = 'preComb',
    hoodie.table.type = 'COPY_ON_WRITE',
    hoodie.clustering.inline = 'true',
    hoodie.clustering.inline.max.commits = '1'
)
LOCATION 'file:///<<< NEW TABLE LOCATION >>>'
AS SELECT * FROM hudi_non_part_cow
So, why do we need these resource files instead of product tests? I think we can simply execute this CTAS statement in product tests.
This was needed so that this special behavior (replace commit handling) is also tested in the Hudi connector, but I agree that this is still a bit more complex than it should be. In the latest changeset I removed this test from the Hudi connector altogether and let the product tests cover this behavior.
The commit body is too long. Please follow https://github.com/trinodb/trino/blob/master/.github/DEVELOPMENT.md#format-git-commit-messages |
Thanks for the review and the suggestions! I have shortened/slightly reformatted the commit message to contain the changes requested in #18213 (comment), but still have a shorter subject line (with a longer body explaining the exact changes done). Please let me know if that is acceptable. Regarding #18213 (comment), I have implemented the requested changes, on the other hand, changing the |
When reading Hudi tables with replace commits, the location of the Hudi commit file was assembled incorrectly in the Hudi Active Timeline logic. The Hudi replace commit metadata model object was also patched to deserialize Hudi metadata properly. This was needed so that Trino users (and the tests for the active timeline) do not see duplicated records for replace commits.
I also would like to thank @ebyhr for all the work he put into reviewing this change. |
Description
When submitting even a very simple query for certain types of Hudi tables (stored on S3) we were constantly receiving the following error:
Stacktrace taken from the Trino coordinator:
The issue could always be reproduced for this table, while other tables were working just fine. We were using a simple SQL query, `select * from test_catalog.test_schema.table limit 1;`, but more complex queries were also failing with this error.
This PR proposes a solution for the problem by slightly modifying the way Trino handles the Hudi timeline internally. Although the solution is simple, I have also added some unit test coverage for the fix. Please let me know your thoughts about it (I needed to include a Java mock library in Maven, which might not be desirable from an overall project perspective).
Additional context and related issues
At some point during its operation, the Hudi connector tries to update ("reset") its internal state of the Hudi filesystem if "replaced instants" (files) are present on the Hudi timeline. This operation involves accessing the Hudi table's metadata folder on the object store. When this path segment was created, the corresponding `io.trino.filesystem.Location` object was built in such a way that it did not contain the scheme, just the path (see the code change below). This is forbidden by that class (since a location has to adhere to the format `scheme://[userInfo@]host[:port][/path]`), therefore the mentioned exception was thrown (refer to the Javadoc of the `Location` class for more information).
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: