[issue-7357] Add support for Avro logical types in both realtime and batch ingestion#7358
Conversation
133d80b to
bbb220b
Compare
|
I'm looking at modifying the data used by |
Codecov Report
@@ Coverage Diff @@
## master #7358 +/- ##
=============================================
- Coverage 70.35% 36.79% -33.56%
+ Complexity 4406 84 -4322
=============================================
Files 1690 1693 +3
Lines 88187 88247 +60
Branches 13190 13198 +8
=============================================
- Hits 62042 32470 -29572
- Misses 21856 53163 +31307
+ Partials 4289 2614 -1675
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
b1122c7 to
183b5b1
Compare
|
I thought changing the commit hash would re-trigger the build |
There was a problem hiding this comment.
Note that BigDecimal is converted to Double as Pinot does not have support for big number columns at the moment and introducing a new type is out of scope in this ticket
There was a problem hiding this comment.
once bigdecimal type is fully supported a record reader option can control this behaviour
...rc/main/java/org/apache/pinot/plugin/inputformat/avro/AvroLogicalTypeConversionRegistry.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
can we put these in ArvoSchemUtils? is there a reason we need the Registry?
There was a problem hiding this comment.
I'll rebase, review the code since it's been a while and get back to you with the reason why I put this here
There was a problem hiding this comment.
there is not reason to not to move it ArvoSchemUtils other than having some separation of concern,, I'll move them over if you think that's a better place for this
2c5f819 to
c159f0f
Compare
There was a problem hiding this comment.
this setting should come from table config in order to avoid potentially breaking existing ingestion that come with logical types, however RecordExtractorConfig is always null. Is anyone working on this logic at the moment? I could give that a go as I'll need to add the setting for this feature
There was a problem hiding this comment.
yes I think we can create a AvroRecordExtractorConfig and provide some defaults to this if necessary. see CsvRecordExtractorConfig as an example. i think it can be part of this PR
There was a problem hiding this comment.
For ingestion jobs these properties can be configured via recordsReaderSpec. However I can't see how these properties can be configured for a realtime table - perhaps via ingestionSpec in the table definition? I was looking through the code but I couldn't figure it out. I believe this is not possible at the moment
There was a problem hiding this comment.
I think I've found where the properties can be read from. Stream config has decoder properties (it seems like these are available in table config of realtime tables) which are propagated to the Avro decoder classes, it looks like I can take advantage of that to populate the logical type flag. I'll add a commit to the PR and let me know if you see anything missing @walterddr
There was a problem hiding this comment.
hmm. not sure which one are you referring to. there's a RecordReaderConfig base class which provides configuration properties to RecordReaders (see RecordReaderFactory class). and it is apparently passed in via a readerConfigFile path (see. CreateSegmentCommand entrypoint)
Jackie-Jiang
left a comment
There was a problem hiding this comment.
LGTM otherwise, no major comments
Does the change also apply to batch ingestion flow? If so, let's update the PR name and description to reflect that
There was a problem hiding this comment.
| _enableLogicalTypes = Boolean.valueOf(props.getOrDefault("enableLogicalTypes", Boolean.FALSE.toString())); | |
| _enableLogicalTypes = Boolean.parseBoolean(props.get("enableLogicalTypes")); |
There was a problem hiding this comment.
(minor) Can be simplified
| if (value instanceof BigDecimal) { | |
| return Double.valueOf(((BigDecimal) value).doubleValue()); | |
| } | |
| if (value instanceof LocalDate) { | |
| return Long.valueOf(((LocalDate) value).toEpochDay()); | |
| } | |
| if (value instanceof Instant) { | |
| return Long.valueOf(((Instant) value).toEpochMilli()); | |
| } | |
| if (value instanceof BigDecimal) { | |
| return ((BigDecimal) value).doubleValue(); | |
| } | |
| if (value instanceof LocalDate) { | |
| return ((LocalDate) value).toEpochDay(); | |
| } | |
| if (value instanceof Instant) { | |
| return ((Instant) value).toEpochMilli(); | |
| } |
There was a problem hiding this comment.
(format) Is this auto-formatted? We usually keep the javadoc multi lines
There was a problem hiding this comment.
not sure, perhaps an old pinot code style I had in my IDE
There was a problem hiding this comment.
(format) This change does not follow the Pinot Style
There was a problem hiding this comment.
(format) We usually keep the javadoc multi lines
There was a problem hiding this comment.
(format) We usually keep the javadoc multi lines
There was a problem hiding this comment.
We don't need to change this (also want to test the behavior of passing null)
There was a problem hiding this comment.
We don't need to change this (also want to test the behavior of passing null)
There was a problem hiding this comment.
| AvroRecordExtractorConfig config = PluginManager.get().createInstance(AvroRecordExtractorConfig.class.getName()); | |
| AvroRecordExtractorConfig config = new AvroRecordExtractorConfig(); |
There was a problem hiding this comment.
(convention) We don't usually annotate non-null, but only annotate nullable. Also make it multiple lines
| default void init(@Nonnull Map<String, String> props) { } | |
| default void init(Map<String, String> props) { | |
| } |
|
@Jackie-Jiang thanks for reviewing, the PR covers both batch and realtime ingestion - I've renamed the PR. There is one more thing I'd like to do on this PR before I merge it. I'm looking at adding a full integration test (if possible) for both batch and ingestion, if you have any pointer please let me know. I'll also tidy up the commits |
e56ab9c to
fd5ed95
Compare
|
@ddcprg You may take a look at |
|
Thanks @Jackie-Jiang in that case I'll add more unit tests around record extractor. I'll squash all the commits into one after that |
b5ebdf9 to
7dd63fd
Compare
There was a problem hiding this comment.
I'm thinking it may be better to initially return a String representation of the timestamp instead, Pinot can coerce the value into a timestamp. At this point I can't make a guess of whether the user has specified millis or micros. Behaviour control could be configured via record reader settings in the future. @Jackie-Jiang @walterddr let me know what you think
There was a problem hiding this comment.
Seems the Instant is always converted from the timestamp type, so we might want to return the Timestamp object here.
There was a problem hiding this comment.
apologies for the delay, this now returns java.sql.Timestamp instead
There was a problem hiding this comment.
(minor) Revert, same for other places (is this auto-formatted?)
There was a problem hiding this comment.
yes, not sure why my IDE is doing this
There was a problem hiding this comment.
(minor) Let's put this static block next to the map construction for readability
There was a problem hiding this comment.
Seems the Instant is always converted from the timestamp type, so we might want to return the Timestamp object here.
b69a11b to
7c134c3
Compare
There was a problem hiding this comment.
We recently added big-decimal support in #8503, so here we should just return BigDecimal as is (no need to have this special handling. The base implementation can handle it properly)
There was a problem hiding this comment.
this is great! I've now changed this line and squashed all the commits into a single one
Description
Fixes #7357
I'm looking to get some feedback before making this a full PR
Upgrade Notes
Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
backward-incompat, and complete the section below on Release Notes)Does this PR fix a zero-downtime upgrade introduced earlier?
backward-incompat, and complete the section below on Release Notes)Does this PR otherwise need attention when creating release notes? Things to consider:
release-notesand complete the section on Release Notes)The code change should be backwards compatible.
Release Notes
Documentation