Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Iceberg table sort orders #21977

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

evanvdia
Copy link
Contributor

@evanvdia evanvdia commented Feb 21, 2024

Description

Add Support for Iceberg connector to create sorted files. The sort order can be configured with the sorted_by table property. When creating the table, can specify an array of one or more columns to use for sorting.

Cherry-pick of trinodb/trino#14891

Motivation and Context

Issue : #21978

Test Plan

  • Added tests for sorted_by operations

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

# Iceberg
* Support for Iceberg table sort orders. Tables can be created to add a list of `sorted_by` columns which will be used to order files written to the table.

@evanvdia evanvdia requested a review from a team as a code owner February 21, 2024 08:40
@evanvdia evanvdia requested a review from presto-oss February 21, 2024 08:40
Copy link

linux-foundation-easycla bot commented Feb 21, 2024

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: evanvdia / name: Nidhin Varghese (c4d0805)

Copy link

github-actions bot commented Feb 21, 2024

Codenotify: Notifying subscribers in CODENOTIFY files for diff b230be1...c4d0805.

No notifications.

Copy link
Contributor

@steveburnett steveburnett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the documentation! I had a few suggestions to help readability, and one question about the formatting of one of the two code examples.

Also please sign the Presto CLA by selecting the "Please click here to be authorized" link in the earlier comment.

presto-docs/src/main/sphinx/connector/iceberg.rst Outdated Show resolved Hide resolved
presto-docs/src/main/sphinx/connector/iceberg.rst Outdated Show resolved Hide resolved
presto-docs/src/main/sphinx/connector/iceberg.rst Outdated Show resolved Hide resolved
presto-docs/src/main/sphinx/connector/iceberg.rst Outdated Show resolved Hide resolved
presto-docs/src/main/sphinx/connector/iceberg.rst Outdated Show resolved Hide resolved
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch 2 times, most recently from d7a8899 to b65eae5 Compare February 22, 2024 11:16
Copy link
Contributor

@steveburnett steveburnett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes! This is better. I found a nit near the beginning, and I ask if you would recheck the formatting of the first code block example.

presto-docs/src/main/sphinx/connector/iceberg.rst Outdated Show resolved Hide resolved
@tdcmeehan tdcmeehan added the iceberg Apache Iceberg related label Feb 22, 2024
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 8c6b521 to 57e5e2a Compare February 23, 2024 14:15
steveburnett
steveburnett previously approved these changes Feb 23, 2024
Copy link
Contributor

@steveburnett steveburnett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! (docs)

Pull of updated branch, new local build of docs. Thanks for the changes!

@hantangwangd hantangwangd self-requested a review February 24, 2024 01:16
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 57e5e2a to 07ed82a Compare February 27, 2024 10:11
Copy link
Member

@hantangwangd hantangwangd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a first look, some problems for discussion.

Comment on lines +1157 to +1886
CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
sorted_by = ARRAY['join_date']
)

Sorting can be combined with partitioning on the same column. For example::

CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
partitioning = ARRAY['month(join_date)'],
sorted_by = ARRAY['join_date']
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add description about the message like DESC NULLS FIRST that may appear after join_date?

