
Commit a2dafe1

Spark: Add tests for create, alter, insert, and select.
1 parent 0bad670 commit a2dafe1

11 files changed (+945, -15 lines)

hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java

Lines changed: 17 additions & 1 deletion
@@ -153,10 +153,12 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
   }
 
   @Override
-  public void renameTable(TableIdentifier from, TableIdentifier to) {
+  public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
     if (!isValidIdentifier(from)) {
       throw new NoSuchTableException("Invalid identifier: %s", from);
     }
+
+    TableIdentifier to = removeCatalogName(originalTo);
     Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
 
     String toDatabase = to.namespace().level(0);
@@ -347,6 +349,20 @@ protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
     return tableIdentifier.namespace().levels().length == 1;
   }
 
+  private TableIdentifier removeCatalogName(TableIdentifier to) {
+    if (isValidIdentifier(to)) {
+      return to;
+    }
+
+    // check if the identifier includes the catalog name and remove it
+    if (to.namespace().levels().length == 2 && name().equals(to.namespace().level(0))) {
+      return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name());
+    }
+
+    // return the original unmodified
+    return to;
+  }
+
   private boolean isValidateNamespace(Namespace namespace) {
     return namespace.levels().length == 1;
   }
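
Note on the change above: when a SQL rename goes through a Spark catalog, the target identifier can arrive with the catalog name still attached, which is why renameTable now strips it before validation. The following is a minimal sketch (not part of the commit) of the two identifier shapes involved; the catalog name "testhive" is an assumed example standing in for whatever name() returns.

    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;

    public class RenameIdentifierSketch {
      public static void main(String[] args) {
        // already valid for HiveCatalog: a single-level namespace (the database)
        TableIdentifier plain = TableIdentifier.of(Namespace.of("default"), "table2");

        // what ALTER TABLE testhive.default.table RENAME TO testhive.default.table2 can produce:
        // a two-level namespace whose first level is the catalog name
        TableIdentifier withCatalog = TableIdentifier.of(Namespace.of("testhive", "default"), "table2");

        // removeCatalogName(withCatalog) would drop the leading "testhive" level,
        // yielding the same identifier as `plain` before isValidIdentifier is checked
        System.out.println(plain + " vs " + withCatalog);
      }
    }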

spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java

Lines changed: 35 additions & 3 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.spark;
 
+import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -28,13 +29,17 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
 public class SparkTestBase {
 
+  protected static final Object ANY = new Object();
+
   private static TestHiveMetastore metastore = null;
   private static HiveConf hiveConf = null;
   protected static SparkSession spark = null;
@@ -48,6 +53,7 @@ public static void startMetastoreAndSpark() {
 
     SparkTestBase.spark = SparkSession.builder()
         .master("local[2]")
+        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
         .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
         .enableHiveSupport()
         .getOrCreate();
@@ -65,19 +71,45 @@ public static void stopMetastoreAndSpark() {
     SparkTestBase.spark = null;
   }
 
-  protected List<String[]> sql(String query, Object... args) {
+  protected List<Object[]> sql(String query, Object... args) {
     List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
     if (rows.size() < 1) {
       return ImmutableList.of();
     }
 
     return rows.stream()
         .map(row -> IntStream.range(0, row.size())
-            .mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos).toString())
-            .toArray(String[]::new)
+            .mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos))
+            .toArray(Object[]::new)
         ).collect(Collectors.toList());
   }
 
+  protected Object scalarSql(String query, Object... args) {
+    List<Object[]> rows = sql(query, args);
+    Assert.assertEquals("Scalar SQL should return one row", 1, rows.size());
+    Object[] row = Iterables.getOnlyElement(rows);
+    Assert.assertEquals("Scalar SQL should return one value", 1, row.length);
+    return row[0];
+  }
+
+  protected Object[] row(Object... values) {
+    return values;
+  }
+
+  protected void assertEquals(String context, List<Object[]> expectedRows, List<Object[]> actualRows) {
+    Assert.assertEquals(context + ": number of results should match", expectedRows.size(), actualRows.size());
+    for (int row = 0; row < expectedRows.size(); row += 1) {
+      Object[] expected = expectedRows.get(row);
+      Object[] actual = actualRows.get(row);
+      for (int col = 0; col < actualRows.get(row).length; col += 1) {
+        if (expected[col] != ANY) {
+          Assert.assertEquals(context + ": row " + row + " col " + col + " contents should match",
+              expected[col], actual[col]);
+        }
+      }
+    }
+  }
+
   protected static String dbPath(String dbName) {
     return metastore.getDatabasePath(dbName);
   }
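
The helpers above let tests compare query results positionally and skip columns that are not deterministic. Below is a minimal sketch (not part of the commit) of how a test might use them; the subclass name and the table db.sample are assumed for illustration.

    import java.util.List;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
    import org.junit.Assert;
    import org.junit.Test;

    public class ExampleHelperUsage extends SparkTestBase {
      @Test
      public void testRowsMatch() {
        List<Object[]> expected = ImmutableList.of(
            row(1L, "a"),
            row(2L, ANY));  // ANY skips the comparison for this column

        List<Object[]> actual = sql("SELECT id, data FROM %s ORDER BY id", "db.sample");
        assertEquals("Rows should match", expected, actual);

        // scalarSql asserts exactly one row with one column and returns that value
        Object count = scalarSql("SELECT count(*) FROM %s", "db.sample");
        Assert.assertEquals(2L, count);
      }
    }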

spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ public class SimpleRecord {
   public SimpleRecord() {
   }
 
-  SimpleRecord(Integer id, String data) {
+  public SimpleRecord(Integer id, String data) {
     this.id = id;
     this.data = data;
   }

spark3/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java

Lines changed: 13 additions & 2 deletions
@@ -66,7 +66,8 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
     this.dsSchema = info.schema();
     this.options = info.options();
     this.overwriteMode = options.containsKey("overwrite-mode") ?
-        options.get("overwrite-mode").toLowerCase(Locale.ROOT) : null;
+        options.get("overwrite-mode").toLowerCase(Locale.ROOT) :
+        spark.sqlContext().conf().partitionOverwriteMode().toString().toLowerCase(Locale.ROOT);
   }
 
   private JavaSparkContext lazySparkContext() {
@@ -87,7 +88,17 @@ public WriteBuilder overwriteDynamicPartitions() {
   public WriteBuilder overwrite(Filter[] filters) {
     this.overwriteExpr = SparkFilters.convert(filters);
     if (overwriteExpr == Expressions.alwaysTrue() && "dynamic".equals(overwriteMode)) {
-      // use the write option to override truncating the table. use dynamic overwrite instead.
+      // this is a work-around for a Spark bug, where Spark will use a static overwrite expression, alwaysTrue. this
+      // happens when Spark checks whether an INSERT plan should use dynamic overwrite or a static overwrite. Spark
+      // uses the number of identity partitions instead of the total number of partitions and defaults to static when
+      // the number of static values provided is equal. if the table has hidden partitions, then it looks like the
+      // overwrite should be static when there are no static values provided. instead, Spark should rely on the
+      // overwrite mode when the number of identity partitions and the number of static values is equal.
+      //
+      // here, we detect the bug by catching alwaysTrue, which indicates that there were no static partition values.
+      // there is a slight chance that overwriting the entire table was intended. there are two paths that will result
+      // in a truncate or overwrite(true): DataFrameWriter with mode overwrite, which was a dynamic overwrite in 2.4,
+      // and the new DataFrameWriterV2 using overwrite(lit(true)).
       this.overwriteDynamic = true;
     } else {
       Preconditions.checkState(!overwriteDynamic, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
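
For context on the comment above, these are the two call shapes that reach overwrite() with an alwaysTrue filter when no static partition values are given. A minimal sketch (not part of the commit); the DataFrame and table name are assumed.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
    import static org.apache.spark.sql.functions.lit;

    class OverwritePaths {
      // path 1: DataFrameWriter with mode "overwrite", which behaved as a dynamic overwrite in Spark 2.4
      static void v1Overwrite(Dataset<Row> df, String table) {
        df.write().format("iceberg").mode("overwrite").save(table);
      }

      // path 2: the Spark 3 DataFrameWriterV2 with an explicit always-true condition
      static void v2Overwrite(Dataset<Row> df, String table) throws NoSuchTableException {
        df.writeTo(table).overwrite(lit(true));
      }
    }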

spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java

Lines changed: 6 additions & 0 deletions
@@ -23,7 +23,9 @@
 import java.io.IOException;
 import java.util.Map;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
@@ -76,6 +78,8 @@ public static Object[][] parameters() {
   protected final String catalogName;
   protected final Catalog validationCatalog;
   protected final SupportsNamespaces validationNamespaceCatalog;
+  protected final TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
+  protected final String tableName;
 
   public SparkCatalogTestBase(String catalogName, String implementation, Map<String, String> config) {
     this.catalogName = catalogName;
@@ -90,5 +94,7 @@ public SparkCatalogTestBase(String catalogName, String implementation, Map<Strin
     if (config.get("type").equalsIgnoreCase("hadoop")) {
       spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse);
     }
+
+    this.tableName = (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default.table";
   }
 }
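
A small note on the constructor change (not part of the commit): tableName resolves to the plain default.table for the session catalog and is prefixed with the catalog name otherwise. Assuming a parameterized catalog named "testhive" for illustration:

    // catalogName = "spark_catalog"  ->  tableName = "default.table"
    // catalogName = "testhive"       ->  tableName = "testhive.default.table"
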
spark3/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java

Lines changed: 216 additions & 0 deletions
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAlterTable extends SparkCatalogTestBase {
+  private final TableIdentifier renamedIdent = TableIdentifier.of(Namespace.of("default"), "table2");
+
+  public TestAlterTable(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s2", tableName);
+  }
+
+  @Test
+  public void testAddColumn() {
+    sql("ALTER TABLE %s ADD COLUMN point struct<x: double NOT NULL, y: double NOT NULL> AFTER id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "point", Types.StructType.of(
+            NestedField.required(4, "x", Types.DoubleType.get()),
+            NestedField.required(5, "y", Types.DoubleType.get())
+        )),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+    sql("ALTER TABLE %s ADD COLUMN point.z double COMMENT 'May be null' FIRST", tableName);
+
+    Types.StructType expectedSchema2 = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "point", Types.StructType.of(
+            NestedField.optional(6, "z", Types.DoubleType.get(), "May be null"),
+            NestedField.required(4, "x", Types.DoubleType.get()),
+            NestedField.required(5, "y", Types.DoubleType.get())
+        )),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema2, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testDropColumn() {
+    sql("ALTER TABLE %s DROP COLUMN data", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testRenameColumn() {
+    sql("ALTER TABLE %s RENAME COLUMN id TO row_id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "row_id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnComment() {
+    sql("ALTER TABLE %s ALTER COLUMN id COMMENT 'Record id'", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get(), "Record id"),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnType() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count TYPE bigint", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()),
+        NestedField.optional(3, "count", Types.LongType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnDropNotNull() {
+    sql("ALTER TABLE %s ALTER COLUMN id DROP NOT NULL", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.optional(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnSetNotNull() {
+    // no-op changes are allowed
+    sql("ALTER TABLE %s ALTER COLUMN id SET NOT NULL", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+    AssertHelpers.assertThrows("Should reject adding NOT NULL constraint to an optional column",
+        AnalysisException.class, "Cannot change nullable column to non-nullable: data",
+        () -> sql("ALTER TABLE %s ALTER COLUMN data SET NOT NULL", tableName));
+  }
+
+  @Test
+  public void testAlterColumnPositionAfter() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count AFTER id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "count", Types.IntegerType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnPositionFirst() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count FIRST", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.optional(3, "count", Types.IntegerType.get()),
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testTableRename() {
+    Assume.assumeFalse("Hadoop catalog does not support rename", validationCatalog instanceof HadoopCatalog);
+
+    Assert.assertTrue("Initial name should exist", validationCatalog.tableExists(tableIdent));
+    Assert.assertFalse("New name should not exist", validationCatalog.tableExists(renamedIdent));
+
+    sql("ALTER TABLE %s RENAME TO %s2", tableName, tableName);
+
+    Assert.assertFalse("Initial name should not exist", validationCatalog.tableExists(tableIdent));
+    Assert.assertTrue("New name should exist", validationCatalog.tableExists(renamedIdent));
+  }
+
+  @Test
+  public void testSetTableProperties() {
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('prop'='value')", tableName);
+
+    Assert.assertEquals("Should have the new table property",
+        "value", validationCatalog.loadTable(tableIdent).properties().get("prop"));
+
+    sql("ALTER TABLE %s UNSET TBLPROPERTIES ('prop')", tableName);
+
+    Assert.assertNull("Should not have the removed table property",
+        validationCatalog.loadTable(tableIdent).properties().get("prop"));
+  }
+}
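
The assertions in these tests go through validationCatalog rather than Spark, checking that each ALTER TABLE statement produced the expected Iceberg schema. As a point of reference, here is a minimal sketch (not part of the commit, assuming the Iceberg UpdateSchema API) of the metadata change that testAlterColumnPositionAfter expects the SQL to perform:

    // inside one of these tests, using the fields inherited from SparkCatalogTestBase
    org.apache.iceberg.Table table = validationCatalog.loadTable(tableIdent);
    table.updateSchema()
        .addColumn("count", Types.IntegerType.get())  // added columns become optional
        .moveAfter("count", "id")                     // matches ALTER COLUMN count AFTER id
        .commit();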
