Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make projected parquet collection schema forward compatible with the given file schema #1921

Open
wants to merge 34 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e373ea2
forward list compat
mickjermsurawong-stripe Sep 27, 2019
721a490
bump travis to openjdk8
mickjermsurawong-stripe Sep 28, 2019
0adbfae
fail when projecting on list of struct
mickjermsurawong-stripe Sep 29, 2019
84c119a
recurse only on elements
mickjermsurawong-stripe Sep 29, 2019
6bf7652
check for non-optional extra fields for projection
mickjermsurawong-stripe Sep 30, 2019
018f267
move to scala
mickjermsurawong-stripe Sep 30, 2019
ae89c06
migrate test to different class
mickjermsurawong-stripe Sep 30, 2019
ca67991
handle map legacy format
mickjermsurawong-stripe Sep 30, 2019
b7363b8
format import
mickjermsurawong-stripe Sep 30, 2019
2996317
improve docs
mickjermsurawong-stripe Sep 30, 2019
e02d0a8
fix style warning
mickjermsurawong-stripe Sep 30, 2019
4d40c06
address PR feedback
mickjermsurawong-stripe Oct 1, 2019
6e551e5
undo import wildcard
mickjermsurawong-stripe Oct 1, 2019
436def8
fix warning descenents of sealed traits
mickjermsurawong-stripe Oct 1, 2019
7ba2305
remove duplicate test
mickjermsurawong-stripe Oct 1, 2019
d2f2fb5
improve test names and remove one duplicate
mickjermsurawong-stripe Oct 1, 2019
9c96d23
add docs
mickjermsurawong-stripe Oct 1, 2019
09bd4dd
Check on schema type mismatch
mickjermsurawong-stripe Oct 3, 2019
83b6898
explicit rename from source/target to projected read schema and file …
mickjermsurawong-stripe Oct 3, 2019
d0ed5d6
support creating _tuple format and generalize compat in all directions
mickjermsurawong-stripe Oct 3, 2019
5de8b64
support legacy spark list of nullable elements
mickjermsurawong-stripe Oct 3, 2019
db937c3
improve docs
mickjermsurawong-stripe Oct 3, 2019
0c38af8
add tests for all supported list compat conversions
mickjermsurawong-stripe Oct 3, 2019
3700a85
improve docs
mickjermsurawong-stripe Oct 4, 2019
3b7255b
remove classtag inference
mickjermsurawong-stripe Oct 4, 2019
3a9be2e
rename schema name and use consistent method
mickjermsurawong-stripe Oct 4, 2019
5b5cf2b
file rename to drop "forward" compat
mickjermsurawong-stripe Oct 4, 2019
e29c7e9
test rename and make variables consistent
mickjermsurawong-stripe Oct 4, 2019
cd7e69c
make names file/read oriented
mickjermsurawong-stripe Oct 4, 2019
b05f13c
improve test make sure formatted type is still compatible with given …
mickjermsurawong-stripe Oct 4, 2019
7bb0382
check for field optional/required
mickjermsurawong-stripe Oct 4, 2019
6365db8
Review suggestions for https://github.com/twitter/scalding/pull/1921 …
joshrosen-stripe Oct 5, 2019
b6c8a92
improve code coverage and remove dead code after restructuring
mickjermsurawong-stripe Oct 5, 2019
579905c
auto-format from running sbt test
mickjermsurawong-stripe Oct 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: scala
jdk: oraclejdk8
jdk: openjdk8
sudo: false

before_install:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@
import org.apache.parquet.thrift.projection.ThriftProjectionException;
import org.apache.parquet.thrift.struct.ThriftType;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

/**
* Read support for Scrooge
Expand Down Expand Up @@ -132,7 +130,9 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, String p
*/
public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
assertGroupsAreCompatible(fileMessageType, projectedMessageType);
return projectedMessageType;
return ParquetCollectionFormatForwardCompatibility.formatForwardCompatibleMessage(
projectedMessageType, fileMessageType
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.twitter.scalding.parquet.scrooge

import java.util

import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, MessageType, Type}
import org.apache.parquet.thrift.DecodingSchemaMismatchException
import org.slf4j.LoggerFactory

import scala.collection.mutable

