-
Notifications
You must be signed in to change notification settings - Fork 707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make projected parquet collection schema forward compatible with the given file schema #1921
base: develop
Are you sure you want to change the base?
Changes from 11 commits
e373ea2
721a490
0adbfae
84c119a
6bf7652
018f267
ae89c06
ca67991
b7363b8
2996317
e02d0a8
4d40c06
6e551e5
436def8
7ba2305
d2f2fb5
9c96d23
09bd4dd
83b6898
d0ed5d6
5de8b64
db937c3
0c38af8
3700a85
3b7255b
3a9be2e
5b5cf2b
e29c7e9
cd7e69c
b05f13c
7bb0382
6365db8
b6c8a92
579905c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
language: scala | ||
jdk: oraclejdk8 | ||
jdk: openjdk8 | ||
sudo: false | ||
|
||
before_install: | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,153 @@ | ||||||
package com.twitter.scalding.parquet.scrooge | ||||||
|
||||||
import java.util | ||||||
|
||||||
import org.apache.parquet.schema.Type.Repetition | ||||||
import org.apache.parquet.schema.{GroupType, MessageType, Type} | ||||||
import org.apache.parquet.thrift.DecodingSchemaMismatchException | ||||||
import org.slf4j.LoggerFactory | ||||||
|
||||||
import scala.collection.mutable | ||||||
|
||||||
object ParquetCollectionFormatForwardCompatibility { | ||||||
|
||||||
private val LOGGER = LoggerFactory.getLogger(getClass) | ||||||
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
/** | ||||||
* Create a forward-compatible schema, using content from source type with format from target type. | ||||||
* @param sourceType source type with legacy format | ||||||
* @param targetType target type to which source is converted to | ||||||
*/ | ||||||
def formatForwardCompatibleMessage(sourceType: MessageType, targetType: MessageType): MessageType = { | ||||||
val groupResult = formatForwardCompatibleType(sourceType, targetType).asGroupType() | ||||||
LOGGER.info("Making source schema to be compatible with target" + | ||||||
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
s"\nSource:\n${sourceType}\nTarget:\n${targetType}\nResult:\n${groupResult}") | ||||||
new MessageType(groupResult.getName, groupResult.getFields) | ||||||
} | ||||||
|
||||||
/** | ||||||
* Traverse source/target schemas and format nodes of list or map. | ||||||
* The formatting is not to one-to-one node swapping from source to target, | ||||||
* this is because the subset fields of source node and its optional/required must | ||||||
* be maintained in the formatted result. | ||||||
*/ | ||||||
private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = { | ||||||
This comment was marked as outdated.
Sorry, something went wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
"source" and "target" are very generic names and this function is really only used for one thing. It might make sense to rename them accordingly to make reasoning about the correctness here easier. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From conversation thread below:
|
||||||
(unwrapGroup(sourceType), unwrapGroup(targetType)) match { | ||||||
case _ if sourceType.isPrimitive || targetType.isPrimitive => | ||||||
// Base case | ||||||
sourceType | ||||||
case ( | ||||||
GroupUnwrapped(sourceWrappers, Some(sourceRepeatedListType), None), | ||||||
GroupUnwrapped(_, Some(targetRepeatedListType), None) | ||||||
) => | ||||||
// Format list | ||||||
val sourceRule = ParquetListFormatRule.findFirstListRule(sourceRepeatedListType, Source) | ||||||
val targetRule = ParquetListFormatRule.findFirstListRule(targetRepeatedListType, Target) | ||||||
|
||||||
val formattedRepeated = (sourceRule, targetRule) match { | ||||||
case (Some(sourceRule), Some(targetRule)) => { | ||||||
val sourceElement = sourceRule.elementType(sourceRepeatedListType) | ||||||
val targetElement = targetRule.elementType(targetRepeatedListType) | ||||||
// Recurse on the element instead of `repeated` type because list still can have | ||||||
// different formats at repeated type | ||||||
val forwardCompatElement = formatForwardCompatibleType(sourceElement, targetElement) | ||||||
// Wrap the solved element with current source structure, and do actual conversion work | ||||||
val forwardCompatRepeated = ParquetListFormatRule.wrapElementAsRepeatedType( | ||||||
sourceRule, | ||||||
sourceRepeatedListType, | ||||||
forwardCompatElement | ||||||
) | ||||||
ParquetListFormatRule.formatForwardCompatibleRepeatedType( | ||||||
forwardCompatRepeated, | ||||||
targetRepeatedListType) | ||||||
} | ||||||
case _ => sourceRepeatedListType // No-op | ||||||
} | ||||||
// Wrapped the formatted repeated type in its original groups, | ||||||
// describing field name and whether it's optional/required | ||||||
sourceWrappers.foldRight(formattedRepeated) { | ||||||
(wrapper, group) => wrapper.withNewFields(group) | ||||||
} | ||||||
case ( | ||||||
GroupUnwrapped(sourceWrappers, None, Some(sourceRepeatedMapType)), | ||||||
GroupUnwrapped(_, None, Some(targetRepeatedMapType)) | ||||||
) => | ||||||
// Format map | ||||||
val forwardCompatRepeated = formatForwardCompatibleType(sourceRepeatedMapType, targetRepeatedMapType) | ||||||
val formattedRepeated = ParquetMapFormatRule.formatForwardCompatibleRepeatedType( | ||||||
forwardCompatRepeated, | ||||||
targetRepeatedMapType | ||||||
) | ||||||
// Wrapped the formatted repeated type in its original groups, | ||||||
// describing field name and whether it's optional/required | ||||||
sourceWrappers.foldRight(formattedRepeated) { | ||||||
(wrapper, group) => wrapper.withNewFields(group) | ||||||
} | ||||||
case _ => | ||||||
// Field projection | ||||||
val sourceGroup = sourceType.asGroupType | ||||||
val targetGroup = targetType.asGroupType | ||||||
|
||||||
import scala.collection.JavaConverters._ | ||||||
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
val resultFields = sourceGroup.getFields.asScala.map { sourceField => | ||||||
if (!targetGroup.containsField(sourceField.getName)) { | ||||||
if (!sourceField.isRepetition(Repetition.OPTIONAL)) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check is incorrect, I think. "source" == the file's schema, yes (see above renaming suggestion)? If so, then the optionality of a field which is not included in the projection is not important. We should be iterating over the fields in the projected (target) schema and checking the optionality of the projected fields if they're missing from the file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah no, source here is actually the projection, and target is the file's scheme. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. on this point, I think naming convention along the lines of: fileSchema, readSchema, projectedFileSchema, projectedReadSchema all help clarify which is which |
||||||
throw new DecodingSchemaMismatchException( | ||||||
s"Found non-optional source field ${sourceField.getName}:\n$sourceField\n\n" + | ||||||
s"not present in the given target type:\n${targetGroup}" | ||||||
) | ||||||
} | ||||||
sourceField | ||||||
} | ||||||
else { | ||||||
val fieldIndex = targetGroup.getFieldIndex(sourceField.getName) | ||||||
val targetField = targetGroup.getFields.get(fieldIndex) | ||||||
formatForwardCompatibleType(sourceField, targetField) | ||||||
} | ||||||
} | ||||||
sourceGroup.withNewFields(resultFields.asJava) | ||||||
} | ||||||
} | ||||||
|
||||||
private case class GroupUnwrapped(wrappers: Seq[GroupType], | ||||||
repeatedListType: Option[Type] = None, | ||||||
repeatedMapType: Option[Type] = None) | ||||||
|
||||||
private def unwrapGroup(typ: Type, wrappers: Seq[GroupType] = Seq()): GroupUnwrapped = { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps a naive question, but what does "wrapped" or "wrapper" refer to in this case? Could you add a comment explaining this? That might make the following code easier to review. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I now simplified to just extract for only the repeated type, and let the recursive call in main method
|
||||||
if (typ.isPrimitive) { | ||||||
GroupUnwrapped( | ||||||
wrappers, | ||||||
repeatedListType=None, | ||||||
repeatedMapType=None | ||||||
) | ||||||
} else if (typ.asGroupType.getFieldCount != 1) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this if (typ.isPrimitive) {
// ...
} else {
val groupTyp = typ.asGroupTyp
if (groupTyp.getFieldCount != 1) {
//...
} else {
//...
}
} We might be able to further improve clarity by changing the sign of the second condition, doing if (typ.isPrimitive) {
// ...
} else {
val groupTyp = typ.asGroupTyp
if (groupTyp.getFieldCount == 1) { // <--- inverted condition
//...
} else {
//...
}
} which then simplifies or clarifies the
comment present in the current code. We could then add a comment in the
This is maybe nitpicky, but I'm trying to walk through each case here to make sure that I understand what all of the branches correspond to, so I'm considering whether a different ordering or presentation of the cases can make it easier to understand and verify. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simplified this to two methods to extract group list and maplist instead. |
||||||
GroupUnwrapped( | ||||||
wrappers :+ typ.asGroupType(), | ||||||
repeatedListType=None, | ||||||
repeatedMapType=None | ||||||
) | ||||||
} else { | ||||||
// Note the field count is strictly 1 here, and the wrappers will be used later | ||||||
// to wrap back the formatted results. | ||||||
if (ParquetListFormatRule.isGroupList(typ)) { | ||||||
GroupUnwrapped( | ||||||
wrappers :+ typ.asGroupType(), | ||||||
repeatedListType=Some(typ.asGroupType.getFields.get(0)), | ||||||
repeatedMapType = None | ||||||
) | ||||||
} else if (ParquetMapFormatRule.isGroupMap(typ)) { | ||||||
GroupUnwrapped( | ||||||
wrappers :+ typ.asGroupType(), | ||||||
repeatedListType=None, | ||||||
repeatedMapType=Some(typ.asGroupType.getFields.get(0)) | ||||||
) | ||||||
} else { | ||||||
unwrapGroup(typ.asGroupType.getFields.get(0), wrappers :+ typ.asGroupType()) | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
trait ParquetCollectionFormatRule { | ||||||
def formatForwardCompatibleRepeatedType(sourceRepeatedMapType: Type, targetRepeatedMapType: Type): Type | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parquet is complicated because there are always 4 schemas involved: the write-time schema, the read-time schema, and a projected schema in terms of the write-schema, and a projected schema in terms of the read-time schema. I find this really hard to keep track of, so I think it'd be a huge help to me and future readers to put a really detailed comment on this class explaining what it's for, what it makes compatible, what the problem is, how it solves it, etc. I think for something like this, the more detailed the better.