
Commit 5c2d53e · committed Oct 16, 2024

Spark-3.5: Refactor BaseProcedure to support views

Parent: ca8a3a4
23 files changed: +153 −88 lines
 

spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewUtil.scala (+3 −1)

@@ -19,6 +19,7 @@

 package org.apache.spark.sql.catalyst.analysis

+import org.apache.iceberg.spark.SparkSessionCatalog
 import org.apache.spark.sql.connector.catalog.CatalogPlugin
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.View
@@ -37,7 +38,8 @@ object ViewUtil {
   }

   def isViewCatalog(catalog: CatalogPlugin): Boolean = {
-    catalog.isInstanceOf[ViewCatalog]
+    // Spark session catalog doesn't support views yet.
+    catalog.isInstanceOf[ViewCatalog] && !catalog.isInstanceOf[SparkSessionCatalog[_]]
   }

   implicit class IcebergViewHelper(plugin: CatalogPlugin) {
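Because BaseCatalog (next file) now declares ViewCatalog, a bare isInstanceOf[ViewCatalog] test would start matching SparkSessionCatalog, whose view methods all throw. A rough Java rendering of the tightened guard, written as a hypothetical standalone helper rather than anything in this commit:

import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.ViewCatalog;

final class ViewCatalogCheck {
  private ViewCatalogCheck() {}

  // Same semantics as ViewUtil.isViewCatalog after this commit: implementing
  // ViewCatalog is no longer sufficient on its own, because SparkSessionCatalog
  // now inherits the interface from BaseCatalog while rejecting every view call.
  static boolean isViewCatalog(CatalogPlugin catalog) {
    return catalog instanceof ViewCatalog && !(catalog instanceof SparkSessionCatalog);
  }
}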

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java (+5 −3)

@@ -30,12 +30,14 @@
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;

-abstract class BaseCatalog
+public abstract class BaseCatalog
     implements StagingTableCatalog,
         ProcedureCatalog,
         SupportsNamespaces,
         HasIcebergCatalog,
-        SupportsFunctions {
+        SupportsFunctions,
+        org.apache.spark.sql.connector.catalog.ViewCatalog,
+        SupportsReplaceView {
   private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema";
   private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true;

@@ -51,7 +53,7 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException
     if (isSystemNamespace(namespace)) {
       ProcedureBuilder builder = SparkProcedures.newBuilder(name);
       if (builder != null) {
-        return builder.withTableCatalog(this).build();
+        return builder.withCatalog(this).build();
       }
     }
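SparkProcedures.ProcedureBuilder itself is among the files not shown on this page, but the call site above (builder.withCatalog(this)) and BaseProcedure.Builder below pin down its new shape. A sketch of the presumed post-commit contract, not the verbatim file:

// Inferred: withTableCatalog(TableCatalog) became withCatalog(BaseCatalog),
// which is what lets loadProcedure pass `this` directly.
interface ProcedureBuilder {
  ProcedureBuilder withCatalog(BaseCatalog catalog);

  Procedure build();
}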

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java (+1 −2)

@@ -120,8 +120,7 @@
  *
  * <p>
  */
-public class SparkCatalog extends BaseCatalog
-    implements org.apache.spark.sql.connector.catalog.ViewCatalog, SupportsReplaceView {
+public class SparkCatalog extends BaseCatalog {
   private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
   private static final Splitter COMMA = Splitter.on(",");
   private static final Joiner COMMA_JOINER = Joiner.on(",");
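The deletion above loses nothing: both interfaces moved up to BaseCatalog, so SparkCatalog still satisfies them through inheritance. A minimal compile-time sketch (it assumes SparkCatalog's public no-arg constructor, which Spark's reflective plugin loading requires):

import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SupportsReplaceView;
import org.apache.spark.sql.connector.catalog.ViewCatalog;

final class HierarchyCheck {
  // Both assignments still compile after the refactor, even though
  // SparkCatalog no longer names either interface itself.
  static void check() {
    ViewCatalog asViewCatalog = new SparkCatalog();
    SupportsReplaceView asReplaceView = new SparkCatalog();
  }
}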

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java (+61 −0)

@@ -31,8 +31,10 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.CatalogExtension;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.FunctionCatalog;
@@ -44,6 +46,8 @@
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewChange;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
@@ -397,4 +401,61 @@ public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
     return getSessionCatalog().loadFunction(ident);
   }
+
+  @Override
+  public View replaceView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws NoSuchViewException, NoSuchNamespaceException {
+    throw new UnsupportedOperationException("SessionCatalog doesn't support views");
+  }
+
+  @Override
+  public Identifier[] listViews(String... namespace) throws NoSuchNamespaceException {
+    throw new UnsupportedOperationException("SessionCatalog doesn't support views");
+  }
+
+  @Override
+  public View loadView(Identifier ident) throws NoSuchViewException {
+    throw new UnsupportedOperationException("SessionCatalog doesn't support views");
+  }
+
+  @Override
+  public View createView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws ViewAlreadyExistsException, NoSuchNamespaceException {
+    throw new UnsupportedOperationException("SessionCatalog doesn't support views");
+  }
+
+  @Override
+  public View alterView(Identifier ident, ViewChange... changes)
+      throws NoSuchViewException, IllegalArgumentException {
+    throw new UnsupportedOperationException("SessionCatalog doesn't support views");
+  }
+
+  @Override
+  public boolean dropView(Identifier ident) {
+    throw new UnsupportedOperationException("SessionCatalog doesn't support views");
+  }
+
+  @Override
+  public void renameView(Identifier oldIdent, Identifier newIdent)
+      throws NoSuchViewException, ViewAlreadyExistsException {
+    throw new UnsupportedOperationException("SessionCatalog doesn't support views");
+  }
 }
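A test-style sketch of what these stubs mean at runtime; the test class, JUnit 5, and AssertJ usage here are assumptions for illustration, not part of the commit:

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.junit.jupiter.api.Test;

class TestSessionCatalogViewStubs {
  @Test
  void viewCallsFailFast() {
    // Constructed directly for illustration; Spark normally instantiates and
    // initializes catalog plugins reflectively from session configuration.
    SparkSessionCatalog<?> catalog = new SparkSessionCatalog<>();
    Identifier ident = Identifier.of(new String[] {"db"}, "v");

    // Every ViewCatalog method added above throws the same error instead of
    // delegating to the wrapped session catalog.
    assertThatThrownBy(() -> catalog.loadView(ident))
        .isInstanceOf(UnsupportedOperationException.class)
        .hasMessage("SessionCatalog doesn't support views");
    assertThatThrownBy(() -> catalog.dropView(ident))
        .isInstanceOf(UnsupportedOperationException.class)
        .hasMessage("SessionCatalog doesn't support views");
  }
}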

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java (+4 −4)

@@ -36,6 +36,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.BaseCatalog;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
@@ -45,7 +46,6 @@
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -82,15 +82,15 @@ class AddFilesProcedure extends BaseProcedure {
         new StructField("changed_partition_count", DataTypes.LongType, true, Metadata.empty()),
       });

-  private AddFilesProcedure(TableCatalog tableCatalog) {
-    super(tableCatalog);
+  private AddFilesProcedure(BaseCatalog catalog) {
+    super(catalog);
   }

   public static SparkProcedures.ProcedureBuilder builder() {
     return new BaseProcedure.Builder<AddFilesProcedure>() {
       @Override
       protected AddFilesProcedure doBuild() {
-        return new AddFilesProcedure(tableCatalog());
+        return new AddFilesProcedure(catalog());
       }
     };
   }

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java (+4 −4)

@@ -21,11 +21,11 @@
 import java.util.List;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.BaseCatalog;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -49,15 +49,15 @@ public class AncestorsOfProcedure extends BaseProcedure {
         new StructField("timestamp", DataTypes.LongType, true, Metadata.empty())
       });

-  private AncestorsOfProcedure(TableCatalog tableCatalog) {
-    super(tableCatalog);
+  private AncestorsOfProcedure(BaseCatalog catalog) {
+    super(catalog);
   }

   public static SparkProcedures.ProcedureBuilder builder() {
     return new Builder<AncestorsOfProcedure>() {
       @Override
       protected AncestorsOfProcedure doBuild() {
-        return new AncestorsOfProcedure(tableCatalog());
+        return new AncestorsOfProcedure(catalog());
       }
     };
   }

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java (+19 −18)

@@ -28,6 +28,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.spark.BaseCatalog;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.iceberg.spark.actions.SparkActions;
@@ -58,14 +59,14 @@ abstract class BaseProcedure implements Procedure {
   protected static final DataType STRING_ARRAY = DataTypes.createArrayType(DataTypes.StringType);

   private final SparkSession spark;
-  private final TableCatalog tableCatalog;
+  private final BaseCatalog catalog;

   private SparkActions actions;
   private ExecutorService executorService = null;

-  protected BaseProcedure(TableCatalog tableCatalog) {
+  protected BaseProcedure(BaseCatalog catalog) {
     this.spark = SparkSession.active();
-    this.tableCatalog = tableCatalog;
+    this.catalog = catalog;
   }

   protected SparkSession spark() {
@@ -80,7 +81,7 @@ protected SparkActions actions() {
   }

   protected TableCatalog tableCatalog() {
-    return this.tableCatalog;
+    return this.catalog;
   }

   protected <T> T modifyIcebergTable(Identifier ident, Function<org.apache.iceberg.Table, T> func) {
@@ -115,38 +116,38 @@ private <T> T execute(

   protected Identifier toIdentifier(String identifierAsString, String argName) {
     CatalogAndIdentifier catalogAndIdentifier =
-        toCatalogAndIdentifier(identifierAsString, argName, tableCatalog);
+        toCatalogAndIdentifier(identifierAsString, argName, catalog);

     Preconditions.checkArgument(
-        catalogAndIdentifier.catalog().equals(tableCatalog),
+        catalogAndIdentifier.catalog().equals(catalog),
         "Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
-        tableCatalog.name(),
+        catalog.name(),
         identifierAsString,
         catalogAndIdentifier.catalog().name());

     return catalogAndIdentifier.identifier();
   }

   protected CatalogAndIdentifier toCatalogAndIdentifier(
-      String identifierAsString, String argName, CatalogPlugin catalog) {
+      String identifierAsString, String argName, CatalogPlugin catalogPlugin) {
     Preconditions.checkArgument(
         identifierAsString != null && !identifierAsString.isEmpty(),
         "Cannot handle an empty identifier for argument %s",
         argName);

     return Spark3Util.catalogAndIdentifier(
-        "identifier for arg " + argName, spark, identifierAsString, catalog);
+        "identifier for arg " + argName, spark, identifierAsString, catalogPlugin);
   }

   protected SparkTable loadSparkTable(Identifier ident) {
     try {
-      Table table = tableCatalog.loadTable(ident);
+      Table table = catalog.loadTable(ident);
       ValidationException.check(
           table instanceof SparkTable, "%s is not %s", ident, SparkTable.class.getName());
       return (SparkTable) table;
     } catch (NoSuchTableException e) {
       String errMsg =
-          String.format("Couldn't load table '%s' in catalog '%s'", ident, tableCatalog.name());
+          String.format("Couldn't load table '%s' in catalog '%s'", ident, catalog.name());
       throw new RuntimeException(errMsg, e);
     }
   }
@@ -159,13 +160,13 @@ protected Dataset<Row> loadRows(Identifier tableIdent, Map<String, String> options) {

   protected void refreshSparkCache(Identifier ident, Table table) {
     CacheManager cacheManager = spark.sharedState().cacheManager();
     DataSourceV2Relation relation =
-        DataSourceV2Relation.create(table, Option.apply(tableCatalog), Option.apply(ident));
+        DataSourceV2Relation.create(table, Option.apply(catalog), Option.apply(ident));
     cacheManager.recacheByPlan(spark, relation);
   }

   protected Expression filterExpression(Identifier ident, String where) {
     try {
-      String name = Spark3Util.quotedFullIdentifier(tableCatalog.name(), ident);
+      String name = Spark3Util.quotedFullIdentifier(catalog.name(), ident);
       org.apache.spark.sql.catalyst.expressions.Expression expression =
           SparkExpressionConverter.collectResolvedSparkExpression(spark, name, where);
       return SparkExpressionConverter.convertToIcebergExpression(expression);
@@ -179,11 +180,11 @@ protected InternalRow newInternalRow(Object... values) {
   }

   protected abstract static class Builder<T extends BaseProcedure> implements ProcedureBuilder {
-    private TableCatalog tableCatalog;
+    private BaseCatalog catalog;

     @Override
-    public Builder<T> withTableCatalog(TableCatalog newTableCatalog) {
-      this.tableCatalog = newTableCatalog;
+    public Builder<T> withCatalog(BaseCatalog newCatalog) {
+      this.catalog = newCatalog;
       return this;
     }

@@ -194,8 +195,8 @@ public T build() {

     protected abstract T doBuild();

-    TableCatalog tableCatalog() {
-      return tableCatalog;
+    BaseCatalog catalog() {
+      return catalog;
     }
   }
 }
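Seen from a procedure author's side, the Builder changes amount to the pattern below. NoopProcedure is hypothetical; it would have to live in org.apache.iceberg.spark.procedures because BaseProcedure is package-private:

package org.apache.iceberg.spark.procedures;

import org.apache.iceberg.spark.BaseCatalog;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.StructType;

class NoopProcedure extends BaseProcedure {

  // Constructors now take the concrete BaseCatalog rather than a TableCatalog,
  // giving procedures access to view APIs as well as tables.
  private NoopProcedure(BaseCatalog catalog) {
    super(catalog);
  }

  public static SparkProcedures.ProcedureBuilder builder() {
    return new BaseProcedure.Builder<NoopProcedure>() {
      @Override
      protected NoopProcedure doBuild() {
        return new NoopProcedure(catalog()); // was tableCatalog()
      }
    };
  }

  @Override
  public ProcedureParameter[] parameters() {
    return new ProcedureParameter[0];
  }

  @Override
  public StructType outputType() {
    return new StructType();
  }

  @Override
  public InternalRow[] call(InternalRow args) {
    return new InternalRow[0];
  }

  @Override
  public String description() {
    return "NoopProcedure (illustrative no-op)";
  }
}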

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java (+3 −3)

@@ -19,10 +19,10 @@
 package org.apache.iceberg.spark.procedures;

 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.spark.BaseCatalog;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -57,12 +57,12 @@ public static ProcedureBuilder builder() {
     return new BaseProcedure.Builder<CherrypickSnapshotProcedure>() {
       @Override
       protected CherrypickSnapshotProcedure doBuild() {
-        return new CherrypickSnapshotProcedure(tableCatalog());
+        return new CherrypickSnapshotProcedure(catalog());
       }
     };
   }

-  private CherrypickSnapshotProcedure(TableCatalog catalog) {
+  private CherrypickSnapshotProcedure(BaseCatalog catalog) {
     super(catalog);
   }

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java (+4 −4)

@@ -30,6 +30,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.BaseCatalog;
 import org.apache.iceberg.spark.ChangelogIterator;
 import org.apache.iceberg.spark.source.SparkChangelogTable;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -40,7 +41,6 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.OrderUtils;
 import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -118,13 +118,13 @@ public static SparkProcedures.ProcedureBuilder builder() {
     return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
       @Override
       protected CreateChangelogViewProcedure doBuild() {
-        return new CreateChangelogViewProcedure(tableCatalog());
+        return new CreateChangelogViewProcedure(catalog());
       }
     };
   }

-  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
-    super(tableCatalog);
+  private CreateChangelogViewProcedure(BaseCatalog catalog) {
+    super(catalog);
   }

   @Override
(The remaining 14 of the 23 changed files did not load in this view.)
