
Allow loading custom Catalog implementation in Spark and Flink #1640

Merged: 1 commit merged into apache:master on Nov 4, 2020

Conversation

@jackye1995 (Contributor) commented Oct 22, 2020

As multiple new Catalog implementations are being added to Iceberg, we need an easy way to load those catalogs in Spark and Flink. Currently, a simple switch chooses between the Hive and Hadoop catalogs. This approach requires the iceberg-spark and iceberg-flink modules to depend on the catalog implementation modules, which would pull in many unnecessary dependencies as more and more cloud providers add support for Iceberg.

This PR proposes the following way to load custom Catalog implementations:

  1. The type of a catalog can be hive or hadoop, preserving existing behavior.
  2. If catalog-impl is set, type is ignored and the catalog is loaded from the given class name.
  3. A custom catalog must have a no-arg constructor and is initialized by calling initialize().
  4. A custom catalog must implement Hadoop's Configurable interface to receive the Hadoop configuration (points 3 and 4 are sketched below the example).

For example, a GlueCatalog would be configured in Spark like the following:

spark.sql.catalog.glue = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue.catalog-impl = org.apache.iceberg.aws.glue.GlueCatalog
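
To illustrate points 3 and 4, here is a minimal sketch of how such reflection-based loading could work; the class and method names (CatalogLoader, loadCustomCatalog) are hypothetical, not this PR's actual code:

import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;

// Hypothetical sketch of reflection-based catalog loading.
public class CatalogLoader {
  public static Catalog loadCustomCatalog(String impl, String name,
                                          Map<String, String> properties,
                                          Configuration conf) throws Exception {
    // The implementation class must expose a no-arg constructor (point 3).
    Catalog catalog = (Catalog) Class.forName(impl)
        .getDeclaredConstructor()
        .newInstance();
    // Hand over the Hadoop configuration if the catalog opts in (point 4).
    if (catalog instanceof Configurable) {
      ((Configurable) catalog).setConf(conf);
    }
    // Complete setup with the catalog name and its config properties.
    catalog.initialize(name, properties);
    return catalog;
  }
}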

@giovannifumarola commented:

Thanks Jack. I like the following approach:

spark.sql.catalog.glue = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue.type = custom
spark.sql.catalog.glue.impl = org.apache.iceberg.aws.glue.GlueCatalog

@rdblue (Contributor) commented Oct 23, 2020

My main concern with this is how configuration is currently passed using Hadoop Configuration. We want to avoid using Configuration and only pass it where it is needed for our use of Hadoop APIs, like FileSystem. Passing Configuration as the last argument is fine because many catalogs will currently need it to use HadoopFileIO, but we'll need an alternative as well.

I think we also want to pass options from the catalog config in Flink and Spark, where users can pass properties like uri and warehouse. Could you add a Map to this to pass the catalog config options?
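
As a hypothetical illustration of that suggestion, catalog options from the Spark config (with the spark.sql.catalog.<name>. prefix stripped) would be collected into a map and handed to the catalog; the InitExample class and the property values here are assumed for illustration:

import java.util.Map;
import org.apache.iceberg.catalog.Catalog;

// Hypothetical: properties gathered from spark.sql.catalog.glue.* settings,
// passed through initialize() alongside the catalog name.
class InitExample {
  static void init(Catalog catalog) {
    Map<String, String> properties = Map.of(
        "uri", "thrift://metastore:9083",       // example value
        "warehouse", "s3://bucket/warehouse");  // example value
    catalog.initialize("glue", properties);
  }
}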

rdblue mentioned this pull request Oct 23, 2020
@rymurr (Contributor) commented Oct 27, 2020

> I think we also want to pass options from the catalog config in Flink and Spark, where users can pass properties like uri and warehouse. Could you add a Map to this to pass the catalog config options?

I like the Map-over-Configuration suggestion as well. In #1587 I made the constructor take String name, Map props, Configuration conf, since it still needs a HadoopFileIO.

Has anyone thought about how to do this for the IcebergSource? As far as I understand, df.write().format("iceberg") will currently use Hive/HDFS regardless of these settings.

@rdblue (Contributor) commented Oct 27, 2020

@rymurr, for your IcebergSource question: the source can implement a trait to return the catalog and identifier to load instead of returning a table itself. There are two reasons we don't do this already:

  1. The catalog must be defined in Spark properties, so we either need to have a "default" catalog or have a way to configure a catalog for the source
  2. If we use a catalog, then we still need to support path-based tables. We will need to add a way to pass a path as an identifier to the catalog and have it load using HadoopTables (a rough sketch follows below).

Figuring out how we want to do this shouldn't be too difficult. We just found it easier to keep the existing behavior for the last release since there weren't other catalogs at the time.
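
One way the path-based fallback could look, as a hypothetical sketch (the TableRouter class and the path heuristic are assumptions, not code from this PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;

// Hypothetical routing: identifiers that look like paths load through
// HadoopTables; everything else goes through the configured catalog.
class TableRouter {
  private final Catalog catalog;
  private final Configuration conf;

  TableRouter(Catalog catalog, Configuration conf) {
    this.catalog = catalog;
    this.conf = conf;
  }

  Table load(String identifier) {
    if (identifier.contains("/")) {  // crude path heuristic (assumption)
      return new HadoopTables(conf).load(identifier);
    }
    return catalog.loadTable(TableIdentifier.parse(identifier));
  }
}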

jackye1995 force-pushed the custom-catalog branch 13 times, most recently from d3201f2 to 17f1c5c on October 27, 2020 at 23:39
jackye1995 closed this Oct 27, 2020
@jackye1995 (Contributor, Author) commented:

Fixed a rebase issue; reopening the PR.

jackye1995 reopened this Oct 27, 2020
@@ -58,13 +59,19 @@

// Can not just use "type", it conflicts with CATALOG_TYPE.
public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
public static final String ICEBERG_CATALOG_TYPE_CUSTOM = "custom";

Contributor (review comment):

Instead of using type=custom and impl=com.example.Catalog, why not just combine them into type=com.example.Catalog? We can try to load the type as an implementation class if it isn't a well-known name like "hive".
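
A rough sketch of that dispatch (hypothetical code, reusing the loadCustomCatalog sketch from the PR description above; the CatalogDispatch class is an assumption):

import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;

// Hypothetical: well-known names map to built-in catalog classes; anything
// else is treated as a fully qualified implementation class name.
class CatalogDispatch {
  static Catalog loadCatalog(String type, String name,
                             Map<String, String> props,
                             Configuration conf) throws Exception {
    String impl;
    switch (type.toLowerCase(Locale.ROOT)) {
      case "hive":
        impl = "org.apache.iceberg.hive.HiveCatalog";
        break;
      case "hadoop":
        impl = "org.apache.iceberg.hadoop.HadoopCatalog";
        break;
      default:
        impl = type;  // not a well-known name: assume it is a class name
    }
    // reuse the reflection-based loader sketched earlier
    return CatalogLoader.loadCustomCatalog(impl, name, props, conf);
  }
}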

Contributor (review comment):

@aokolnychyi and @RussellSpitzer, do you have an opinion here?

Contributor Author (review comment):

Yeah, this sounds like a cleaner way to go; the only disadvantage is that we are overloading the term "type". I have updated the code.

@rdblue (Contributor) commented Nov 3, 2020

@jackye1995, I'm happy with the implementation and behavior (other than changing IcebergSource later). The main things to fix are names and how we configure it. Thanks!

@jackye1995 (Contributor, Author) commented:

> I meant to make a comment before on this. We used Function<String, String> in Nessie for this and it feels like a narrower interface for this purpose. See here: https://github.com/projectnessie/nessie/blob/main/clients/client/src/main/java/com/dremio/nessie/client/NessieClient.java#L161

Interesting, but it should still be compatible by using Map<String, String>::get.
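
That is, a map already satisfies the narrower interface; a minimal, self-contained illustration (class name and property values assumed):

import java.util.Map;
import java.util.function.Function;

class MapAsFunction {
  public static void main(String[] args) {
    Map<String, String> props = Map.of("uri", "thrift://metastore:9083");
    // A Map's get method can stand in wherever a Function<String, String>
    // is expected, via a method reference.
    Function<String, String> lookup = props::get;
    System.out.println(lookup.apply("uri"));  // prints the configured uri
  }
}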

@jacques-n commented:

I was suggesting using the narrower function instead of a map here as well.

@rdblue (Contributor) commented Nov 3, 2020

What's the drawback of using a map? It is a bit narrower to use the function, but it is also limiting. Catalogs can't use getOrDefault or check contains. I also don't have a serious problem if we need to copy to create an Immutable map for this because it isn't in a performance-critical path.

@jacques-n commented:

Just a lot of extra methods exposed: iteration, put, etc.

@jackye1995 (Contributor, Author) commented:

> Just a lot of extra methods exposed: iteration, put, etc.

I think the advantages of using a Map are:

  1. consistency with the TableProperties interface
  2. consistency with the Spark and Flink properties interfaces
  3. as Ryan says, there are useful methods like getOrDefault and contains, and iteration might still be needed for some use cases, such as getting all configs with a certain prefix (a sketch follows below)
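
As a hypothetical illustration of that third point, a prefix scan is trivial over a Map but impossible through a plain Function<String, String> lookup (the PropertyUtil class and method name are assumed):

import java.util.Map;
import java.util.stream.Collectors;

class PropertyUtil {
  // Hypothetical helper: collect all properties under a prefix,
  // returning them with the prefix stripped.
  static Map<String, String> propertiesWithPrefix(Map<String, String> props,
                                                  String prefix) {
    return props.entrySet().stream()
        .filter(e -> e.getKey().startsWith(prefix))
        .collect(Collectors.toMap(
            e -> e.getKey().substring(prefix.length()),
            Map.Entry::getValue));
  }
}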

@jackye1995 (Contributor, Author) commented:

@rdblue @jacques-n, any thoughts?

@jacques-n commented:

I'm fine with it as is.

Some background: I'm not especially supportive of maps in interfaces, since they are so broad. I've had bad experiences in the past where we had to move maps to external storage (say, DynamoDB), and broad use of the Map interface hurt us in reworking things.

@rdblue (Contributor) commented Nov 4, 2020

I think it is okay to use a map here. Like I said, even if we had to copy values to create a map to pass in here, that happens once per session and is not an unreasonable amount of overhead.

rdblue merged commit b623b07 into apache:master on Nov 4, 2020
rymurr pushed commits referencing this pull request to rymurr/iceberg on Nov 5, 9, 10, 18, and 20, 2020.