Comment on lines 189 to 317
icebergTable.sortOrder().fields().stream()
.map(SortField::fromIceberg)
.collect(toImmutableList()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems haven't support IcebergNativeMetadata. Maybe try use transaction.replaceSortOrder() to support sort_by in IcebergNativeMetadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will analyse and work on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 78 to 89

@Test
public void testSortByAllTypes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move the test cases to IcebergDistributedTestBase after support IcebergNativeMetadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will work on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the test cases to IcebergDistributedTestBase

Comment on lines 157 to 167
assertUpdate("INSERT INTO " + tableName + " VALUES " + values + ", " + highValues + ", " + lowValues, 3);
dropTable(getSession(), tableName);
Copy link
Member

@hantangwangd hantangwangd Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add more test cases to illustrate the function of sort_by, for example the data in written files are indeed sorted. What I feel from the current test cases is that we are mainly doing the syntax support testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will add test cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Working on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd Added test cases.

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 07ed82a to 30ac341 Compare March 12, 2024 04:45
@tdcmeehan tdcmeehan requested a review from hantangwangd May 2, 2024 20:30
@tdcmeehan tdcmeehan self-assigned this May 2, 2024
@hantangwangd
Copy link
Member

Hi @evanvdia, just wanted to confirm that are you still working on this?

@evanvdia
Copy link
Contributor Author

evanvdia commented May 3, 2024

Hi @hantangwangd, yes. working on this.

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch 2 times, most recently from 7b3bd0f to 3b6903b Compare May 23, 2024 11:43
@tdcmeehan
Copy link
Contributor

@evanvdia is this ready for review? There is a merge conflict.

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 3b6903b to ef6df97 Compare June 5, 2024 16:23
@evanvdia
Copy link
Contributor Author

evanvdia commented Jun 5, 2024

@tdcmeehan Merge conflicts are resolved. But iceberg unit test cases are failing after rebasing.

image

Copy link
Member

@hantangwangd hantangwangd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, please give us a notice when this PR is ready for review, thanks.

continue;
}

Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position);
WriteContext writer = createWriter(partitionData);
Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position);

After check the code, I think the problem is introduced by this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hantangwangd . that was the issue. done the changes. Will let you know once PR is ready for review.

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from ef6df97 to 32840e1 Compare June 11, 2024 15:29
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 32840e1 to 5e60f75 Compare July 12, 2024 11:45
@evanvdia evanvdia requested a review from elharo as a code owner July 12, 2024 11:45
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 6b7e988 to 34c3810 Compare August 19, 2024 11:54
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 34c3810 to 763dc56 Compare September 18, 2024 14:42
@tdcmeehan
Copy link
Contributor

@evanvdia I see you have pushed a change. Please, when it's ready for review, ping the reviewers when the comments have been addressed.

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 763dc56 to be6f4c7 Compare November 13, 2024 11:47
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from be6f4c7 to 4f3e8e0 Compare November 25, 2024 11:22
@ZacBlanco
Copy link
Contributor

@evanvdia can you investigate the unit test failures? There are quite a few in the Iceberg module likely caused by these new changes:

