Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0e1a3db

Browse files
committed Oct 16, 2024
Spark-3.5: Procedure to add view dialect
1 parent 5c2d53e commit 0e1a3db

File tree

4 files changed

+310
-1
lines changed

4 files changed

+310
-1
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark.extensions;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23+
24+
import java.util.List;
25+
import org.apache.iceberg.ParameterizedTestExtension;
26+
import org.apache.iceberg.Parameters;
27+
import org.apache.iceberg.catalog.Catalog;
28+
import org.apache.iceberg.catalog.Namespace;
29+
import org.apache.iceberg.catalog.TableIdentifier;
30+
import org.apache.iceberg.catalog.ViewCatalog;
31+
import org.apache.iceberg.spark.Spark3Util;
32+
import org.apache.iceberg.spark.SparkCatalogConfig;
33+
import org.apache.iceberg.view.ImmutableSQLViewRepresentation;
34+
import org.apache.iceberg.view.View;
35+
import org.apache.iceberg.view.ViewRepresentation;
36+
import org.apache.iceberg.view.ViewVersion;
37+
import org.junit.jupiter.api.AfterEach;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.TestTemplate;
40+
import org.junit.jupiter.api.extension.ExtendWith;
41+
42+
@ExtendWith(ParameterizedTestExtension.class)
43+
public class TestAddViewDialectProcedure extends ExtensionsTestBase {
44+
private static final Namespace NAMESPACE = Namespace.of("default");
45+
private static final String VIEW_NAME = "view_year";
46+
private static final TableIdentifier VIEW_IDENTIFIER = TableIdentifier.of(NAMESPACE, VIEW_NAME);
47+
48+
@BeforeEach
49+
public void before() {
50+
super.before();
51+
spark.conf().set("spark.sql.defaultCatalog", catalogName);
52+
sql("USE %s", catalogName);
53+
sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
54+
}
55+
56+
@AfterEach
57+
public void removeTables() {
58+
sql("DROP TABLE IF EXISTS %s", tableName);
59+
sql("DROP VIEW IF EXISTS %s", VIEW_NAME);
60+
}
61+
62+
@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
63+
public static Object[][] parameters() {
64+
return new Object[][] {
65+
{
66+
SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName(),
67+
SparkCatalogConfig.SPARK_WITH_VIEWS.implementation(),
68+
SparkCatalogConfig.SPARK_WITH_VIEWS.properties()
69+
}
70+
};
71+
}
72+
73+
private ViewCatalog viewCatalog() {
74+
Catalog icebergCatalog = Spark3Util.loadIcebergCatalog(spark, catalogName);
75+
assertThat(icebergCatalog).isInstanceOf(ViewCatalog.class);
76+
return (ViewCatalog) icebergCatalog;
77+
}
78+
79+
@TestTemplate
80+
public void testAddNewViewDialect() {
81+
String sparkViewSQL =
82+
String.format("SELECT * FROM %s WHERE year(order_date) = year(current_date())", tableName);
83+
ViewRepresentation sparkRep =
84+
ImmutableSQLViewRepresentation.builder().dialect("spark").sql(sparkViewSQL).build();
85+
86+
String trinoViewSQL =
87+
String.format(
88+
"SELECT * FROM %s WHERE extract(year FROM order_date) = extract(year FROM current_date))",
89+
tableName);
90+
ViewRepresentation trinoRep =
91+
ImmutableSQLViewRepresentation.builder().dialect("trino").sql(trinoViewSQL).build();
92+
93+
sql("CREATE TABLE %s (order_id int NOT NULL, order_date date) USING iceberg", tableName);
94+
sql("INSERT INTO TABLE %s VALUES (1, DATE '2024-10-08')", tableName);
95+
sql("INSERT INTO TABLE %s VALUES (2, DATE '2025-10-08')", tableName);
96+
97+
sql("CREATE VIEW %s AS %s", VIEW_NAME, sparkViewSQL);
98+
99+
ViewCatalog viewCatalog = viewCatalog();
100+
View view = viewCatalog.loadView(VIEW_IDENTIFIER);
101+
int oldVersion = view.currentVersion().versionId();
102+
assertThat(view.currentVersion().representations()).containsExactly(sparkRep);
103+
104+
List<Object[]> output =
105+
sql(
106+
"CALL %s.system.add_view_dialect('%s', '%s', '%s')",
107+
catalogName, VIEW_IDENTIFIER, "trino", trinoViewSQL);
108+
109+
ViewVersion newVersion = viewCatalog.loadView(VIEW_IDENTIFIER).currentVersion();
110+
assertThat(output).containsExactly(new Object[] {newVersion.versionId()});
111+
assertThat(newVersion.versionId()).isNotEqualTo(oldVersion);
112+
assertThat(newVersion.representations()).containsExactlyInAnyOrder(sparkRep, trinoRep);
113+
114+
// test adding multiple SQL for same dialect
115+
assertThatThrownBy(
116+
() ->
117+
sql(
118+
"CALL %s.system.add_view_dialect('%s', '%s', '%s')",
119+
catalogName, VIEW_IDENTIFIER, "spark", "another sql"))
120+
.hasMessageContaining("Invalid view version: Cannot add multiple queries for dialect spark")
121+
.isInstanceOf(IllegalArgumentException.class);
122+
}
123+
124+
@TestTemplate
125+
public void testInvalidArgs() {
126+
String sparkViewSQL =
127+
String.format("SELECT * FROM %s WHERE year(order_date) = year(current_date())", tableName);
128+
129+
sql("CREATE TABLE %s (order_id int NOT NULL, order_date date) USING iceberg", tableName);
130+
sql("INSERT INTO TABLE %s VALUES (1, DATE '2024-10-08')", tableName);
131+
132+
sql("CREATE VIEW %s AS %s", VIEW_NAME, sparkViewSQL);
133+
134+
assertThatThrownBy(
135+
() ->
136+
sql(
137+
"CALL %s.system.add_view_dialect('%s', '%s', '%s')",
138+
catalogName, VIEW_IDENTIFIER + "_invalid", "trino", "foo"))
139+
.hasMessageContaining(
140+
"Couldn't load view 'default.view_year_invalid' in catalog 'spark_with_views'")
141+
.isInstanceOf(RuntimeException.class);
142+
143+
assertThatThrownBy(
144+
() ->
145+
sql("CALL %s.system.add_view_dialect('', '%s', '%s')", catalogName, "trino", "foo"))
146+
.hasMessageContaining("Cannot handle an empty identifier for argument view")
147+
.isInstanceOf(IllegalArgumentException.class);
148+
149+
assertThatThrownBy(
150+
() ->
151+
sql(
152+
"CALL %s.system.add_view_dialect('%s', '', '%s')",
153+
catalogName, VIEW_IDENTIFIER, "foo"))
154+
.hasMessageContaining("dialect should not be empty")
155+
.isInstanceOf(RuntimeException.class);
156+
157+
assertThatThrownBy(
158+
() ->
159+
sql(
160+
"CALL %s.system.add_view_dialect('%s', '%s', '')",
161+
catalogName, VIEW_IDENTIFIER, "foo"))
162+
.hasMessageContaining("sql should not be empty")
163+
.isInstanceOf(RuntimeException.class);
164+
}
165+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark.procedures;
20+
21+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
22+
import org.apache.iceberg.spark.BaseCatalog;
23+
import org.apache.iceberg.view.ReplaceViewVersion;
24+
import org.apache.iceberg.view.SQLViewRepresentation;
25+
import org.apache.spark.sql.catalyst.InternalRow;
26+
import org.apache.spark.sql.connector.catalog.Identifier;
27+
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
28+
import org.apache.spark.sql.types.DataTypes;
29+
import org.apache.spark.sql.types.Metadata;
30+
import org.apache.spark.sql.types.StructField;
31+
import org.apache.spark.sql.types.StructType;
32+
33+
public class AddViewDialectProcedure extends BaseProcedure {
34+
35+
private static final ProcedureParameter[] PARAMETERS =
36+
new ProcedureParameter[] {
37+
ProcedureParameter.required("view", DataTypes.StringType),
38+
ProcedureParameter.required("dialect", DataTypes.StringType),
39+
ProcedureParameter.required("sql", DataTypes.StringType)
40+
};
41+
42+
private static final StructType OUTPUT_TYPE =
43+
new StructType(
44+
new StructField[] {
45+
new StructField("updated_version_id", DataTypes.IntegerType, true, Metadata.empty()),
46+
});
47+
48+
public static SparkProcedures.ProcedureBuilder builder() {
49+
return new Builder<AddViewDialectProcedure>() {
50+
@Override
51+
protected AddViewDialectProcedure doBuild() {
52+
return new AddViewDialectProcedure(catalog());
53+
}
54+
};
55+
}
56+
57+
private AddViewDialectProcedure(BaseCatalog catalog) {
58+
super(catalog);
59+
}
60+
61+
@Override
62+
public ProcedureParameter[] parameters() {
63+
return PARAMETERS;
64+
}
65+
66+
@Override
67+
public StructType outputType() {
68+
return OUTPUT_TYPE;
69+
}
70+
71+
@Override
72+
public InternalRow[] call(InternalRow args) {
73+
Identifier viewIdentifier = toIdentifier(args.getString(0), PARAMETERS[0].name());
74+
String dialect = args.getString(1);
75+
String sql = args.getString(2);
76+
77+
Preconditions.checkArgument(!dialect.isEmpty(), "dialect should not be empty");
78+
Preconditions.checkArgument(!sql.isEmpty(), "sql should not be empty");
79+
80+
return withIcebergView(
81+
viewIdentifier,
82+
view -> {
83+
ReplaceViewVersion replaceViewVersion = view.replaceVersion();
84+
replaceViewVersion
85+
.withSchema(view.schema())
86+
.withDefaultNamespace(view.currentVersion().defaultNamespace());
87+
88+
// retain old representations
89+
view.currentVersion()
90+
.representations()
91+
.forEach(
92+
representation -> {
93+
SQLViewRepresentation viewRepresentation =
94+
(SQLViewRepresentation) representation;
95+
replaceViewVersion.withQuery(
96+
viewRepresentation.dialect(), viewRepresentation.sql());
97+
});
98+
99+
// add new representation
100+
replaceViewVersion.withQuery(dialect, sql).commit();
101+
102+
InternalRow outputRow = newInternalRow(view.currentVersion().versionId());
103+
return new InternalRow[] {outputRow};
104+
});
105+
}
106+
107+
@Override
108+
public String description() {
109+
return "AddViewDialectProcedure";
110+
}
111+
}

‎spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,20 @@
3434
import org.apache.iceberg.spark.actions.SparkActions;
3535
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
3636
import org.apache.iceberg.spark.source.SparkTable;
37+
import org.apache.iceberg.spark.source.SparkView;
3738
import org.apache.spark.sql.AnalysisException;
3839
import org.apache.spark.sql.Dataset;
3940
import org.apache.spark.sql.Row;
4041
import org.apache.spark.sql.SparkSession;
4142
import org.apache.spark.sql.catalyst.InternalRow;
4243
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
44+
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
4345
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
4446
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
4547
import org.apache.spark.sql.connector.catalog.Identifier;
4648
import org.apache.spark.sql.connector.catalog.Table;
4749
import org.apache.spark.sql.connector.catalog.TableCatalog;
50+
import org.apache.spark.sql.connector.catalog.View;
4851
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
4952
import org.apache.spark.sql.execution.CacheManager;
5053
import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
@@ -100,6 +103,15 @@ protected <T> T withIcebergTable(Identifier ident, Function<org.apache.iceberg.T
100103
}
101104
}
102105

