[issue-7357] Add support for Avro logical types in both realtime and batch ingestion #7358

Merged
Jackie-Jiang merged 1 commit into apache:master from
ddcprg:avro_logical_types
May 9, 2022

Conversation

Contributor

@ddcprg ddcprg commented Aug 24, 2021

Description

Fixes #7357

I'm looking to get some feedback before making this a full PR

Upgrade Notes

Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)

  • Yes (Please label as backward-incompat, and complete the section below on Release Notes)

Does this PR fix a zero-downtime upgrade introduced earlier?

  • Yes (Please label this as backward-incompat, and complete the section below on Release Notes)

Does this PR otherwise need attention when creating release notes? Things to consider:

  • New configuration options
  • Deprecation of configurations
  • Signature changes to public methods/interfaces
  • New plugins added or old plugins removed
  • Yes (Please label this PR as release-notes and complete the section on Release Notes)

The code change should be backwards compatible.

Release Notes

Documentation

@ddcprg ddcprg force-pushed the avro_logical_types branch from 133d80b to bbb220b Compare September 7, 2021 19:25
Contributor Author

ddcprg commented Sep 7, 2021

I'm looking at modifying the data used by RealtimeClusterIntegrationTest to add fields with logical types. Any guidelines on how this data has been generated?

@ddcprg ddcprg changed the title WIP [issue-7357] Add support for Avro logical types in realtime ingestion [issue-7357] Add support for Avro logical types in realtime ingestion Sep 7, 2021
@ddcprg ddcprg marked this pull request as ready for review September 7, 2021 19:31

codecov-commenter commented Sep 28, 2021

Codecov Report

Merging #7358 (c5eed6e) into master (ec42a3a) will decrease coverage by 33.55%.
The diff coverage is 0.00%.

@@              Coverage Diff              @@
##             master    #7358       +/-   ##
=============================================
- Coverage     70.35%   36.79%   -33.56%     
+ Complexity     4406       84     -4322     
=============================================
  Files          1690     1693        +3     
  Lines         88187    88247       +60     
  Branches      13190    13198        +8     
=============================================
- Hits          62042    32470    -29572     
- Misses        21856    53163    +31307     
+ Partials       4289     2614     -1675     
Flag Coverage Δ
integration1 27.50% <0.00%> (+0.05%) ⬆️
integration2 25.50% <0.00%> (-0.17%) ⬇️
unittests1 ?
unittests2 14.29% <0.00%> (-0.01%) ⬇️

Impacted Files Coverage Δ
...t/plugin/inputformat/avro/AvroRecordExtractor.java 0.00% <0.00%> (-89.48%) ⬇️
...in/inputformat/avro/AvroRecordExtractorConfig.java 0.00% <0.00%> (ø)
...inot/plugin/inputformat/avro/AvroRecordReader.java 0.00% <0.00%> (-88.89%) ⬇️
...lugin/inputformat/avro/AvroRecordReaderConfig.java 0.00% <0.00%> (ø)
.../pinot/plugin/inputformat/avro/AvroSchemaUtil.java 0.00% <0.00%> (-18.19%) ⬇️
...ugin/inputformat/avro/KafkaAvroMessageDecoder.java 0.00% <0.00%> (ø)
...gin/inputformat/avro/SimpleAvroMessageDecoder.java 0.00% <0.00%> (ø)
.../pinot/spi/data/readers/RecordExtractorConfig.java 0.00% <0.00%> (ø)
...he/pinot/spi/data/readers/RecordReaderFactory.java 0.00% <0.00%> (-57.50%) ⬇️
...ava/org/apache/pinot/spi/plugin/PluginManager.java 0.00% <0.00%> (-57.97%) ⬇️
... and 947 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Contributor Author

ddcprg commented Sep 29, 2021

I thought changing the commit hash would re-trigger the build

Contributor Author

Note that BigDecimal is converted to Double because Pinot does not currently support big-number columns, and introducing a new type is out of scope for this ticket.

Contributor Author

Once the BigDecimal type is fully supported, a record reader option can control this behaviour.
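
To illustrate why the BigDecimal-to-Double conversion mentioned in this comment is lossy, here is a minimal sketch. The class and method names are hypothetical and are not part of the Pinot code base; only `java.math.BigDecimal` from the JDK is used.

```java
import java.math.BigDecimal;

// Hypothetical illustration, not Pinot code.
final class DecimalToDoubleSketch {

  // Mirrors the conversion described in the comment: BigDecimal -> double.
  static double toDouble(BigDecimal value) {
    return value.doubleValue();
  }

  // Returns true when converting to double and back yields exactly the same number.
  static boolean survivesRoundTrip(BigDecimal value) {
    return new BigDecimal(toDouble(value)).compareTo(value) == 0;
  }

  public static void main(String[] args) {
    // 0.5 is exactly representable as a double, so nothing is lost.
    System.out.println(survivesRoundTrip(new BigDecimal("0.5")));
    // This value has more significant digits than a double can hold.
    System.out.println(survivesRoundTrip(new BigDecimal("12345678901234567890.123456789")));
  }
}
```

Values that need more precision than a double offers will be rounded on ingestion under this approach, which is the trade-off the comment accepts.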

Member

@richardstartin richardstartin left a comment

👍🏻

Contributor

@walterddr walterddr left a comment

the approach looks good. thanks @ddcprg for the contribution. I left one minor comment otherwise good to go.

Comment on lines 31 to 38
Contributor

Can we put these in AvroSchemaUtil? Is there a reason we need the Registry?

Contributor Author

I'll rebase, review the code since it's been a while and get back to you with the reason why I put this here

Contributor Author

There is no reason not to move it to AvroSchemaUtil other than keeping some separation of concerns. I'll move them over if you think that's a better place for this.

Contributor

this looks good now.

@ddcprg ddcprg force-pushed the avro_logical_types branch 2 times, most recently from 2c5f819 to c159f0f Compare April 6, 2022 09:27
Contributor Author

@ddcprg ddcprg Apr 6, 2022

This setting should come from the table config, to avoid potentially breaking existing ingestion pipelines whose data already contains logical types. However, RecordExtractorConfig is always null. Is anyone working on this logic at the moment? I could give it a go, as I'll need to add the setting for this feature.

Contributor

Yes, I think we can create an AvroRecordExtractorConfig and provide some defaults for it if necessary. See CsvRecordExtractorConfig as an example. I think it can be part of this PR.

Contributor Author

@ddcprg ddcprg Apr 6, 2022

For ingestion jobs these properties can be configured via recordReaderSpec. However, I can't see how they can be configured for a realtime table. Perhaps via ingestionSpec in the table definition? I looked through the code but couldn't figure it out. I believe this is not possible at the moment.

Contributor Author

@ddcprg ddcprg Apr 8, 2022

I think I've found where the properties can be read from. Stream config has decoder properties (it seems like these are available in table config of realtime tables) which are propagated to the Avro decoder classes, it looks like I can take advantage of that to populate the logical type flag. I'll add a commit to the PR and let me know if you see anything missing @walterddr

Contributor

Hmm, not sure which one you are referring to. There's a RecordReaderConfig base class which provides configuration properties to RecordReaders (see the RecordReaderFactory class), and it is apparently passed in via a readerConfigFile path (see the CreateSegmentCommand entrypoint).

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

LGTM otherwise, no major comments

Does the change also apply to batch ingestion flow? If so, let's update the PR name and description to reflect that

Contributor

Suggested change
- _enableLogicalTypes = Boolean.valueOf(props.getOrDefault("enableLogicalTypes", Boolean.FALSE.toString()));
+ _enableLogicalTypes = Boolean.parseBoolean(props.get("enableLogicalTypes"));
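
The suggestion works because `Boolean.parseBoolean` returns `false` for a `null` argument, so no explicit default is needed when the key is absent. A minimal sketch comparing the two forms follows; the class and method names are hypothetical, and only the `enableLogicalTypes` key comes from the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the two forms in the suggested change.
final class EnableLogicalTypesFlagSketch {

  // Original form: supplies "false" explicitly when the key is missing.
  static boolean withDefault(Map<String, String> props) {
    return Boolean.valueOf(props.getOrDefault("enableLogicalTypes", Boolean.FALSE.toString()));
  }

  // Suggested form: parseBoolean(null) is false, so the default is implicit.
  static boolean withParseBoolean(Map<String, String> props) {
    return Boolean.parseBoolean(props.get("enableLogicalTypes"));
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    System.out.println(withDefault(props) == withParseBoolean(props));
    props.put("enableLogicalTypes", "true");
    System.out.println(withDefault(props) == withParseBoolean(props));
  }
}
```

Both forms treat any value other than a case-insensitive "true" as `false`.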

Comment on lines 130 to 138
Contributor

(minor) Can be simplified

Suggested change
- if (value instanceof BigDecimal) {
-   return Double.valueOf(((BigDecimal) value).doubleValue());
- }
- if (value instanceof LocalDate) {
-   return Long.valueOf(((LocalDate) value).toEpochDay());
- }
- if (value instanceof Instant) {
-   return Long.valueOf(((Instant) value).toEpochMilli());
- }
+ if (value instanceof BigDecimal) {
+   return ((BigDecimal) value).doubleValue();
+ }
+ if (value instanceof LocalDate) {
+   return ((LocalDate) value).toEpochDay();
+ }
+ if (value instanceof Instant) {
+   return ((Instant) value).toEpochMilli();
+ }
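
The simplification relies on autoboxing: when a method's return type is `Object`, a returned `double` or `long` is boxed to `Double` or `Long` automatically. The sketch below shows this in isolation. The `convert` method and its class are hypothetical stand-ins, assuming the surrounding method returns `Object`; this is not the actual Pinot extractor.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;

// Hypothetical stand-in for the conversion under review, assuming an Object return type.
final class LogicalTypeConversionSketch {

  static Object convert(Object value) {
    if (value instanceof BigDecimal) {
      return ((BigDecimal) value).doubleValue(); // double is autoboxed to Double
    }
    if (value instanceof LocalDate) {
      return ((LocalDate) value).toEpochDay(); // long is autoboxed to Long
    }
    if (value instanceof Instant) {
      return ((Instant) value).toEpochMilli(); // long is autoboxed to Long
    }
    return value; // anything else passes through unchanged
  }

  public static void main(String[] args) {
    System.out.println(convert(new BigDecimal("1.5")).getClass().getSimpleName());
    System.out.println(convert(LocalDate.of(1970, 1, 2)));
    System.out.println(convert(Instant.ofEpochMilli(1000L)));
  }
}
```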

Contributor

(format) Is this auto-formatted? We usually keep the javadoc multi lines

Contributor Author

not sure, perhaps an old pinot code style I had in my IDE

Contributor

(format) This change does not follow the Pinot Style

Contributor

(format) We usually keep the javadoc multi lines

Contributor

(format) We usually keep the javadoc multi lines

Contributor

We don't need to change this (also want to test the behavior of passing null)

Contributor

We don't need to change this (also want to test the behavior of passing null)

Contributor

Suggested change
- AvroRecordExtractorConfig config = PluginManager.get().createInstance(AvroRecordExtractorConfig.class.getName());
+ AvroRecordExtractorConfig config = new AvroRecordExtractorConfig();

Contributor

(convention) We don't usually annotate non-null, but only annotate nullable. Also make it multiple lines

Suggested change
- default void init(@Nonnull Map<String, String> props) { }
+ default void init(Map<String, String> props) {
+ }
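
As a rough sketch of how a no-op `default` `init` method lets existing config classes stay unchanged while a new one opts in, consider the following. `ExtractorConfigSketch` and `LogicalTypesConfigSketch` are hypothetical names invented for illustration and do not reproduce Pinot's actual interfaces; only the `enableLogicalTypes` key is taken from the thread.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the interface under review.
interface ExtractorConfigSketch {
  // No-op by default, so existing implementations need no changes.
  default void init(Map<String, String> props) {
  }
}

// Hypothetical config that opts in by overriding init.
final class LogicalTypesConfigSketch implements ExtractorConfigSketch {
  private boolean _enableLogicalTypes;

  @Override
  public void init(Map<String, String> props) {
    _enableLogicalTypes = Boolean.parseBoolean(props.get("enableLogicalTypes"));
  }

  boolean isEnableLogicalTypes() {
    return _enableLogicalTypes;
  }

  public static void main(String[] args) {
    LogicalTypesConfigSketch config = new LogicalTypesConfigSketch();
    config.init(Collections.singletonMap("enableLogicalTypes", "true"));
    System.out.println(config.isEnableLogicalTypes());
  }
}
```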

@ddcprg ddcprg changed the title [issue-7357] Add support for Avro logical types in realtime ingestion [issue-7357] Add support for Avro logical types in both realtime and batch ingestion Apr 13, 2022
Contributor Author

ddcprg commented Apr 13, 2022

@Jackie-Jiang thanks for reviewing. The PR covers both batch and realtime ingestion, and I've renamed it accordingly. There is one more thing I'd like to do before this PR is merged: I'm looking at adding a full integration test (if possible) for both batch and realtime ingestion. If you have any pointers, please let me know. I'll also tidy up the commits.

@ddcprg ddcprg force-pushed the avro_logical_types branch from e56ab9c to fd5ed95 Compare April 13, 2022 07:31
@Jackie-Jiang
Contributor

@ddcprg You may take a look at HybridClusterIntegrationTest to see how to feed the data into both the batch flow and the streaming flow. In order to add an integration test, we need to add a new Avro data set with the logical types. IMO it might be too much for this support (adding an integration test can add 1-2 minutes to the CI). A thorough unit test on the Avro record extractor should be good enough.

Contributor Author

ddcprg commented Apr 14, 2022

Thanks @Jackie-Jiang in that case I'll add more unit tests around record extractor. I'll squash all the commits into one after that

@ddcprg ddcprg force-pushed the avro_logical_types branch 2 times, most recently from b5ebdf9 to 7dd63fd Compare April 14, 2022 10:47
Contributor Author

I'm thinking it may be better to initially return a String representation of the timestamp instead, since Pinot can coerce the value into a timestamp. At this point I can't tell whether the user has specified millis or micros. Behaviour could be made configurable via record reader settings in the future. @Jackie-Jiang @walterddr let me know what you think.

Contributor

Seems the Instant is always converted from the timestamp type, so we might want to return the Timestamp object here.

Contributor Author

@ddcprg ddcprg May 9, 2022

apologies for the delay, this now returns java.sql.Timestamp instead
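
One way to turn an `Instant` into a `java.sql.Timestamp` is the JDK's `Timestamp.from`, sketched below. This is only an assumption about how such a conversion could look; the thread does not show which call the PR actually uses, and the class name here is hypothetical.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical illustration, not the PR's actual code.
final class InstantToTimestampSketch {

  // Timestamp.from keeps the instant's full precision, including sub-millisecond nanos.
  static Timestamp toTimestamp(Instant instant) {
    return Timestamp.from(instant);
  }

  public static void main(String[] args) {
    Instant instant = Instant.ofEpochMilli(1_650_000_000_123L);
    Timestamp timestamp = toTimestamp(instant);
    // getTime() returns epoch milliseconds; toInstant() converts back.
    System.out.println(timestamp.getTime() == instant.toEpochMilli());
    System.out.println(timestamp.toInstant().equals(instant));
  }
}
```

Returning a `Timestamp` rather than a raw `long` leaves the unit unambiguous for downstream code, which addresses the millis-versus-micros concern raised earlier in this review thread.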

Contributor

(minor) revert

Contributor

(minor) Revert, same for other places (is this auto-formatted?)

Contributor Author

yes, not sure why my IDE is doing this

Contributor

(minor) Let's put this static block next to the map construction for readability

Contributor

Seems the Instant is always converted from the timestamp type, so we might want to return the Timestamp object here.

@ddcprg ddcprg force-pushed the avro_logical_types branch 2 times, most recently from b69a11b to 7c134c3 Compare May 9, 2022 12:08
Contributor

We recently added big-decimal support in #8503, so here we should just return BigDecimal as is. There is no need for this special handling; the base implementation can handle it properly.

Contributor Author

this is great! I've now changed this line and squashed all the commits into a single one

@ddcprg ddcprg force-pushed the avro_logical_types branch from 7c134c3 to e9b8911 Compare May 9, 2022 19:31
@ddcprg ddcprg force-pushed the avro_logical_types branch from e9b8911 to c5eed6e Compare May 9, 2022 19:35
@Jackie-Jiang Jackie-Jiang merged commit b494345 into apache:master May 9, 2022

Development

Successfully merging this pull request may close these issues.

Support for Avro logical types in realtime and offline tables

5 participants