2024-11-25T11:38:26.8478816Z [ERROR] Failures: 
2024-11-25T11:38:26.8481311Z [ERROR]   TestIcebergTableChangelog.init:46->AbstractTestQueryFramework.init:85->createQueryRunner:36 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8485491Z [ERROR]   TestIcebergDistributedHadoop>AbstractTestQueryFramework.init:85->IcebergDistributedTestBase.createQueryRunner:172 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8490199Z [ERROR]   TestIcebergHadoopCatalogDistributedQueries>AbstractTestQueryFramework.init:85->TestIcebergDistributedQueries.createQueryRunner:59 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8493872Z [ERROR]   TestIcebergSmokeHadoop>AbstractTestQueryFramework.init:85->IcebergDistributedSmokeTestBase.createQueryRunner:76 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8498088Z [ERROR]   TestIcebergDistributedNessie.init:57->AbstractTestQueryFramework.init:85->createQueryRunner:72 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8501229Z [ERROR]   TestIcebergNessieCatalogDistributedQueries.init:44->AbstractTestQueryFramework.init:85->createQueryRunner:59 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8504329Z [ERROR]   TestIcebergSmokeNessie.init:67->AbstractTestQueryFramework.init:85->createQueryRunner:98 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8507895Z [ERROR]   TestIcebergSystemTablesNessie.init:51->TestIcebergSystemTables.init:79->AbstractTestQueryFramework.assertUpdate:260->AbstractTestQueryFramework.assertUpdate:265 » Runtime Cannot commit transaction: last operation has not committed
2024-11-25T11:38:26.8511446Z [ERROR]   TestNessieMultiBranching.init:57->AbstractTestQueryFramework.init:85->createQueryRunner:90 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8514631Z [ERROR]   TestExpireSnapshotProcedure>AbstractTestQueryFramework.init:85->createQueryRunner:60 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8517328Z [ERROR]   TestFastForwardBranchProcedure>AbstractTestQueryFramework.init:85->createQueryRunner:47 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8520734Z [ERROR]   TestRemoveOrphanFilesProcedureHadoop>AbstractTestQueryFramework.init:85->TestRemoveOrphanFilesProcedureBase.createQueryRunner:85 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8524102Z [ERROR]   TestRollbackToTimestampProcedure>AbstractTestQueryFramework.init:85->createQueryRunner:58 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8527032Z [ERROR]   TestSetCurrentSnapshotProcedure>AbstractTestQueryFramework.init:85->createQueryRunner:48 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8529970Z [ERROR]   TestSetTablePropertyProcedure>AbstractTestQueryFramework.init:85->createQueryRunner:48 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8535351Z [ERROR]   TestIcebergDistributedRest.init:73->AbstractTestQueryFramework.init:85->createQueryRunner:95 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8538451Z [ERROR]   TestIcebergRestCatalogDistributedQueries.init:59->AbstractTestQueryFramework.init:85->createQueryRunner:76 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8541916Z [ERROR]   TestIcebergSmokeRest.init:80->AbstractTestQueryFramework.init:85->createQueryRunner:103 » Runtime Cannot create new AppendFiles: last operation has not committed
2024-11-25T11:38:26.8543345Z [INFO] 
2024-11-25T11:38:26.8543868Z [ERROR] Tests run: 3333, Failures: 18, Errors: 0, Skipped: 2340
2024-11-25T11:38:26.8544525Z [INFO] 

Copy link
Member

@hantangwangd hantangwangd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to reach out if you need any assistance to support sort orders on IcebergNativeMetadata.

