Complete iceberg support for time type #24091

Open · wants to merge 1 commit into master from oss_time_testing

Conversation

@auden-woolfson (Contributor) commented Nov 19, 2024

Description

This brings functionality over from IBM's repository. It adds support for partitioning over the time type in ORC files through the Iceberg connector.

Lakehouse PR for those with access

== RELEASE NOTES ==

General Changes
* Add support for time type partitioning in the ORC file format for Iceberg. :pr:`24091`
* Add testing for partitioning using time type in Iceberg. :pr:`24091`

@auden-woolfson auden-woolfson added the from:IBM PR from IBM label Nov 19, 2024
@ethanyzhang ethanyzhang requested review from a team and ScrapCodes and removed request for a team November 21, 2024 06:03
sdruzkin previously approved these changes Nov 23, 2024

@sdruzkin (Collaborator) left a comment

ORC changes look good.

@@ -186,6 +196,8 @@ private Block readNullBlock(boolean[] isNull, int nonNullCount)
throw new VerifyError("Unsupported type " + type);
}

protected void maybeTransformValues(long[] values, int nextBatchSize) {}
Collaborator

Why do we need this?

Contributor Author

Actually, I don't think we do here... I just took this directly from the IBM code. Will push a version without it.

@ZacBlanco (Contributor) left a comment

minor change, otherwise looks good

@ScrapCodes left a comment

Thanks for working on this; it would be great if we could improve the test coverage.

assertQuery(format("SELECT COUNT(*) FROM %s", tableName), "SELECT 2");
assertQuery(format("SELECT x FROM %s WHERE y = 12345", tableName), "SELECT CAST('10:12:34' AS TIME)");
assertQuery(format("SELECT x FROM %s WHERE y = 67890", tableName), "SELECT CAST('9:00:00' AS TIME)");
dropTable(session, tableName);


Can we include GROUP BY and ORDER BY as well?

@hantangwangd (Member) left a comment

I believe we have to handle the write logic for TimeType on ORC as well. The raw data in an ORC file should follow Iceberg's specification (micros precision), which is a little different from the in-memory format in Presto (millis precision).

@@ -310,6 +310,9 @@ private static TypeInfo toHiveTypeInfo(Type type)
if (DOUBLE.equals(type)) {
return HIVE_DOUBLE.getTypeInfo();
}
if (TimeType.TIME.equals(type)) {
return HIVE_INT.getTypeInfo();
Member

Should it be HIVE_LONG?

Contributor Author

Why would it be long? This is how it was merged internally...

@ZacBlanco (Contributor) commented Dec 10, 2024

Time is defined to be in microseconds in Iceberg, so long makes sense, even though time can only represent values between 00:00:00 and 23:59:59.999999. There are 86,400 seconds in a day, so 86.4 billion microseconds, which exceeds the range of a 32-bit integer. So it should probably be a long.
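To make the range argument concrete, here is a small self-contained sketch (not from the PR itself) checking that a full day of microseconds overflows a 32-bit int:

```java
public class TimeMicrosRange
{
    public static void main(String[] args)
    {
        // Iceberg stores time-of-day as microseconds since midnight.
        long microsPerDay = 86_400L * 1_000_000L; // 86,400,000,000

        // Integer.MAX_VALUE is 2,147,483,647 -- far too small to hold 23:59:59.999999.
        System.out.println(microsPerDay > Integer.MAX_VALUE); // true

        // A signed 64-bit long covers the full range comfortably.
        System.out.println(microsPerDay <= Long.MAX_VALUE); // true
    }
}
```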

Comment on lines 1053 to 1080
assertQuery(format("SELECT x FROM %s WHERE y = 12345", tableName), "SELECT CAST('10:12:34' AS TIME)");
assertQuery(format("SELECT x FROM %s WHERE y = 67890", tableName), "SELECT CAST('9:00:00' AS TIME)");
Member

It seems that when filtering by a column of TimeType on ORC, we run into problems, whether the TimeType column is a partition column or a non-partition column.

For reference, see test cases in IcebergDistributedTestBase.testPartitionedByTimeType().

Contributor Author

Not sure what you mean... that test is also passing. Is there something I am missing from the test cases that I should add?

Member

The test IcebergDistributedTestBase.testPartitionedByTimeType() is currently running on PARQUET only, it includes the following scenarios that I think should also be added here to test on ORC:

assertQuery(format("SELECT y FROM %s WHERE x = time '10:12:34'", tableName), "values(12345)");

Contributor Author

Could you please expand on how you identified this issue? Looked through some of the orc code and I'm not sure what I'm missing. Thank you

Member

According to the Iceberg specification, the raw data for TimeType maintained in data files should have microsecond precision, while the data for TimeType is maintained in Presto with millisecond precision. So in order to support TimeType on PARQUET in PR #21337, we did the following things:

  1. Convert the data of type TimeType on writing, change the precision from milliseconds to microseconds. See: TimeValueWriter.

  2. Convert the data of type TimeType on reading, change the precision from microseconds to milliseconds. See: LongTimeMicrosColumnReader.

  3. Convert the TimeType values that appear in filter predicates. For Iceberg, filtering conditions need to be processed at two levels:

  • At the Iceberg file plan level, see: IcebergSplitManager.getSplits(...) and ExpressionConverter.getIcebergLiteralValue().
  • At the Iceberg page source level, the predicate passed to the newly constructed ConnectorPageSource should be handled (for PARQUET, the filter has been completely ignored since columnIndexFilterEnabled = false), see: IcebergPageSourceProvider.createParquetPageSource(...).

  4. Handle partition columns of type TimeType, see: IcebergPageSink.getIcebergValue(), IcebergPageSource.nativeValueToBlock(), and PartitionTable.convert().

So based on this context, in order to support TimeType on ORC, we need to do the following file format specific things:

  1. Convert the data of type TimeType on writing, change the precision from milliseconds to microseconds in LongColumnWriter and LongDictionaryColumnWriter etc.

  2. Convert the data of type TimeType on reading, change the precision from microseconds to milliseconds in LongDirectBatchStreamReader and LongDictionaryBatchStreamReader etc.

  3. Convert the TimeType values in filter predicates at the Iceberg page source level; convert or ignore the predicates on columns of type TimeType in IcebergPageSourceProvider.createBatchOrcPageSource().

In my local experiment, after doing all these things, the tests all pass. You can give it a try, and please let me know if there are any misunderstandings above.
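The precision conversions in steps 1 and 2 above amount to scaling by 1000 in each direction. A minimal sketch of the idea (hypothetical helper names, not the actual Presto writer/reader code):

```java
public class TimePrecisionConversion
{
    // Presto keeps TIME values as millis since midnight; Iceberg data files use micros.

    // Write path: widen millis to micros before handing values to the file writer.
    static long millisToMicros(long millis)
    {
        return Math.multiplyExact(millis, 1000L);
    }

    // Read path: narrow micros back to millis when filling the Presto block.
    static long microsToMillis(long micros)
    {
        return Math.floorDiv(micros, 1000L);
    }

    public static void main(String[] args)
    {
        // 10:12:34 as millis since midnight: (10*3600 + 12*60 + 34) * 1000
        long millis = (10L * 3600 + 12 * 60 + 34) * 1000; // 36,754,000
        long micros = millisToMicros(millis);
        System.out.println(micros);                 // 36754000000
        System.out.println(microsToMillis(micros)); // 36754000
    }
}
```

Predicate constants (step 3) need the same scaling so that a literal like time '10:12:34' compares against the file's microsecond values.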

Contributor Author

Still a little unclear on how to go about step 3... do you have a branch that you made your local changes on?

Member

Contributor Author

Alright thanks. Was the error you got before making the fix the same as this?

java.lang.AssertionError: For query: 
 SELECT y FROM test_selected_by_time WHERE x = time '10:12:34'
 actual column types:
 [bigint]
expected column types:
[bigint]

not equal
Actual rows (1 of 1 extra rows shown, 2 rows in total):
    [54321]

I made the changes you requested and I am still getting this error on testSelectOrPartitionedByTime

Member

You have inserted another row, so the assertion statement should be:

assertQuery(format("SELECT y FROM %s WHERE x = time '10:12:34'", tableName), "values 12345, 54321");

@auden-woolfson auden-woolfson force-pushed the oss_time_testing branch 4 times, most recently from aa9adb1 to 81a8a84 Compare December 13, 2024 22:21
@steveburnett (Contributor) commented

Thanks for the release note entry! A couple of nits.

== RELEASE NOTES ==

General Changes
* Add support for time type partitioning in the ORC file format for Iceberg. :pr:`24091`
* Add testing for partitioning using time type in Iceberg. :pr:`24091`

ZacBlanco previously approved these changes Jan 7, 2025

@ZacBlanco (Contributor) left a comment

minor nit, otherwise lgtm

Labels: from:IBM PR from IBM

6 participants