GH-615: Produce Avro core data types out of Arrow VSR #638
Conversation
This looks reasonable to me overall, thanks.
- We should document exactly how we handle type conversions, especially around nulls
- I'm OK punting on more complicated types for now
- I'm wondering if we shouldn't just make the producer not own the vector at all (i.e. it's still the caller's responsibility to close the vector). I think this makes the most sense, e.g. if you have a file reader, the reader already owns the vector, so using it with the avro producer would mean a double-close (or you would have to explicitly unload the data and load it somewhere else)
- I'm OK punting on dictionaries for the time being too if you want
adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/Producer.java
```java
void skipNull();

/** Set the position to read value from vector. */
void setPosition(int index);
```
I think we've been trying to be future-proof and use `long` for indices now (since eventually, once we support the new memory APIs, we'll be able to address more than 2GiB of memory at once)
Ok, that makes sense. We should really update the consumers to match. If you like, I could make both int and long available as overloads, and keep a long internally. That way we can update the consumer interface to match, without breaking the API. Would you like me to do that?
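The overload approach proposed here could look something like this minimal sketch (class and method names are hypothetical; the real producers also touch Arrow vectors, which are omitted):

```java
// Hypothetical sketch: expose both int and long overloads while
// storing the index as long internally, so the interface can later
// migrate to long without breaking existing int-based callers.
public class PositionSketch {
    private long currentIndex;

    // Existing int-based entry point, kept for API compatibility
    public void setPosition(int index) {
        setPosition((long) index);
    }

    // Future-proof long-based overload
    public void setPosition(long index) {
        if (index < 0) {
            throw new IllegalArgumentException("index must be non-negative: " + index);
        }
        this.currentIndex = index;
    }

    public long getPosition() {
        return currentIndex;
    }
}
```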
That sounds good to me.
Per my comment below, this proved trickier than I first thought. In light of that, are you happy to stick with int for now?
Ok, let's leave it then. (We really need to fix this more generally...)
```java
@Override
public void close() throws Exception {
  vector.close();
}

@Override
public boolean resetValueVector(T vector) {
  this.vector = vector;
  this.currentIndex = 0;
  return true;
}
```
Hmm. Should resetting close the prior vector? If not, then maybe we also shouldn't be closing the vector here (i.e. it's the upper level code's responsibility to manage the vector lifetime and the producer is just temporarily holding the vector, not owning it)
I think I agree with you that producers should not own / close the vectors, in which case this becomes a non-issue! Are you happy for me to update along those lines? The producers will still hold a reference, but we could get rid of all the close() methods and remove AutoCloseable from the base interface. Would you like me to do that?
Yeah, let's drop close() and AutoCloseable!
Done, close() and AutoCloseable are gone :-)
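A minimal sketch of the ownership model agreed above (all names here are hypothetical; `FieldVectorStub` just stands in for Arrow's `FieldVector`): the producer holds a reference but never closes the vector, so the caller, e.g. a file reader, keeps sole responsibility for the vector's lifetime and no double-close can occur.

```java
// Stand-in for Arrow's FieldVector, for illustration only.
class FieldVectorStub {
    boolean closed = false;
    void close() { closed = true; }
}

// Hypothetical producer that borrows, rather than owns, its vector.
class BorrowingProducer {
    private FieldVectorStub vector;

    BorrowingProducer(FieldVectorStub vector) {
        this.vector = vector;
    }

    // Swap in a new vector; the previous one is NOT closed here.
    void resetValueVector(FieldVectorStub vector) {
        this.vector = vector;
    }

    FieldVectorStub getVector() {
        return vector;
    }
    // Note: no close() and no AutoCloseable -- nothing here owns the buffers.
}
```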
```java
try {
  AutoCloseables.close(producers);
} catch (Exception e) {
  throw new RuntimeException("Error occurs in close.", e);
```
Normally `close` is declared to throw `Exception` anyways - do we need to rewrap this in `RuntimeException`? Otherwise use `AutoCloseables.closeNoChecked`
Per above, if producers are not owning / closing the vectors, this issue goes away. Otherwise, yes, this would make sense. Let me know which you'd like me to do.
Per above, close and AutoCloseable are gone
Thanks so much for all the comments! I've put questions inline and will wait for your answers on how to address those points. In the meantime I will add the schema generator and producers for the narrow primitive types, as well as some top-level doc comments describing the translation. I'll make a start on the test coverage as well.
```java
@Override
public void setPosition(int index) {
  currentIndex = index;
```
Is there any restriction on the new `index`? Do we expect `index < currentIndex`?
I have added a check: index < 0 or index > value count will throw an IllegalArgumentException.
I've allowed index == value count, because that is the final state after production is complete. Since the producer can get into that state naturally, and it has a valid meaning (production complete), it seemed wrong to disallow it. Thoughts welcome on that point!
Worth noting, there are several ways the value count can fail to be set on the vector: either client code doesn't set it, or a writer doesn't (e.g. UnionMapWriter does not update the entries data vector count). In those cases the cause of the error may not be obvious (at least it wasn't for me)! The check is probably still the right thing to do, but something to be aware of.
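The bounds check described above might look something like this sketch (names hypothetical): negative indices and indices past the value count are rejected, while `index == valueCount` is allowed as the "production complete" state.

```java
// Hypothetical sketch of the position bounds check discussed above.
public class BoundsCheckSketch {
    public static void checkPosition(int index, int valueCount) {
        // index == valueCount is deliberately allowed: it is the natural
        // final state once production is complete.
        if (index < 0 || index > valueCount) {
            throw new IllegalArgumentException(
                "Index out of bounds: " + index + " (value count " + valueCount + ")");
        }
    }
}
```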
```java
super(vector);
Preconditions.checkArgument(
    vector.getTypeWidth() <= 16, "Decimal bytes length should <= 16.");
reuseBytes = new byte[vector.getTypeWidth()];
```
Is this correct? I don't think the type width is constant for each value. It might be similar to what you did in `AvroBytesProducer`.
Decimal is fixed-width binary (either 16 or 32 bytes)
Well, I think I have mixed up the requirement from Iceberg, which says "Stored as fixed using the minimum number of bytes for the given precision."
Apologies, I have removed this. I was mirroring the consumers, which need to handle both bytes and fixed inputs, but for producers we can always use fixed, and the type width is always known.
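For illustration, a fixed-width decimal can be written with a single reusable buffer of the type width because the unscaled value is a two's-complement integer stored in exactly that many bytes. This is a stdlib-only sketch, not the Arrow implementation (names are hypothetical, and it assumes big-endian byte order, whereas Arrow's buffers are little-endian):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

// Hypothetical sketch of fixed-width decimal encoding: the unscaled
// value is sign-extended into a constant-size byte array (e.g. 16 bytes
// for a 128-bit decimal), so the buffer size never varies per value.
public class DecimalFixedSketch {
    public static byte[] encode(BigDecimal value, int typeWidth) {
        byte[] raw = value.unscaledValue().toByteArray();
        if (raw.length > typeWidth) {
            throw new IllegalArgumentException("value does not fit in " + typeWidth + " bytes");
        }
        byte[] out = new byte[typeWidth];
        // Sign-extend into the fixed-width buffer (0xFF for negatives)
        byte pad = (byte) (value.signum() < 0 ? 0xFF : 0x00);
        Arrays.fill(out, 0, typeWidth - raw.length, pad);
        System.arraycopy(raw, 0, out, typeWidth - raw.length, raw.length);
        return out;
    }

    public static BigDecimal decode(byte[] bigEndianBytes, int scale) {
        // BigInteger interprets the array as big-endian two's complement
        return new BigDecimal(new BigInteger(bigEndianBytes), scale);
    }
}
```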
```java
@Override
public void produce(Encoder encoder) throws IOException {
  if (vector.isNull(currentIndex)) {
    encoder.writeInt(1);
```
Does it mean that you placed `[type, null]` as the union type? Is it a good convention to put `null` before the type, so the default value is null when the reader type does not match the writer type?
I tried doing that; it broke the consumers and the data would not round-trip. In Avro this is considered the difference between "nullable" and "optional", with "optional" being the semantics you are suggesting. I have kept it as nullable for now. I do think we will need to follow up with a PR on the consumers, particularly to address null handling. Nullable types get mangled in the schema round trip at the moment!
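For reference, Avro encodes a union value as the zero-based index of the chosen branch followed by the value itself, so which index is written for null depends on the branch ordering. A tiny sketch of the two conventions (names hypothetical; `nullFirst = false` models the `[type, null]` ordering used in this PR, matching the `encoder.writeInt(1)` call in the diff above):

```java
// Hypothetical sketch of Avro's two-branch union index encoding.
public class UnionBranchSketch {
    /**
     * Branch index written for a two-branch union.
     * nullFirst = true  models [null, type] ("optional");
     * nullFirst = false models [type, null] ("nullable").
     */
    public static int branchIndex(boolean isNull, boolean nullFirst) {
        if (nullFirst) {
            return isNull ? 0 : 1;
        }
        return isNull ? 1 : 0;
    }
}
```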
```java
case MAP:
  MapVector mapVector = (MapVector) vector;
  StructVector entryVector = (StructVector) mapVector.getDataVector();
  VarCharVector keyVector = (VarCharVector) entryVector.getChildrenFromFields().get(0);
```
What if the key is not a string type?
Good point! I put a check in the schema generation but not here. In Avro, map keys have to be strings, so if that is not the case I have thrown an error. Please leave this open and I will fix - out of time for today!
I added a check for this; it will now throw an IllegalArgumentException if the key type is not string (which is required by Avro).
Hello. I have pushed a second iteration for review. Major updates are:
I have tried to make it so most Arrow types will map to an Avro type if there is a suitable target. The basic approach is:
I'm not sure about the last one; would it be better just to say we can't convert those types and throw an error up front?

Large types (for text, binary, list) are not supported yet; there will be some extra thought needed on getting very large buffers between the two APIs. Interval / duration are also not done yet: Arrow Interval resembles Avro Duration but is not an exact match, and Arrow Duration doesn't really have an equivalent that I could see. View types are also not included as of now.

I tried to move over to using long as the current row index, however a lot of the high-level APIs on the vector types use ints. For working directly with primitive types I was able to go to the underlying buffer (there were reasons to do that anyway in a few cases), but for the complex types that gets trickier because the type of the child vectors is not known. Is it OK to keep the interface using an int for the current index for now? Most of the primitive producers are updated anyway, so the work to move over later would not be too much.

I've run out of time for today. Next I plan to do the tests; like the consumers, they will be exhaustive across types, combinations of nullability etc. with some edge cases. Then the last thing I'd like to come back to on this PR is enums / dict encoding; just in case that has any impact on the structure of things, it would be good to know while it's still easy to change things. I have addressed some of the comments but not all. I will work through the rest, but wanted to share this to check I'm still going in the right direction!
Sorry, I've gotten caught up in my main job - I'll try to circle back here soon
```java
 *
 * @return true if reset is successful, false if reset is not needed.
 */
boolean resetValueVector(T vector);
```
Hmm, do any of the implementations ever return `false`?
Nothing ever returns false and the value is never used. I have changed this to return void.
adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/AvroStructProducer.java
```java
@Override
public void produce(Encoder encoder) throws IOException {
  long nanos = vector.getDataBuffer().getLong(currentIndex * (long) TimeNanoVector.TYPE_WIDTH);
  long micros = nanos / NANOS_PER_MICRO;
```
Maybe we should throw on truncation to be safe?
I don't think this will truncate since we are going nanos -> micros so there is more range available. There is loss of precision - I think that is acceptable, since Avro does not have a time-nanos type (at least not yet)!
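A sketch of why the conversion is safe range-wise: dividing nanoseconds by 1000 always shrinks the magnitude, so no long value of nanos can overflow as micros; only sub-microsecond precision is dropped. (Stdlib-only sketch with hypothetical names, mirroring the division in the diff above.)

```java
// Hypothetical sketch of the nanos -> micros conversion discussed above.
public class TimeNanoSketch {
    static final long NANOS_PER_MICRO = 1_000L;

    // Loses sub-microsecond precision but can never overflow, since
    // |nanos / 1000| <= |nanos| for every long input.
    public static long toMicros(long nanos) {
        return nanos / NANOS_PER_MICRO;
    }
}
```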
```java
 */
public class AvroTimeMilliProducer extends AvroIntProducer {

  // Time in milliseconds stored as integer, matches Avro time-millis type
```
Hmm, Arrow timestamp type is always 8 bytes, so won't this suffer a range issue?
I think the time vectors for seconds / millis are four bytes wide (TimeMilliVector), so there shouldn't be a range issue for this one.
```java
abstract class BaseTimestampTzProducer<T extends TimeStampVector>
    extends BaseAvroProducer<TimeStampVector> {

  // Convert TZ values to UTC to encode Avro timestamp types
```
Arrow stores zoned timestamps in UTC already. The time zone associated with the type is purely used for display. So there should be no need for conversion.
This is great! I took out all the timezone conversion logic, the TZ aware producers are much simpler now :-)
```java
@Override
public void produce(Encoder encoder) throws IOException {
  int unsigned = vector.getDataBuffer().getInt(currentIndex * (long) UInt4Vector.TYPE_WIDTH);
  long signed = unsigned & 0xffffffffL;
```
Thanks, updated the code to use Integer#toUnsignedLong.
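For reference, the JDK helpers perform exactly the masking shown in the diff above. This sketch (with hypothetical wrapper names) pairs each helper with the manual mask it replaces:

```java
// Hypothetical sketch of the unsigned widenings discussed above.
public class UnsignedSketch {
    // Equivalent to: raw & 0xffffffffL
    public static long uint4ToLong(int raw) {
        return Integer.toUnsignedLong(raw);
    }

    // Equivalent to: raw & 0xffff
    public static int uint2ToInt(short raw) {
        return Short.toUnsignedInt(raw);
    }
}
```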
```java
@Override
public void produce(Encoder encoder) throws IOException {
  short unsigned = vector.getDataBuffer().getShort(currentIndex * (long) UInt2Vector.TYPE_WIDTH);
  int signed = unsigned & 0xffff;
```
Same here, is Short#toUnsignedInt applicable?
Thanks, updated the code to use that method.
```java
 * Arrow data types.
 *
 * <ul>
 *   <li>ArrowType.Null --> NULL
```
not to nitpick the formatting but a table might be better? (also does Javadoc allow unclosed tags like this?)
I've converted the type mapping info into an HTML table - shows up nicely in the IDE auto-complete :-)
```java
 * </ul>
 *
 * <p>Nullable fields are represented as a union of [null | base-type]. Special treatment is given
 * to nullability of unions - a union is considered nullable if the union field is nullable or any
```
FWIW, I think nullable unions aren't semantically valid - unions have no validity bitmap in Arrow. The only way to represent this is with a nullable child field
I checked the spec and yes you are correct! I have updated the logic accordingly
Can we update the docstring too?
I have updated the comment on unions. Also updated the comment for nullable types, which are encoded as [ base-type | null ], per the discussion above.
Thanks @lidavidm for all the comments - really helpful :-) I'll push an update in a few days to address these and also add the tests, will probably be after the weekend.
Hi @lidavidm, @wgtmac - I have pushed another update; here is a quick summary:
Apologies for the delay, lots else going on as usual, but I think this is getting closer now. If you let me know what else you'd like to see for this to be considered ready, I'll try to turn it around as quickly as I can.
I'm fine splitting things into other PRs if it makes your life easier. It is known that the Java Union implementation is wrong, unfortunately; see #108. If you want to punt on Union for now that's fine by me. I'm a bit backlogged and will try to re-review when I get a chance, but that may not be for a week or two, sorry.
Force-pushed from b2b81ee to 81065bf.
Thanks @lidavidm - good to know it's not just me going crazy! I have removed support for producing union data for now, and also rebased on master. Totally understand the time pressure. I might work on some of the other pieces in the meantime as separate PRs if that's OK, to have a couple of bits ready for you to look at :-) I also read the thread on unions. My company uses Arrow Java heavily in our product; time is a big issue for us as well, but if I can find some I'll try to raise a few fixes that we're interested in.
Hi @lidavidm - There were two test failures in CI relating to nano-precision time values that didn't show up on my local machine. I have pushed a fix.
CI looks good now - just the link missing on the PR to issue #615; sorry, I don't have permission to add that!
I think the PR check hit this issue - could you add the milestone to the issue manually for now?
Please ignore the check 😬
Ok great - in that case I think the CI is done :)
```java
@Override
public void setPosition(int index) {
  if (index < 0 || index > vector.getValueCount()) {
```
Should these checks be `index >= getValueCount()`?
Ah, I see your comment below now. It might be worth noting this somewhere (perhaps the docstring of the base class?)
I have added a doc comment on the base class as you suggested
```java
if ((timeType.getUnit() == TimeUnit.SECOND || timeType.getUnit() == TimeUnit.MILLISECOND)) {
  return builder.intBuilder().prop("logicalType", "time-millis").endInt().noDefault();
} else {
  // All other time types (sec, micro, nano) are encoded as time-micros (LONG)
```
nit: comment is out of sync with code
Comments updated
Hi - I think I have addressed all the new comments. I also found an issue in the fixed-size list producer; I updated the test cases to expose it and included a fix. I'm fairly sure all the complex type producers are correct now, but it would be worth another set of eyes! The four I have are list, fixed-size list, map and struct. Please let me know if there is anything else that needs addressing.
Hello - I thought I would make a start on step 2 in the Avro series, and have raised #698 to describe what I'm thinking. Still very happy to do another round of updates on this one if there are more comments as / when people have time to review - lmk :-) But since there are a few more PRs in the series to get the full capability, I thought it was worth starting on the second one!
This last update uses Avro's logical type classes instead of setting the raw schema props. The output to disk is the same, but if you want to use the generated Avro schema directly these structures should be available. Needed for round-trip testing.
The proposal looks good to me. Sorry, I will try to give a hopefully final review here this week - as you have seen I tend to be scattered about, so I appreciate your patience.
Ok well, these CI failures aren't related to the PR, merging...
Hi @lidavidm - Thanks so much for merging, I'm really happy to get this first piece in :-) I have started work on the second bit; once the main pieces are ready I will raise a new PR. I'm snowed under too so may have to put it down and come back a week later etc., very happy to work in async mode!
What's Changed
Per discussion in #615 , here is a first take on the core producers to generate Avro data from Arrow vectors. There are a few points I'd like to clarify before going further:
Nullability. Avro only understands nullable types as unions, but that is not normally how nullable fields are represented if the data comes from other sources. I have added a special NullableProducer to handle nullable vectors that are not unions. We will need something equivalent in the consumers, and probably a setting in AvroToArrowConfig to control it on read, defaulting to the current behaviour. I have also added special handling for nullable unions, because unions can't be nested (i.e. you can't nest "type | null" as a type inside a union). I can add consumers to handle both (unions and regular types) for review, if that sounds right? At the moment the schema for nullable fields gets quite mangled on a round trip!
Arrow has a lot more types than Avro, at the level of minor / vector types. Going Avro -> Arrow we just pick the direct equivalent. Going Arrow -> Avro, we could cast silently if there is no loss of precision. E.g. TinyInt and SmallInt -> Int and so on. For types like e.g. Decimal256 and LargeVarChar we could write out safely but would need support in the consumers to read back the wider types. I could start by adding the safe conversions now and we could come back to the wide types in a later PR maybe?
I have made Producer inherit AutoCloseable, the same as Consumer does. I'm not sure if that is always what you want though: it will free the buffers in the VSR, but you might want to keep the VSR after you are finished with IO. Do we need something like detachValueVector() to go with resetValueVector()? Calling close() after detach would not affect the vector.
Type information is inferred from the list of vectors, using minor types. We'll also need to generate the Avro schema, the input for that would be a list of fields. I haven't done it yet but will do if that sounds right.
Dictionary encoding for enums not implemented yet, I'll add it if the rest looks good. Caveat is that dictionaries must be fixed before the encoding starts if we are writing out the whole file in one go (i.e. if the Avro schema is at the start of the container file). If the schema is saved separately that limitation need not apply, we could provide the schema once encoding is finished.
Please do let me know if this is going in the right direction + any comments. If it is I will add the missing pieces and start the exhaustive test coverage to mirror the consumers. Once it's done this PR should get us to the point where we can round trip the contents of an individual block for most data types, but it does not address the container format.
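The lossless widenings mentioned above (e.g. TinyInt and SmallInt to Avro int) can be as simple as implicit casts, since 8-bit and 16-bit signed values always fit in a 32-bit int. A stdlib-only sketch with hypothetical names:

```java
// Hypothetical sketch of the silent, lossless Arrow -> Avro widenings
// proposed above: every byte and short value is representable as int.
public class WideningSketch {
    public static int tinyIntToAvroInt(byte v) {
        return v; // implicit widening, exact for all 8-bit values
    }

    public static int smallIntToAvroInt(short v) {
        return v; // implicit widening, exact for all 16-bit values
    }
}
```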
Closes #615.