Comment on lines 316 to 317
transaction.replaceSortOrder().apply().fields().stream().map(SortField::fromIceberg)
.collect(toImmutableList()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't open a transaction action (here is ReplaceSortOrder) without committing it, this will prevent any subsequent actions in the same transaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd Added support to sort orders on IcebergNativeMetadata. and moved the test cases to IcebergDistributedTestBase. But some of the test cases are failing.
image

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 4f3e8e0 to 5af2017 Compare December 10, 2024 14:44
@evanvdia
Copy link
Contributor Author

@ZacBlanco This unit test cases #21977 (comment) failures are fixed.

Comment on lines 308 to 361
ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
List<SortField> sortFields = replaceSortOrder.apply().fields().stream().map(SortField::fromIceberg)
.collect(toImmutableList());
try {
replaceSortOrder.commit();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
List<SortField> sortFields = replaceSortOrder.apply().fields().stream().map(SortField::fromIceberg)
.collect(toImmutableList());
try {
replaceSortOrder.commit();
SortOrder sortOrder = parseSortFields(schema, getSortOrder(tableMetadata.getProperties()));
ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
List<org.apache.iceberg.SortField> icebergSortFields = sortOrder.fields();
List<SortField> sortFields = icebergSortFields.stream().map(SortField::fromIceberg)
.collect(toImmutableList());
try {
for (org.apache.iceberg.SortField sortField : icebergSortFields) {
if (sortField.direction() == SortDirection.ASC) {
replaceSortOrder.asc(schema.findColumnName(sortField.sourceId()), sortField.nullOrder());
}
else {
replaceSortOrder.desc(schema.findColumnName(sortField.sourceId()), sortField.nullOrder());
}
}

I think a feasible way could go like this, although not carefully check whether it's the best. The core point is, we should get sortOrder from tableMetadata and then set it into the create table transaction through replaceSortOrder. You can have a try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd Done the changes.

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch 2 times, most recently from 286cc5e to 9c96c62 Compare December 11, 2024 19:10
Copy link
Contributor

@ZacBlanco ZacBlanco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to do a better job testing and documenting the limitations (e.g. ORC only for sorting from what I can tell?).

Also, what happens if you use parquet and try to create and insert to a table table with a sort order? What about if sorted writing is disabled, but the table specifies a sort order. Is an error thrown? We should have a test case which clearly shows the expected behavior.

The Iceberg connector supports the creation of sorted tables.
Data in the Iceberg table is sorted as each file is written.

Sorted Iceberg tables can provide a huge increase in performance in query times.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Sorted Iceberg tables can provide a huge increase in performance in query times.
Sorted Iceberg tables can decrease query times in many cases; however, it will depend on the query shape and cluster configuration.

Let's not try to make bold claims. I would recommend leaving out the adjective "huge" and try to more objectively describe the feature. Especially when making performance claims, there are many factors which can increase (or decrease) performance. Users will get upset if sorted tables don't always improve performance :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZacBlanco It also supports sorting data in Parquet files.

Data sorted in PARQUET file format
image

Data sorted in ORC file format
image

If we set sorted_writing_enabled as false and table specifies a sort_by property, it will not throw any error but data in the files are unsorted.

Added test cases for scenarios where sorted writing is disabled, but the table specifies a sort order.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we set sorted_writing_enabled as false and table specifies a sort_by property, it will not throw any error but data in the files are unsorted.

Is there a reason that this should even be a configuration property and a table property? I don't think it really makes sense to have both. In the case where the configuration is disabled but it is set on the table property, we will potentially get incorrect results for later queries who expect the data in each file to be sorted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, we removed the configuration property and sorting can now only be enabled or disabled through the table property.

return writerSortBufferSize;
}

@Config("writer-sort-buffer-size")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a description here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure @ZacBlanco.

Comment on lines 146 to 150
int maxOpenWriters,
List<SortField> sortOrder,
DataSize sortingFileWriterBufferSize,
int sortingFileWriterMaxOpenFiles,
TypeManager typeManager,
PageSorter pageSorter,
OrcFileWriterFactory orcFileWriterFactory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are quite a few fields related to sorting. I'm wondering if it might make sense to put them in their own class together.

Also, digging deeper, I realized that this seems to only be supported on ORC files due to the sorting file writer API. Have we considered enhancing this to support parquet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZacBlanco It also supports sorting data in Parquet files.

Do you really want me to do that class changes in this PR or can i open separate git issue to track this changes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should go into this PR.

Also, now that I'm looking more closely this, I'm wondering if it would be better to encapsulate the sorting parameters inside of IcebergFileWriterFactory instead of in the page sink. Since all of this logic has to do with generating sorting writers for both parquet and ORC types, I think it should probably not be inside the page sink since the sorting class is actually an implementation of the FileWriter interface.

I would consider updating the IcebergFileWriterFactory's createFileWriter method could accept some type of class (say, SortParameters) which will change the type of writer. This implementation should also be clearer how the sorting is done for ORC vs Parquet since the logic for handling different file formats is also done in the file writer factory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd createWriter() method is invoked when public CompletableFuture<?> appendPage(Page page) is called externally. I am not seeing any option to capture the sort params at this point. So only option is capture the sort params at the time of Icebergpagesink creation and use it in createWriter().

As you suggested i have grouped all the sort related config params to new class SortParameters.

Comment on lines 212 to 215
// We only support lowercase quoted identifiers for now.
// See https://github.com/trinodb/trino/issues/12226#issuecomment-1128839259
// TODO: Enhance quoted identifiers support in Iceberg partitioning to support mixed case identifiers
// See https://github.com/trinodb/trino/issues/12668
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help more clearly describe the limitation of this comment? Does this mean that identifiers in the CREATE TABLE specifying the sort order have to be quoted and lowercase?

We should clearly document this limitation to users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @ZacBlanco, It only supports lowercase quoted identifiers for now; otherwise, it throws an error, as shown in the following screenshot.

image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document this limitation and provide a test that asserts the failure

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the handling logic of the sort order column consistent with that of the table column? For example, when we create a table like this:

create table test("ID" int);

We will succeed with the column name lowercased to id. Is there a reason that the sort by column could not be handled like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd handled logic of the sort order column consistent with that of the table column.

Copy link
Member

@hantangwangd hantangwangd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix, the change overall looks good to me. Some little problems and nits.

Comment on lines 212 to 215
// We only support lowercase quoted identifiers for now.
// See https://github.com/trinodb/trino/issues/12226#issuecomment-1128839259
// TODO: Enhance quoted identifiers support in Iceberg partitioning to support mixed case identifiers
// See https://github.com/trinodb/trino/issues/12668
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the handling logic of the sort order column consistent with that of the table column? For example, when we create a table like this:

create table test("ID" int);

We will succeed with the column name lowercased to id. Is there a reason that the sort by column could not be handled like this?

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch 3 times, most recently from 028f524 to 18b5d14 Compare December 23, 2024 05:11
@evanvdia
Copy link
Contributor Author

@ZacBlanco @hantangwangd The changes have been made. Could you kindly review the changes.

Copy link
Member

@hantangwangd hantangwangd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix, a few little problems, otherwise looks good to me.

Comment on lines 282 to 296
@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("hive.max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to move this code from below to here? Besides, since SortingFileWriterConfig has been created, would it be better to use its configuration properties uniformly? What do you think @ZacBlanco?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. This config should either be left in place or moved to SortingFileWriterConfig. I think it makes sense to move it to SortingFileWriterConfig.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZacBlanco @hantangwangd So will move this config to SortingFileWriterConfig class and can use this config for both Iceberg and Hive.

SortOrder sortOrder = icebergTable.sortOrder();
// TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250)
if (sortOrder.isSorted() && hasIdentitySortColumnPresent(sortOrder)) {
List<String> sortColumnNames = toSortFields(sortOrder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that when the sortOrder contains non-identity sorting columns, this code still throw an exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @hantangwangd. I believe it will not throw an exception here, as it is already handled during table creation. An exception will be thrown if the sortOrder contains non-identity sorting columns. Therefore, I will remove the changes from here and keep it the same as before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @ZacBlanco mentioned, there can be tables created outside of Presto that we need to account for, these tables may have non-identity sorting columns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd What I understood from the above comment is that there is no need to perform identity column validation and exception generation in the insert flow. I am assuming that Iceberg tables can be created from an external engine that does not perform this validation. I will remove the above check from the insert flow. Please correct me if I am wrong.

Comment on lines +349 to +351
List<org.apache.iceberg.SortField> icebergSortFields = sortOrder.fields();
List<SortField> sortFields = getSupportedSortFields(icebergTable.schema(), sortOrder);
for (org.apache.iceberg.SortField sortField : icebergSortFields) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we traverse the sort fields in sortFields here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd I didn't fully understand the request. Could you please elaborate?

As per current implementation, SortFileds collection only contain identity columns , so no need of extra traversal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that we should traverse sortFields rather than icebergSortFields here.

Comment on lines +78 to +85
public void testParseLowerCase()
{
// lowercase
assertParse("order_key asc nulls last", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST)));
assertParse("order_key desc nulls first", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST)));
assertParse("\"order_key\" asc nulls last", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST)));
assertParse("\"order_key\" desc nulls first", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST)));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add back the test cases testParseUpperCase and testParseMixedCase and modify their assertion logic, rather than delete them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok @hantangwangd will add test case back.

