
core: add JSON parser for ContentFile and FileScanTask #6934

Merged: 12 commits into apache:master on Jun 26, 2023

Conversation

@stevenzwu (Contributor, Author) commented Feb 24, 2023

This closes issue #1698.

There are two motivations, as described in issue #1698 (a usage sketch follows the list):

  1. Provide a more stable serialization (than Java serialization) for Flink checkpoints.
  2. Allow the REST catalog to use it for scan planning or committing files.
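
A minimal sketch of the intended round trip, e.g. inside a Flink checkpoint serializer. It assumes FileScanTaskParser exposes static toJson/fromJson helpers; the case-sensitivity flag on fromJson is an assumption about the signature, not a confirmed API:

  import org.apache.iceberg.FileScanTask;
  import org.apache.iceberg.FileScanTaskParser;

  class FileScanTaskJsonRoundTrip {
    // Serialize a scan task to JSON for a checkpoint, then restore it.
    static FileScanTask roundTrip(FileScanTask task) {
      String json = FileScanTaskParser.toJson(task);
      return FileScanTaskParser.fromJson(json, /* caseSensitive= */ true);
    }
  }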

/**
* Return the schema for this file scan task.
*/
default Schema schema() {
@stevenzwu (Contributor, Author), Feb 24, 2023:

This is needed so that FileScanTaskParser (added in this PR) can serialize the schema. During deserialization, the schema can then be passed into the constructor of BaseFileScanTask.

Keeping it at this level (not the base ContentScanTask interface or lower) limits the scope of the change.
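
For reference, a sketch of how BaseFileScanTask could implement the new method, mirroring the lazy spec() pattern quoted further down in this thread; the schemaString field name is an assumption:

  @Override
  public Schema schema() {
    // Sketch only: lazily parse the serialized schema string on first access.
    if (schema == null) {
      synchronized (this) {
        if (schema == null) {
          this.schema = SchemaParser.fromJson(schemaString);
        }
      }
    }
    return schema;
  }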

@@ -52,12 +53,24 @@ public F file() {
return file;
}

protected Schema schema() {
@stevenzwu (Contributor, Author):

Exposed as protected so that BaseFileScanTask can use it to implement the FileScanTask#schema() method.

Member:

It's a little odd that we reverse-engineer the schema from the string here, but this seems like the most backwards-compatible thing we can do.

@stevenzwu (Contributor, Author):

Agreed, it is a little odd. On the other hand, the partition spec follows the same model in this class. As you said, otherwise we would have to change the constructors of a bunch of classes. The current choice of passing the schema and spec as strings is what makes those scan tasks serializable.

  @Override
  public PartitionSpec spec() {
    if (spec == null) {
      synchronized (this) {
        if (spec == null) {
          this.spec = PartitionSpecParser.fromJson(schema(), specString);
        }
      }
    }
    return spec;
  }

cc @nastra

import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.JsonUtil;

class ContentFileParser {
@stevenzwu (Contributor, Author):

Since DataFile and DeleteFile have the same structure, this is named ContentFileParser, without any generic type.
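
A sketch of why no generic type is needed: both file kinds expose the same accessors, and the content type is written out explicitly. The property names below are illustrative assumptions, not the confirmed wire format:

  import com.fasterxml.jackson.core.JsonGenerator;
  import java.io.IOException;

  // Hypothetical writer fragment; DataFile and DeleteFile share these accessors.
  void toJson(ContentFile<?> contentFile, JsonGenerator generator) throws IOException {
    generator.writeStartObject();
    generator.writeStringField("content", contentFile.content().name());
    generator.writeStringField("file-path", contentFile.path().toString());
    generator.writeStringField("file-format", contentFile.format().name());
    generator.writeNumberField("record-count", contentFile.recordCount());
    generator.writeNumberField("file-size-in-bytes", contentFile.fileSizeInBytes());
    generator.writeEndObject();
  }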

@@ -134,6 +134,8 @@ public static class Builder {
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private List<Integer> equalityFieldIds = null;
private Integer sortOrderId = SortOrder.unsorted().orderId();
@stevenzwu (Contributor, Author):

Relocated this line here to keep the same order as the field definitions.

@@ -134,6 +134,8 @@ public static class Builder {
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private List<Integer> equalityFieldIds = null;
@stevenzwu (Contributor, Author):

Added a setter for equalityFieldIds so that the parser unit test can cover this field too.
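
A hypothetical sketch of that setter, following the builder's existing with-prefixed style (the exact name in the PR may differ):

  public Builder withEqualityFieldIds(List<Integer> fieldIds) {
    this.equalityFieldIds = fieldIds;
    return this;
  }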


private final PartitionSpec spec;

ContentFileParser(PartitionSpec spec) {
@stevenzwu (Contributor, Author):

Unlike other JSON parsers, which follow a static singleton pattern, ContentFileParser depends on the partition spec. Hence this is a regular class with a constructor.

@nastra (Contributor) left a comment:

Did a high-level pass over the parsers themselves and left a few comments. I haven't had a chance to look closer at the tests yet.

@stevenzwu (Contributor, Author) left a comment:

@nastra thanks a lot for the initial review. I addressed the comments in the latest commit.

@stevenzwu force-pushed the issue-1698-split-json branch from a8062a7 to 4d57100 on April 5, 2023 02:41
@nastra (Contributor) left a comment:

Sorry for the late re-review @stevenzwu; I've left a few more comments.

@nastra (Contributor) left a comment:

I've been mainly focusing on the JSON parsers and left a few comments, but overall this looks almost ready. It would be great to get some additional input from another reviewer.

import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestContentFileParser {
Contributor:

I think it would be good to also add a test with a plain JSON string, to see what the full JSON looks like. And then maybe another test with a plain JSON string where all optional fields (metrics, equality field ids, sort order id, split offsets, ...) are missing.
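
A round-trip test along those lines might look like the following sketch; the parser entry points (constructor plus toJson/fromJson) are assumptions based on this thread, not the final API:

  @Test
  public void testDataFileJsonRoundTrip() {
    PartitionSpec spec = PartitionSpec.unpartitioned();
    DataFile dataFile =
        DataFiles.builder(spec)
            .withPath("/path/to/data-1.parquet")
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .build();

    ContentFileParser parser = new ContentFileParser(spec);
    String json = parser.toJson(dataFile);
    // Parsing the JSON back should reproduce the original file metadata.
    DataFile roundTrip = (DataFile) parser.fromJson(json);
    assertThat(roundTrip.path().toString()).isEqualTo(dataFile.path().toString());
  }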


JsonNode pNode = node.get(property);
Preconditions.checkArgument(
pNode.isTextual(), "Cannot parse from non-text value: %s: %s", property, pNode);
Contributor:

Nit: maybe we should mention that we're trying to parse this from text to a binary representation.
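
For instance, the check could name the target type explicitly; a sketch of the reworded message (wording illustrative):

  JsonNode pNode = node.get(property);
  Preconditions.checkArgument(
      pNode != null && pNode.isTextual(),
      "Cannot parse binary value from non-text value: %s: %s", property, pNode);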

@stevenzwu (Contributor, Author):

I also fixed a couple of other error messages with the same problem.

@stevenzwu (Contributor, Author):

The Spark CI build failed with what looks like an environment problem:

        Caused by:
        java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rwxr-xr-x
            at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:724)
            at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:654)
            at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:586)
            at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:548)
            at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:174)
            at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:129)
            at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
            at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:293)
            at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:492)
            at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:352)
            at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:71)
            at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:70)
            at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:224)
            at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
            at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)

@stevenzwu force-pushed the issue-1698-split-json branch from 61e40a7 to 0016f36 on June 24, 2023 03:01
@stevenzwu force-pushed the issue-1698-split-json branch from a465d34 to 8105811 on June 25, 2023 14:35
@stevenzwu (Contributor, Author):

Merging after rebase.

@stevenzwu merged commit b8db3f0 into apache:master on Jun 26, 2023