Add Support for Iceberg table sort orders #21977
base: master
Conversation
Codenotify: Notifying subscribers in CODENOTIFY files for diff b230be1...c4d0805. No notifications.
Thanks for the documentation! I had a few suggestions to help readability, and one question about the formatting of one of the two code examples.
Also please sign the Presto CLA by selecting the "Please click here to be authorized" link in the earlier comment.
Force-pushed from d7a8899 to b65eae5.
Thanks for the changes! This is better. I found a nit near the beginning, and ask that you recheck the formatting of the first code block example.
Force-pushed from 8c6b521 to 57e5e2a.
LGTM! (docs)
Pulled the updated branch and did a fresh local build of the docs. Thanks for the changes!
Force-pushed from 57e5e2a to 07ed82a.
Took a first look; some problems for discussion.
CREATE TABLE emp.employees.employee (
    emp_id BIGINT,
    emp_name VARCHAR,
    join_date DATE,
    country VARCHAR)
WITH (
    sorted_by = ARRAY['join_date']
)

Sorting can be combined with partitioning on the same column. For example::

CREATE TABLE emp.employees.employee (
    emp_id BIGINT,
    emp_name VARCHAR,
    join_date DATE,
    country VARCHAR)
WITH (
    partitioning = ARRAY['month(join_date)'],
    sorted_by = ARRAY['join_date']
)
Should we add a description of modifiers like DESC NULLS FIRST that may appear after join_date?
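For reference, a hypothetical sketch of what such a modifier could look like in the table property; the 'column [ASC|DESC] [NULLS FIRST|LAST]' string form is the one exercised by the parser tests later in this thread, and the table and column names are made up:

CREATE TABLE emp.employees.employee (
    emp_id BIGINT,
    join_date DATE)
WITH (
    sorted_by = ARRAY['join_date DESC NULLS FIRST']
)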
icebergTable.sortOrder().fields().stream()
        .map(SortField::fromIceberg)
        .collect(toImmutableList()));
It seems IcebergNativeMetadata isn't supported yet. Maybe try using transaction.replaceSortOrder() to support sorted_by in IcebergNativeMetadata?
We will analyse and work on this.
Done
@Test
public void testSortByAllTypes()
Maybe move the test cases to IcebergDistributedTestBase after IcebergNativeMetadata is supported?
Will work on this.
Moved the test cases to IcebergDistributedTestBase
assertUpdate("INSERT INTO " + tableName + " VALUES " + values + ", " + highValues + ", " + lowValues, 3);
dropTable(getSession(), tableName);
Could we add more test cases to illustrate the function of sorted_by, for example that the data in written files is indeed sorted? The current test cases seem to mainly cover syntax support.
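One way such a test could verify per-file ordering is to read rows back grouped by the file they came from (for example via a hidden file-path column, if the connector exposes one) and check that each group is sorted. A minimal, self-contained sketch of just the ordering check, over hypothetical data:

import java.util.List;
import java.util.Map;

public class SortedFileCheck
{
    // Returns true if every file's values appear in non-decreasing order.
    static boolean allFilesSorted(Map<String, List<Long>> valuesByFile)
    {
        for (List<Long> values : valuesByFile.values()) {
            for (int i = 1; i < values.size(); i++) {
                if (values.get(i - 1) > values.get(i)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        // Hypothetical: values grouped by the data file they were read from.
        Map<String, List<Long>> valuesByFile = Map.of(
                "file-1.parquet", List.of(1L, 3L, 7L),
                "file-2.parquet", List.of(2L, 2L, 9L));
        System.out.println(allFilesSorted(valuesByFile)); // prints: true
    }
}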
Sure. Will add test cases.
Working on this.
@hantangwangd Added test cases.
Force-pushed from 07ed82a to 30ac341.
Hi @evanvdia, just wanted to confirm: are you still working on this?
Hi @hantangwangd, yes, working on this.
Force-pushed from 7b3bd0f to 3b6903b.
@evanvdia is this ready for review? There is a merge conflict.
Force-pushed from 3b6903b to ef6df97.
@tdcmeehan Merge conflicts are resolved, but Iceberg unit test cases are failing after rebasing.
By the way, please let us know when this PR is ready for review. Thanks.
continue;
}

Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position);
WriteContext writer = createWriter(partitionData);
Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
Suggested change:
- Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
+ Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position);
After checking the code, I think the problem was introduced by this change.
Thanks @hantangwangd, that was the issue. The changes are done. Will let you know once the PR is ready for review.
Force-pushed from ef6df97 to 32840e1.
Force-pushed from 32840e1 to 5e60f75.
Force-pushed from 6b7e988 to 34c3810.
Force-pushed from 34c3810 to 763dc56.
@evanvdia I see you have pushed a change. Please ping the reviewers once it's ready for review and the comments have been addressed.
Force-pushed from 763dc56 to be6f4c7.
Force-pushed from be6f4c7 to 4f3e8e0.
@evanvdia can you investigate the unit test failures? There are quite a few in the Iceberg module, likely caused by these new changes.
Feel free to reach out if you need any assistance supporting sort orders on IcebergNativeMetadata.
transaction.replaceSortOrder().apply().fields().stream().map(SortField::fromIceberg)
        .collect(toImmutableList()));
We shouldn't open a transaction action (here, ReplaceSortOrder) without committing it; this will prevent any subsequent actions in the same transaction.
@hantangwangd Added support for sort orders on IcebergNativeMetadata and moved the test cases to IcebergDistributedTestBase, but some of the test cases are failing.
Force-pushed from 4f3e8e0 to 5af2017.
@ZacBlanco The unit test failures noted in #21977 (comment) are fixed.
ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
List<SortField> sortFields = replaceSortOrder.apply().fields().stream().map(SortField::fromIceberg)
        .collect(toImmutableList());
try {
    replaceSortOrder.commit();
Suggested change:
- ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
- List<SortField> sortFields = replaceSortOrder.apply().fields().stream().map(SortField::fromIceberg)
-         .collect(toImmutableList());
- try {
-     replaceSortOrder.commit();
+ SortOrder sortOrder = parseSortFields(schema, getSortOrder(tableMetadata.getProperties()));
+ ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
+ List<org.apache.iceberg.SortField> icebergSortFields = sortOrder.fields();
+ List<SortField> sortFields = icebergSortFields.stream().map(SortField::fromIceberg)
+         .collect(toImmutableList());
+ try {
+     for (org.apache.iceberg.SortField sortField : icebergSortFields) {
+         if (sortField.direction() == SortDirection.ASC) {
+             replaceSortOrder.asc(schema.findColumnName(sortField.sourceId()), sortField.nullOrder());
+         }
+         else {
+             replaceSortOrder.desc(schema.findColumnName(sortField.sourceId()), sortField.nullOrder());
+         }
+     }
I think a feasible way could go like this, although I haven't carefully checked whether it's the best. The core point is that we should get the sortOrder from tableMetadata and then set it on the create-table transaction through replaceSortOrder. You can give it a try.
@hantangwangd The changes are done.
Force-pushed from 286cc5e to 9c96c62.
I think we need to do a better job testing and documenting the limitations (e.g., ORC only for sorting, from what I can tell?).
Also, what happens if you use Parquet and try to create and insert into a table with a sort order? What if sorted writing is disabled, but the table specifies a sort order? Is an error thrown? We should have a test case which clearly shows the expected behavior.
The Iceberg connector supports the creation of sorted tables.
Data in the Iceberg table is sorted as each file is written.

Sorted Iceberg tables can provide a huge increase in performance in query times.
Suggested change:
- Sorted Iceberg tables can provide a huge increase in performance in query times.
+ Sorted Iceberg tables can decrease query times in many cases; however, it will depend on the query shape and cluster configuration.
Let's not make bold claims. I would recommend leaving out the adjective "huge" and trying to describe the feature more objectively. Especially when making performance claims, there are many factors which can increase (or decrease) performance. Users will get upset if sorted tables don't always improve performance :)
@ZacBlanco It also supports sorting data in Parquet files.
[Screenshots: data sorted in PARQUET file format; data sorted in ORC file format.]
If we set sorted_writing_enabled to false and the table specifies a sorted_by property, it will not throw any error, but the data in the files is unsorted.
Added test cases for scenarios where sorted writing is disabled but the table specifies a sort order.
> If we set sorted_writing_enabled to false and the table specifies a sorted_by property, it will not throw any error, but the data in the files is unsorted.

Is there a reason that this should even be a configuration property and a table property? I don't think it really makes sense to have both. In the case where the configuration is disabled but it is set on the table property, we will potentially get incorrect results for later queries that expect the data in each file to be sorted.
As discussed, we removed the configuration property and sorting can now only be enabled or disabled through the table property.
return writerSortBufferSize;
}

@Config("writer-sort-buffer-size")
Can you add a description here?
Sure @ZacBlanco.
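Presumably something along these lines, using Airlift's @ConfigDescription annotation as elsewhere in the codebase. A sketch only: the description text and the 64MB default are assumptions, not the PR's actual values.

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;

public class SortingFileWriterConfig
{
    // Assumed default; the real default may differ.
    private DataSize writerSortBufferSize = new DataSize(64, DataSize.Unit.MEGABYTE);

    public DataSize getWriterSortBufferSize()
    {
        return writerSortBufferSize;
    }

    @Config("writer-sort-buffer-size")
    @ConfigDescription("Size of the in-memory buffer used to sort rows before writing sorted files")
    public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
    {
        this.writerSortBufferSize = writerSortBufferSize;
        return this;
    }
}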
int maxOpenWriters,
List<SortField> sortOrder,
DataSize sortingFileWriterBufferSize,
int sortingFileWriterMaxOpenFiles,
TypeManager typeManager,
PageSorter pageSorter,
OrcFileWriterFactory orcFileWriterFactory)
There are quite a few fields related to sorting. I'm wondering if it might make sense to put them together in their own class.
Also, digging deeper, I realized that this seems to only be supported on ORC files due to the sorting file writer API. Have we considered enhancing this to support Parquet?
@ZacBlanco It also supports sorting data in Parquet files.
Do you want me to make those class changes in this PR, or can I open a separate issue to track them?
I think it should go into this PR.
Also, now that I'm looking at this more closely, I'm wondering if it would be better to encapsulate the sorting parameters inside of IcebergFileWriterFactory instead of in the page sink. Since all of this logic has to do with generating sorting writers for both Parquet and ORC types, I think it should probably not be inside the page sink, since the sorting class is actually an implementation of the FileWriter interface.
I would consider updating IcebergFileWriterFactory's createFileWriter method to accept some type of class (say, SortParameters) which will change the type of writer. This implementation should also make clearer how the sorting is done for ORC vs. Parquet, since the logic for handling different file formats is also in the file writer factory.
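A rough sketch of what such a class could look like. Every name here, including SortParameters itself, is a suggestion from this thread rather than the PR's final API, and the field types are simplified stand-ins for Presto's SortField, DataSize, PageSorter, and so on.

import com.google.common.collect.ImmutableList;

import java.util.List;

import static java.util.Objects.requireNonNull;

public final class SortParameters
{
    private final List<String> sortColumns;  // columns to sort by, in order
    private final long sortBufferBytes;      // in-memory buffer size before spilling to temp files
    private final int maxOpenSortFiles;      // temp files merged per pass

    public SortParameters(List<String> sortColumns, long sortBufferBytes, int maxOpenSortFiles)
    {
        this.sortColumns = ImmutableList.copyOf(requireNonNull(sortColumns, "sortColumns is null"));
        this.sortBufferBytes = sortBufferBytes;
        this.maxOpenSortFiles = maxOpenSortFiles;
    }

    public List<String> getSortColumns()
    {
        return sortColumns;
    }

    public long getSortBufferBytes()
    {
        return sortBufferBytes;
    }

    public int getMaxOpenSortFiles()
    {
        return maxOpenSortFiles;
    }
}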
@hantangwangd The createWriter() method is invoked when public CompletableFuture<?> appendPage(Page page) is called externally. I am not seeing any option to capture the sort params at that point, so the only option is to capture the sort params at the time of IcebergPageSink creation and use them in createWriter().
As you suggested, I have grouped all the sort-related config params into a new class, SortParameters.
// We only support lowercase quoted identifiers for now.
// See https://github.com/trinodb/trino/issues/12226#issuecomment-1128839259
// TODO: Enhance quoted identifiers support in Iceberg partitioning to support mixed case identifiers
// See https://github.com/trinodb/trino/issues/12668
Can you describe the limitation in this comment more clearly? Does this mean that identifiers in the CREATE TABLE specifying the sort order have to be quoted and lowercase?
We should clearly document this limitation for users.
Yes @ZacBlanco, it only supports lowercase quoted identifiers for now; otherwise, it throws an error, as shown in the following screenshot.
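For illustration, a hypothetical pair of statements showing the limitation as it stood at this point in the review (before the later change that lowercases identifiers); the table and column names are made up:

CREATE TABLE t (c1 INT, "C2" INT)
WITH (sorted_by = ARRAY['"C2"']);  -- fails: mixed-case quoted identifier

CREATE TABLE t (c1 INT, c2 INT)
WITH (sorted_by = ARRAY['"c2"']);  -- works: lowercase quoted identifier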
We should document this limitation and provide a test that asserts the failure.
Should we keep the handling logic of the sort order column consistent with that of the table column? For example, when we create a table like this:
create table test("ID" int);
we will succeed, with the column name lowercased to id. Is there a reason the sort-by column could not be handled the same way?
@hantangwangd Made the handling of the sort order column consistent with that of the table column.
Force-pushed from 9c96c62 to 1eb811d.
Thanks for the fix; the change overall looks good to me. Some small problems and nits.
Force-pushed from 028f524 to 18b5d14.
@ZacBlanco @hantangwangd The changes have been made. Could you kindly review them?
Thanks for the fix; a few little problems, otherwise it looks good to me.
@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
    return maxOpenSortFiles;
}

@Config("hive.max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
    this.maxOpenSortFiles = maxOpenSortFiles;
    return this;
}
Is there a reason to move this code from below to here? Besides, since SortingFileWriterConfig has been created, would it be better to use its configuration properties uniformly? What do you think @ZacBlanco?
I agree. This config should either be left in place or moved to SortingFileWriterConfig. I think it makes sense to move it to SortingFileWriterConfig.
@ZacBlanco @hantangwangd So I will move this config to the SortingFileWriterConfig class, which can then be used for both Iceberg and Hive.
SortOrder sortOrder = icebergTable.sortOrder();
// TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250)
if (sortOrder.isSorted() && hasIdentitySortColumnPresent(sortOrder)) {
    List<String> sortColumnNames = toSortFields(sortOrder);
It seems that when the sortOrder contains non-identity sorting columns, this code still throws an exception?
Yes @hantangwangd. I believe it will not throw an exception here, as it is already handled during table creation. An exception will be thrown if the sortOrder contains non-identity sorting columns. Therefore, I will remove the changes from here and keep it the same as before.
As @ZacBlanco mentioned, there can be tables created outside of Presto that we need to account for; these tables may have non-identity sorting columns.
@hantangwangd What I understood from the above comment is that there is no need to perform identity column validation and exception generation in the insert flow. I am assuming that Iceberg tables can be created from an external engine that does not perform this validation. I will remove the above check from the insert flow. Please correct me if I am wrong.
List<org.apache.iceberg.SortField> icebergSortFields = sortOrder.fields();
List<SortField> sortFields = getSupportedSortFields(icebergTable.schema(), sortOrder);
for (org.apache.iceberg.SortField sortField : icebergSortFields) {
Should we traverse the sort fields in sortFields here?
@hantangwangd I didn't fully understand the request. Could you please elaborate?
As per the current implementation, the sortFields collection only contains identity columns, so no extra traversal is needed.
What I mean is that we should traverse sortFields rather than icebergSortFields here.
public void testParseLowerCase()
{
    // lowercase
    assertParse("order_key asc nulls last", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST)));
    assertParse("order_key desc nulls first", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST)));
    assertParse("\"order_key\" asc nulls last", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST)));
    assertParse("\"order_key\" desc nulls first", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST)));
}
Should we add back the test cases testParseUpperCase and testParseMixedCase and modify their assertion logic, rather than delete them?
Ok @hantangwangd, will add the test cases back.
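A sketch of what a re-added mixed-case test might assert, reusing the assertParse and sortOrder helpers from the surrounding test class, and assuming mixed-case identifiers are lowercased the same way table columns are:

@Test
public void testParseMixedCase()
{
    // assumed behavior: identifiers are lowercased, matching table-column handling
    assertParse("Order_Key ASC NULLS LAST", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST)));
    assertParse("\"Order_Key\" DESC NULLS FIRST", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST)));
}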
Just a few more things
private final DataSize sortingFileWriterBufferSize;
private final Integer sortingFileWriterMaxOpenFiles;
private final Path tempDirectory;
private final TypeManager typeManager;
This parameter is not used anywhere. Can you please remove it?
@ZacBlanco Removed
@@ -130,7 +130,7 @@ public void configure(Binder binder)
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, DynamicConfigurationProvider.class);
configBinder(binder).bindConfig(HiveClientConfig.class);

configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");
I'm not sure if this is the correct method to bind the config. I think we just want bindConfig(SortingFileWriterConfig.class).
Removed prefix.
@@ -489,7 +494,32 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
icebergTable.location(),
getFileFormat(icebergTable),
getCompressionCodec(session),
icebergTable.properties());
icebergTable.properties(),
getSupportedSortFields(icebergTable.schema(), icebergTable.sortOrder()));
We should be throwing an exception on beginInsert if there are any unsupported sort fields/types, rather than just filtering them out.
@ZacBlanco It is handled while creating the table, and an exception will be thrown if there are any unsupported sort fields/types.
There can be tables created outside of Presto that we need to account for, so we should be checking in beginInsert too.
The following content from Iceberg's spec, https://iceberg.apache.org/spec/#sorting, makes me a little confused:
> ... Writers should use this default sort order to sort the data on write, but are not required to if the default order is prohibitively expensive, as it would be for streaming writes.
It seems to be saying that there is no need to strictly follow the table's default sort order when actually writing data. If that's true, it seems we shouldn't throw an exception when there are unsupported sort fields.
However, when checking the code in Iceberg, I found that data/delete files only carry a sort order id describing the sorting as a whole, which means data/delete files should strictly follow the whole sort order or not be sorted at all, rather than just a part of it. Otherwise we wouldn't be able to distinguish which columns were actually sorted based solely on the data/delete file metadata.
Will do some deeper research to figure this out; if there is any misunderstanding, please let me know.
> It seems to be saying that there is no need to strictly follow the table's default sort order when actually writing data. If that's true, it seems we shouldn't throw an exception when there are unsupported sort fields.

I agree with this. I didn't read the spec closely enough. It seems like we shouldn't throw an exception. This unfortunately diminishes the value of the sort feature for the optimizer...
@@ -127,6 +131,7 @@ public void setup(Binder binder)
binder.bind(HdfsConfiguration.class).annotatedWith(ForMetastoreHdfsEnvironment.class).to(HiveCachingHdfsConfiguration.class).in(Scopes.SINGLETON);
binder.bind(HdfsConfiguration.class).annotatedWith(ForCachingFileSystem.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(SortingFileWriterConfig.class, "iceberg");
Is there a reason we need to pass the prefix to bindConfig?
@ZacBlanco The prefix allows logical grouping of configuration properties under a specific namespace.
Since the sort-related config for both Iceberg and Hive has moved to the SortingFileWriterConfig class (#21977 (comment)), we will remove the prefix from bindConfig.
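For reference, a minimal sketch of the two binding styles, assuming the SortingFileWriterConfig class discussed above. With the prefix, each property name gains the "hive." namespace; without it, the bare property names are used.

import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.google.inject.Binder;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;

// Hypothetical module name; shows prefixed vs. unprefixed config binding.
public class SortingWriterModule
        extends AbstractConfigurationAwareModule
{
    @Override
    protected void setup(Binder binder)
    {
        // Prefixed: properties read as "hive.writer-sort-buffer-size", etc.
        // configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");

        // Unprefixed, as settled on in this thread: "writer-sort-buffer-size".
        configBinder(binder).bindConfig(SortingFileWriterConfig.class);
    }
}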
Force-pushed from 18b5d14 to 44f5ab7.
Force-pushed from 44f5ab7 to c4d0805.
Description
Add support for the Iceberg connector to create sorted files. The sort order can be configured with the sorted_by table property. When creating the table, you can specify an array of one or more columns to use for sorting.
Cherry-pick of trinodb/trino#14891
Motivation and Context
Issue: #21978
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.