Copy link
Contributor

@ZacBlanco ZacBlanco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few more things

private final DataSize sortingFileWriterBufferSize;
private final Integer sortingFileWriterMaxOpenFiles;
private final Path tempDirectory;
private final TypeManager typeManager;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter is not used anywhere. Can you please remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZacBlanco Removed

Comment on lines 146 to 150
int maxOpenWriters,
List<SortField> sortOrder,
DataSize sortingFileWriterBufferSize,
int sortingFileWriterMaxOpenFiles,
TypeManager typeManager,
PageSorter pageSorter,
OrcFileWriterFactory orcFileWriterFactory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should go into this PR.

Also, now that I'm looking more closely this, I'm wondering if it would be better to encapsulate the sorting parameters inside of IcebergFileWriterFactory instead of in the page sink. Since all of this logic has to do with generating sorting writers for both parquet and ORC types, I think it should probably not be inside the page sink since the sorting class is actually an implementation of the FileWriter interface.

I would consider updating the IcebergFileWriterFactory's createFileWriter method could accept some type of class (say, SortParameters) which will change the type of writer. This implementation should also be clearer how the sorting is done for ORC vs Parquet since the logic for handling different file formats is also done in the file writer factory.

Comment on lines 282 to 296
@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("hive.max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. This config should either be left in place or moved to SortingFileWriterConfig. I think it makes sense to move it to SortingFileWriterConfig.

@@ -130,7 +130,7 @@ public void configure(Binder binder)
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, DynamicConfigurationProvider.class);
configBinder(binder).bindConfig(HiveClientConfig.class);

configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the correct method to bind the config. I think we just want bindConfig(SortingFileWriterConfig.class)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed prefix.

@@ -489,7 +494,32 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
icebergTable.properties());
icebergTable.properties(),
getSupportedSortFields(icebergTable.schema(), icebergTable.sortOrder()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be throwing an exception on beginInsert if there are any unsupported sort fields/types, rather than just filtering them out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZacBlanco It is handled in while creating the table and will throw the exception if there are any unsupported sort fields/types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be tables created outside of Presto that we need to account for so we should be checking in beginInsert too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following content is described in iceberg's spec https://iceberg.apache.org/spec/#sorting, which makes me a little confuse:

... Writers should use this default sort order to sort the data on write, but are not required to if the default order is prohibitively expensive, as it would be for streaming writes.

It seems to be saying that there is no need to strictly follow the table's default sort order when actually data writing. If that's true, seems we shouldn't throw exception when there exists unsupported sort fields.

However, when check the code in Iceberg, I found that the data/delete files only contain sort order id as a whole sorting feature , which means data/delete files should strictly follow the whole sort order or not sorted at all instead of just a part of it. Because if not so, we wouldn't be able to distinguish which columns were actually sorted based solely on the data/delete files metadata.

Will make some deeper research to figure out this, and if there is any misunderstanding, please let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be saying that there is no need to strictly follow the table's default sort order when actually data writing. If that's true, seems we shouldn't throw exception when there exists unsupported sort fields.

I agree with this. I didn't read the spec closely enough. It seems like we shouldn't throw an exception. This unfortunately diminishes the value of the sort feature for the optimizer...

@@ -127,6 +131,7 @@ public void setup(Binder binder)
binder.bind(HdfsConfiguration.class).annotatedWith(ForMetastoreHdfsEnvironment.class).to(HiveCachingHdfsConfiguration.class).in(Scopes.SINGLETON);
binder.bind(HdfsConfiguration.class).annotatedWith(ForCachingFileSystem.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(SortingFileWriterConfig.class, "iceberg");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we need to pass the prefix to bindConfig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZacBlanco The prefix allows logical grouping of configuration properties under a specific namespace.

Since sort related config for both Iceberg and Hive moved to SortingFileWriterConfig class (#21977 (comment)) , we will remove the prefix from bindconfig.

@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 18b5d14 to 44f5ab7 Compare January 7, 2025 17:54
@evanvdia evanvdia force-pushed the iceberg_sorted_by_tbl_ppty branch from 44f5ab7 to c4d0805 Compare January 8, 2025 01:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
iceberg Apache Iceberg related
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants