Make projected parquet collection schema forward compatible with the given file schema #1921

Open · wants to merge 34 commits into base: develop

Conversation

@mickjermsurawong-stripe commented Sep 27, 2019

Reading a Parquet file written with a compliant list schema such as

message compliant_schema {
  required group country_codes (LIST) {
    repeated binary array (UTF8);
  }
  required int32 to_be_removed;
}

will fail with the generated projection

message thrift_generated {
  optional group country_codes (LIST) {
    repeated binary country_codes_tuple (UTF8);
  }
}
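
For context, the forward-compatibility conversion rewrites the projection into the file's list format while keeping the projection's repetition and field subset, so the effective projection becomes roughly (an illustration, not output from the PR):

message converted_projection {
  optional group country_codes (LIST) {
    repeated binary array (UTF8);
  }
}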
  • This PR introduces ParquetListFormatForwardCompatibility to convert thrift-generated schemas to compliant ones. The difficulties here are:

    • we cannot simply traverse the two schemas and swap nodes one-for-one. The resulting node has to come from the projection, because it carries a subset of the fields and different optional/required information.
    • there are at least 4 variations of compliant parquet list types, and we want to convert both from non-compliant to compliant and between the compliant variants.
  • The strategy here is to implement a separate rule for each format. Each rule tells us how to decompose a list schema and how to reconstruct it; a sketch follows this list.

    • For example, we match the projected schema with Rule 1, and match the file schema with Rule 2.
    • Rule 1 will decompose the projected schema, and Rule 2 will take that information to reconstruct it in light of the file schema.
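
A minimal sketch of the rule shape (the first three signatures appear in excerpts later in this review; the reconstruction method and its parameters are assumed for illustration):

import org.apache.parquet.schema.Type

// Each rule recognizes one list layout, decomposes it into its element,
// and can rebuild a repeated type in that layout.
private[scrooge] trait ParquetListFormatRule {

  /** True if `typ` is a list laid out in this rule's format. */
  private[scrooge] def check(typ: Type): Boolean

  /** Whether this layout marks the list element as required. */
  private[scrooge] def isElementRequired(repeatedType: Type): Boolean

  /** The element name this layout uses (e.g. "element", "array"). */
  private[scrooge] def elementName(repeatedType: Type): String

  /** Assumed method: rebuild this layout's repeated type around a
    * decomposed element. */
  private[scrooge] def createCompliantRepeatedType(elementType: Type,
                                                   elementName: String,
                                                   isElementRequired: Boolean): Type
}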

@CLAassistant commented Sep 27, 2019

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ mickjermsurawong-stripe
❌ joshrosen-stripe
You have signed the CLA already but the status is still pending? Let us recheck it.

@mickjermsurawong-stripe mickjermsurawong-stripe changed the title Make projected parquet schema forward compatible with the given file schema [WIP] Make projected parquet schema forward compatible with the given file schema Sep 27, 2019
@mickjermsurawong-stripe (Author)

cc @moulimukherjee

@mickjermsurawong-stripe (Author)

Hi @johnynek,
We ran into a problem where a Scalding job fails to read Parquet files produced by a Spark job. The cause is that the projected schema generated from the Thrift class has a different list format than our Spark job's output.

I'd just like to ask for a quick look at whether this is the right strategy, and whether this is something we would like to push to the public repo as well.

@johnynek (Collaborator) left a comment

Thanks for the PR.

Have we run this internally to smoke-test that it can read old data that was previously written?

I added some general comments. I really don't know the latest with parquet. Maybe @isnotinvain or @julienledem can comment?

cc @ttim

@johnynek (Collaborator)

Also, you need to bump the .travis.yml jdk setting to openjdk8, since oraclejdk8 no longer works on Travis, I think.

@mickjermsurawong-stripe (Author)

Thanks @johnynek. Ah, I haven't done any tests internally on actual data yet, just unit tests for now. Will follow up on that.

I can port all of this to Scala, and will make the resolver stateless; yup, the object instantiation isn't necessary.

@mickjermsurawong-stripe mickjermsurawong-stripe changed the title [WIP] Make projected parquet schema forward compatible with the given file schema Make projected parquet collection schema forward compatible with the given file schema Sep 30, 2019
@mickjermsurawong-stripe (Author)

Hi @isnotinvain or @julienledem, I've addressed the first round of general feedback and added support for map schema compatibility as well. Would appreciate your review here.

@joshrosen-stripe left a comment

I left some initial review comments, primarily focused on understanding (and potentially simplifying) two of the larger and more complex pattern matches at the heart of the converter.

I also left a couple of minor style nits, but those can be addressed later (in a final polish pass) once we're confident in the rest of the design.

I haven't finished looking at this yet, though: I still need to go through ParquetListFormatRule and also need to take a closer look at the test cases.

repeatedListType: Option[Type] = None,
repeatedMapType: Option[Type] = None)

private def unwrapGroup(typ: Type, wrappers: Seq[GroupType] = Seq()): GroupUnwrapped = {


Perhaps a naive question, but what does "wrapped" or "wrapper" refer to in this case? Could you add a comment explaining this? That might make the following code easier to review.

@mickjermsurawong-stripe (Author) replied:

I've now simplified this to extract only the repeated type, and let the recursive call in the main method formatForwardCompatibleType handle multiple layers instead.
This is now a sealed trait for the group type itself, with a helper method to get its repeated type:

private[scrooge] sealed trait CollectionGroup {
  def groupType: GroupType

  def repeatedType: Type
}
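
For illustration, the concrete groups referenced elsewhere in this review (ListGroup, MapGroup) might then look like this minimal sketch; their actual bodies in the PR may differ:

import org.apache.parquet.schema.{GroupType, Type}

// Each case class wraps an annotated group whose single field is the
// repeated type.
private[scrooge] case class ListGroup(groupType: GroupType) extends CollectionGroup {
  override def repeatedType: Type = groupType.getFields.get(0)
}

private[scrooge] case class MapGroup(groupType: GroupType) extends CollectionGroup {
  override def repeatedType: Type = groupType.getFields.get(0)
}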


def isGroupList(projection: Type): Boolean = {


To confirm my understanding:

This is matching on things like

required group my_list (LIST) {
  repeated group list {
    optional binary element (UTF8);
  }
}

where groupProjection here refers to the top-most type, and the groupProjection.getFields.get(0).isRepetition(Type.Repetition.REPEATED) check would apply to the second-level element (repeated group list)?

@mickjermsurawong-stripe (Author) replied:

Yup, that's correct on groupProjection, and the repeated type is at the second level.

The part after repeated can differ depending on the format, e.g.

required group my_list (LIST) {
  repeated int32 [element|array];
}

repeatedListType=None,
repeatedMapType=None
)
} else if (typ.asGroupType.getFieldCount != 1) {


In this else if branch and the else branch I noticed we have typ.asGroupType calls everywhere. To avoid that, and to make it clear that !typ.isPrimitive implies that typ is a group type, could we instead do something like

if (typ.isPrimitive) {
  // ...
} else {
  val groupTyp = typ.asGroupType
  if (groupTyp.getFieldCount != 1) {
    //...
  } else {
    //...
  }
}

We might be able to further improve clarity by inverting the second condition:

if (typ.isPrimitive) {
  // ...
} else {
  val groupTyp = typ.asGroupType
  if (groupTyp.getFieldCount == 1) {  // <--- inverted condition
    //...
  } else {
    //...
  }
}

which then simplifies or clarifies the

// Note the field count is strictly 1 here

comment present in the current code.

We could then add a comment in the else branch saying something like

// The group contains more than one field, so ... <whatever that implies>

This is maybe nitpicky, but I'm trying to walk through each case here to make sure that I understand what all of the branches correspond to, so I'm considering whether a different ordering or presentation of the cases can make it easier to understand and verify.

@mickjermsurawong-stripe (Author) replied:

Simplified this to two methods that extract the group list and map list instead.

private[scrooge] def isElementRequired(repeatedType: Type): Boolean

private[scrooge] def check(typ: Type): Boolean


From the name, it's a bit non-obvious what this method does. Is this actually something like appliesToType(typ: Type), which returns true if the type is a list of this format and false otherwise?

Contributor:

+1 on naming this something more descriptive

@mickjermsurawong-stripe (Author) replied:

Updated as suggested

def name: String
}

private[scrooge] object Source extends SourceOrTarget {
@thomaschow (Contributor) commented Sep 30, 2019:

What's the advantage of distinguishing between Source and Target? Is it so we can define supported directionalities in terms of Parquet spec conversion? I'm wondering if we're being too specific here and might wind up seeing a direction that we didn't engineer for, and then fail.

Contributor:

I guess since scalding is going to be stuck on the older parquet spec, this would not happen.

@mickjermsurawong-stripe (Author) replied:

Ah, the only thing I don't support here is the target type in the _tuple format, which would then be compatible with the thrift-generated projection source.

override def check(repeatedType: Type): Boolean = repeatedType.getName.endsWith("_tuple")

override def elementName(repeatedType: Type): String = {
repeatedType.getName.substring(0, repeatedType.getName.length - 6)
Contributor:

Nit: I'd like docs or a variable/method renaming to explain this substring indexing a little more clearly.

@mickjermsurawong-stripe (Author) replied:

I've now made _tuple a variable, and the length is calculated from it explicitly.
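
Roughly, based on the excerpt above, the change might look like this sketch (variable name assumed):

private val tupleSuffix: String = "_tuple"

override def check(repeatedType: Type): Boolean =
  repeatedType.getName.endsWith(tupleSuffix)

override def elementName(repeatedType: Type): String =
  // Drop the "_tuple" suffix to recover the element name.
  repeatedType.getName.dropRight(tupleSuffix.length)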

import org.apache.parquet.thrift.struct.ThriftType.{ListType, MapType, StructType}
import org.scalatest.{Matchers, WordSpec}

class ParquetCollectionFormatForwardCompatibilityTests extends WordSpec with Matchers {
Contributor:

Out of curiosity, how hard would it be to add some actual thrift-fixtures-based tests in here?

@mickjermsurawong-stripe (Author) replied:

Ack. Will see how I can do end-to-end testing in this repo.

@mickjermsurawong-stripe (Author)

@joshrosen-stripe, @tchow-stripe
Thanks for the really helpful review. I've simplified many parts of the code and addressed the PR feedback.
The pending issue is figuring out how to do end-to-end testing here.
@isnotinvain this is ready for another round of review.

@xton left a comment

This review is incomplete; I'll finish later this evening. I've got some rename suggestions and some possible bugs so far.

sourceGroup.groupType.withNewFields(formattedRepeated)
}

private def findCollectionGroup(typ: Type): Option[CollectionGroup] = {

Naming nit:

Suggested change:
- private def findCollectionGroup(typ: Type): Option[CollectionGroup] = {
+ private def extractCollectionGroup(typ: Type): Option[CollectionGroup] = {

"find" implies that the return values will be a subset of the input, while "extract" (which the underlying functions use) suggests that the whole input will be considered.

formatForwardCompatibleCollectionGroup[ListGroup](sourceGroup, targetGroup)
case (Some(sourceGroup: MapGroup), Some(targetGroup: MapGroup)) =>
formatForwardCompatibleCollectionGroup[MapGroup](sourceGroup, targetGroup)
case _ if sourceType.isPrimitive || targetType.isPrimitive => // Base case

I would explicitly check and throw if one is a primitive but not the other. As written this will silently pass through certain schema mismatches.

@mickjermsurawong-stripe (Author) replied:

Ack

* this is because the subset fields of source node and its optional/required must
* be maintained in the formatted result.
*/
private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = {

Suggested change:
- private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = {
+ private def formatForwardCompatibleType(fileType: Type, projectionType: Type): Type = {

"source" and "target" are very generic names and this function is really only used for one thing. It might make sense to rename them accordingly to make reasoning about the correctness here easier.

@mickjermsurawong-stripe (Author) replied:

From the conversation thread below:

By that I mean we want to convert the projection source schema to be in the format of the target file schema.

val targetGroup = targetType.asGroupType
val resultFields = sourceGroup.getFields.asScala.map { sourceField =>
  if (!targetGroup.containsField(sourceField.getName)) {
    if (!sourceField.isRepetition(Repetition.OPTIONAL)) {

This check is incorrect, I think. "source" == the file's schema, yes (see above renaming suggestion)? If so, then the optionality of a field which is not included in the projection is not important. We should be iterating over the fields in the projected (target) schema and checking the optionality of the projected fields if they're missing from the file.

@mickjermsurawong-stripe (Author) replied:

Ah no, source here is actually the projection, and target is the file's schema.
By that I mean we want to convert the projection source schema to be in the format of the target file schema.
Given this confusion, I think it makes sense for me to rename these.

Contributor:

On this point, I think naming conventions along the lines of fileSchema, readSchema, projectedFileSchema, and projectedReadSchema would all help clarify which is which.

@xton left a comment

One potential simplification and some rename/docs nits. Looking good!

fileGroup.repeatedType,
projectFileType(_, _))
// Respect optional/required from the projected read group.
projectedReadGroup.groupType.withNewFields(projectedFileRepeatedType)

Could this end up letting through cases where the file's field is optional and the projection's field is required? If so we should explicitly detect and throw for that (probably in projectFileType).

@mickjermsurawong-stripe (Author) replied:

Added the assertion at the "struct projection" level, which has access to the field-level optional/required information.
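
A minimal sketch of such an assertion (hypothetical helper; the PR adds it where field-level repetition is available):

import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Type.Repetition

// Reject projections that require a field the file only provides optionally.
private def assertRepetitionCompatible(fileField: Type, projectedField: Type): Unit = {
  if (fileField.isRepetition(Repetition.OPTIONAL) &&
      projectedField.isRepetition(Repetition.REQUIRED)) {
    throw new IllegalArgumentException(
      s"Projected field ${projectedField.getName} is required, " +
        "but the corresponding file field is optional")
  }
}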

override val rules: Seq[ParquetListFormatRule] = Seq(
PrimitiveElementRule, PrimitiveArrayRule,
GroupElementRule, GroupArrayRule,
TupleRule, StandardRule

Soooo many cases... That said, I like this approach. It makes it painfully clear which variants we're supporting.

private def projectFileGroup[T <: CollectionGroup](projectedReadGroup: T,
                                                   fileGroup: T,
                                                   fieldContext: FieldContext)(implicit t: ClassTag[T]): GroupType = {

  val (formatter, updatedFieldContext) = t.runtimeClass.asInstanceOf[Class[T]] match {
    case c if c == classOf[MapGroup] =>
      (ParquetMapFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
    case c if c == classOf[ListGroup] =>
      (ParquetListFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
  }

Let's refactor this to avoid reflection and implicit magic. E.g.:

Suggested change:
- private def projectFileGroup[T <: CollectionGroup](projectedReadGroup: T,
-                                                    fileGroup: T,
-                                                    fieldContext: FieldContext)(implicit t: ClassTag[T]): GroupType = {
-   val (formatter, updatedFieldContext) = t.runtimeClass.asInstanceOf[Class[T]] match {
-     case c if c == classOf[MapGroup] =>
-       (ParquetMapFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
-     case c if c == classOf[ListGroup] =>
-       (ParquetListFormatter, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1))
-   }
+ private def projectFileGroup(projectedReadGroup: T, fileGroup: T, fieldContext: FieldContext, formatter: ParquetCollectionFormatter): GroupType = {

@mickjermsurawong-stripe (Author) replied:

Updated to

def projectFileGroup(fileGroup: CollectionGroup,
                     projectedReadGroup: CollectionGroup,
                     fieldContext: FieldContext,
                     formatter: ParquetCollectionFormatter)

@mickjermsurawong-stripe (Author) commented Oct 4, 2019

Thanks @xton, addressed the feedback.
@isnotinvain could you help take a look at this, please?

private def projectFileType(fileType: Type, projectedReadType: Type, fieldContext: FieldContext): Type = {
  (extractCollectionGroup(projectedReadType), extractCollectionGroup(fileType)) match {
    case _ if projectedReadType.isPrimitive && fileType.isPrimitive =>
      projectedReadType
@joshrosen-stripe commented Oct 4, 2019:

Do we need to check that both types are the same primitive type? Or is that handled elsewhere? I imagine that certain types of primitive-type-changing projections might be supported (e.g. widening an int to a long) but others might not be (e.g. converting a string to an int).

@mickjermsurawong-stripe (Author) replied:

That type compatibility is handled in a separate place here, so by the time this is called we should already have type compatibility.

public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
  assertGroupsAreCompatible(fileMessageType, projectedMessageType);
  return projectedMessageType;
}

def formatCompatibleRepeatedType(fileRepeatedType: Type,
                                 readRepeatedType: Type,
                                 fieldContext: FieldContext,
                                 recursiveSolver: (Type, Type, FieldContext) => Type): Type


In practice, I think this is always invoked with the projectFileType method defined above?

@mickjermsurawong-stripe (Author) replied:

Yup, it is. My thought is to make this recursive call explicit on the caller side, so the flow is clear when reading this format-compatibility layer, instead of jumping into the respective formatter, which then cyclically refers back to the caller's method.

joshrosen-stripe and others added 3 commits October 5, 2019 15:03
* Add more comments; add explicit else conditions to some if cases; lift base case out of pattern match

* Formatting changes

* More minor formatting.
@mickjermsurawong-stripe (Author)

Hi @isnotinvain, I'd like to follow up on this if you have more feedback here, please.

@mickjermsurawong-stripe (Author)

Hi @isnotinvain, another bump on this please.

Updates from our side: we have now implemented this internally (via subclassing of ParquetScroogeScheme), and it covers production use cases of reading Spark job output schemas:

  • lists using the 3-level format (and the 2-level format in legacy mode), compared to the _tuple-suffixed format generated from the Thrift class.
  • maps using a repeated group named key_value instead of map.
    This also covers more conversion types, as outlined in ParquetMapFormatter and ParquetListFormatter; see the example schemas below.

Without this patch, a Scalding job reading Spark job output can either fail hard or falsely read empty collections.
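
For illustration, the map conversion reconciles layouts roughly like the following (a hypothetical example; the key/value types are made up, and the repeated-group naming is the point):

Standard format (e.g. written by Spark):

required group my_map (MAP) {
  repeated group key_value {
    required binary key (UTF8);
    optional int32 value;
  }
}

Thrift-generated format:

required group my_map (MAP) {
  repeated group map (MAP_KEY_VALUE) {
    required binary key (UTF8);
    optional int32 value;
  }
}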