object ParquetCollectionFormatForwardCompatibility {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parquet is complicated because there are always 4 schemas involved: the write-time schema, the read-time schema, and a projected schema in terms of the write-schema, and a projected schema in terms of the read-time schema. I find this really hard to keep track of, so I think it'd be a huge help to me and future readers to put a really detailed comment on this class explaining what it's for, what it makes compatible, what the problem is, how it solves it, etc. I think for something like this, the more detailed the better.


private val LOGGER = LoggerFactory.getLogger(getClass)
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a forward-compatible schema, using content from source type with format from target type.
* @param sourceType source type with legacy format
* @param targetType target type to which source is converted to
*/
def formatForwardCompatibleMessage(sourceType: MessageType, targetType: MessageType): MessageType = {
val groupResult = formatForwardCompatibleType(sourceType, targetType).asGroupType()
LOGGER.info("Making source schema to be compatible with target" +
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
s"\nSource:\n${sourceType}\nTarget:\n${targetType}\nResult:\n${groupResult}")
new MessageType(groupResult.getName, groupResult.getFields)
}

/**
* Traverse source/target schemas and format nodes of list or map.
* The formatting is not to one-to-one node swapping from source to target,
* this is because the subset fields of source node and its optional/required must
* be maintained in the formatted result.
*/
private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = {

This comment was marked as outdated.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private def formatForwardCompatibleType(sourceType: Type, targetType: Type): Type = {
private def formatForwardCompatibleType(fileType: Type, projectionType: Type): Type = {

"source" and "target" are very generic names and this function is really only used for one thing. It might make sense to rename them accordingly to make reasoning about the correctness here easier.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From conversation thread below:

By that i mean we want to convert projection source schema to be the in the format of the target file schema.

(unwrapGroup(sourceType), unwrapGroup(targetType)) match {
case _ if sourceType.isPrimitive || targetType.isPrimitive =>
// Base case
sourceType
case (
GroupUnwrapped(sourceWrappers, Some(sourceRepeatedListType), None),
GroupUnwrapped(_, Some(targetRepeatedListType), None)
) =>
// Format list
val sourceRule = ParquetListFormatRule.findFirstListRule(sourceRepeatedListType, Source)
val targetRule = ParquetListFormatRule.findFirstListRule(targetRepeatedListType, Target)

val formattedRepeated = (sourceRule, targetRule) match {
case (Some(sourceRule), Some(targetRule)) => {
val sourceElement = sourceRule.elementType(sourceRepeatedListType)
val targetElement = targetRule.elementType(targetRepeatedListType)
// Recurse on the element instead of `repeated` type because list still can have
// different formats at repeated type
val forwardCompatElement = formatForwardCompatibleType(sourceElement, targetElement)
// Wrap the solved element with current source structure, and do actual conversion work
val forwardCompatRepeated = ParquetListFormatRule.wrapElementAsRepeatedType(
sourceRule,
sourceRepeatedListType,
forwardCompatElement
)
ParquetListFormatRule.formatForwardCompatibleRepeatedType(
forwardCompatRepeated,
targetRepeatedListType)
}
case _ => sourceRepeatedListType // No-op
}
// Wrapped the formatted repeated type in its original groups,
// describing field name and whether it's optional/required
sourceWrappers.foldRight(formattedRepeated) {
(wrapper, group) => wrapper.withNewFields(group)
}
case (
GroupUnwrapped(sourceWrappers, None, Some(sourceRepeatedMapType)),
GroupUnwrapped(_, None, Some(targetRepeatedMapType))
) =>
// Format map
val forwardCompatRepeated = formatForwardCompatibleType(sourceRepeatedMapType, targetRepeatedMapType)
val formattedRepeated = ParquetMapFormatRule.formatForwardCompatibleRepeatedType(
forwardCompatRepeated,
targetRepeatedMapType
)
// Wrapped the formatted repeated type in its original groups,
// describing field name and whether it's optional/required
sourceWrappers.foldRight(formattedRepeated) {
(wrapper, group) => wrapper.withNewFields(group)
}
case _ =>
// Field projection
val sourceGroup = sourceType.asGroupType
val targetGroup = targetType.asGroupType

import scala.collection.JavaConverters._
mickjermsurawong-stripe marked this conversation as resolved.
Show resolved Hide resolved
val resultFields = sourceGroup.getFields.asScala.map { sourceField =>
if (!targetGroup.containsField(sourceField.getName)) {
if (!sourceField.isRepetition(Repetition.OPTIONAL)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is incorrect, I think. "source" == the file's schema, yes (see above renaming suggestion)? If so, then the optionality of a field which is not included in the projection is not important. We should be iterating over the fields in the projected (target) schema and checking the optionality of the projected fields if they're missing from the file.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah no, source here is actually the projection, and target is the file's scheme.
By that i mean we want to convert projection source schema to be the in the format of the target file schema.
Given this confusion, i think makes sense that I should rename this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on this point, I think naming convention along the lines of: fileSchema, readSchema, projectedFileSchema, projectedReadSchema all help clarify which is which

throw new DecodingSchemaMismatchException(
s"Found non-optional source field ${sourceField.getName}:\n$sourceField\n\n" +
s"not present in the given target type:\n${targetGroup}"
)
}
sourceField
}
else {
val fieldIndex = targetGroup.getFieldIndex(sourceField.getName)
val targetField = targetGroup.getFields.get(fieldIndex)
formatForwardCompatibleType(sourceField, targetField)
}
}
sourceGroup.withNewFields(resultFields.asJava)
}
}

private case class GroupUnwrapped(wrappers: Seq[GroupType],
repeatedListType: Option[Type] = None,
repeatedMapType: Option[Type] = None)

private def unwrapGroup(typ: Type, wrappers: Seq[GroupType] = Seq()): GroupUnwrapped = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a naive question, but what does "wrapped" or "wrapper" refer to in this case? Could you add a comment explaining this? That might make the following code easier to review.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now simplified to just extract for only the repeated type, and let the recursive call in main method formatForwardCompatibleType handle multiple layers instead.
This now becomes a sealed trait for the group type itself, and a helper method to get its repeated type

private[scrooge] sealed trait CollectionGroup {
  def groupType: GroupType

  def repeatedType: Type
}

if (typ.isPrimitive) {
GroupUnwrapped(
wrappers,
repeatedListType=None,
repeatedMapType=None
)
} else if (typ.asGroupType.getFieldCount != 1) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this else if branch and the else branch I noticed we have typ.asGroupType calls everywhere. To avoid that and to make it clear that !typ.isPrimitive implies that typ is a group type, could we instead do something like

if (typ.isPrimitive) {
   // ...
} else {
  val groupTyp = typ.asGroupTyp
  if (groupTyp.getFieldCount != 1) {
     //...
  } else {
     //...
  }
}

We might be able to further improve clarity by changing the sign of the second condition, doing

if (typ.isPrimitive) {
   // ...
} else {
  val groupTyp = typ.asGroupTyp
  if (groupTyp.getFieldCount == 1) {  // <--- inverted condition
     //...
  } else {
     //...
  }
}

which then simplifies or clarifies the

// Note the field count is strictly 1 here

comment present in the current code.

We could then add a comment in the else branch saying something like

// The group contains more than one field, so ... <whatever that implies>

This is maybe nitpicky, but I'm trying to walk through each case here to make sure that I understand what all of the branches correspond to, so I'm considering whether a different ordering or presentation of the cases can make it easier to understand and verify.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified this to two methods to extract group list and maplist instead.

GroupUnwrapped(
wrappers :+ typ.asGroupType(),
repeatedListType=None,
repeatedMapType=None
)
} else {
// Note the field count is strictly 1 here, and the wrappers will be used later
// to wrap back the formatted results.
if (ParquetListFormatRule.isGroupList(typ)) {
GroupUnwrapped(
wrappers :+ typ.asGroupType(),
repeatedListType=Some(typ.asGroupType.getFields.get(0)),
repeatedMapType = None
)
} else if (ParquetMapFormatRule.isGroupMap(typ)) {
GroupUnwrapped(
wrappers :+ typ.asGroupType(),
repeatedListType=None,
repeatedMapType=Some(typ.asGroupType.getFields.get(0))
)
} else {
unwrapGroup(typ.asGroupType.getFields.get(0), wrappers :+ typ.asGroupType())
}
}
}
}

trait ParquetCollectionFormatRule {
def formatForwardCompatibleRepeatedType(sourceRepeatedMapType: Type, targetRepeatedMapType: Type): Type
}
Loading