106+
protected <T> T withIcebergView(
107+
Identifier ident, Function<org.apache.iceberg.view.View, T> func) {
108+
try {
109+
return executeView(ident, func);
110+
} finally {
111+
closeService();
112+
}
113+
}
114+
103115
private <T> T execute(
104116
Identifier ident, boolean refreshSparkCache, Function<org.apache.iceberg.Table, T> func) {
105117
SparkTable sparkTable = loadSparkTable(ident);
@@ -114,13 +126,20 @@ private <T> T execute(
114126
return result;
115127
}
116128

129+
private <T> T executeView(Identifier ident, Function<org.apache.iceberg.view.View, T> func) {
130+
SparkView sparkView = loadSparkView(ident);
131+
org.apache.iceberg.view.View icebergView = sparkView.view();
132+
133+
return func.apply(icebergView);
134+
}
135+
117136
protected Identifier toIdentifier(String identifierAsString, String argName) {
118137
CatalogAndIdentifier catalogAndIdentifier =
119138
toCatalogAndIdentifier(identifierAsString, argName, catalog);
120139

121140
Preconditions.checkArgument(
122141
catalogAndIdentifier.catalog().equals(catalog),
123-
"Cannot run procedure in catalog '%s': '%s' is a table in catalog '%s'",
142+
"Cannot run procedure in catalog '%s': '%s' is a table or view in catalog '%s'",
124143
catalog.name(),
125144
identifierAsString,
126145
catalogAndIdentifier.catalog().name());
@@ -152,6 +171,19 @@ protected SparkTable loadSparkTable(Identifier ident) {
152171
}
153172
}
154173

174+
protected SparkView loadSparkView(Identifier ident) {
175+
try {
176+
View view = catalog.loadView(ident);
177+
ValidationException.check(
178+
view instanceof SparkView, "%s is not %s", ident, SparkView.class.getName());
179+
return (SparkView) view;
180+
} catch (NoSuchViewException e) {
181+
String errMsg =
182+
String.format("Couldn't load view '%s' in catalog '%s'", ident, catalog.name());
183+
throw new RuntimeException(errMsg, e);
184+
}
185+
}
186+
155187
protected Dataset<Row> loadRows(Identifier tableIdent, Map<String, String> options) {
156188
String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
157189
return spark().read().options(options).table(tableName);

‎spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
5656
mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
5757
mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
5858
mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
59+
mapBuilder.put("add_view_dialect", AddViewDialectProcedure::builder);
5960
return mapBuilder.build();
6061
}
6162

0 commit comments

Comments
 (0)
Please sign in to comment.