Skip to content

Commit d2bc1da

Browse files
committed
Add CTAS tests.
1 parent a2dafe1 commit d2bc1da

File tree

6 files changed

+204
-6
lines changed

6 files changed

+204
-6
lines changed

core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,17 @@ public Transaction newReplaceTableTransaction(
110110
throw new NoSuchTableException("No such table: " + identifier);
111111
}
112112

113-
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
114-
Map<String, String> tableProperties = properties != null ? properties : Maps.newHashMap();
115-
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation, tableProperties);
113+
TableMetadata metadata;
114+
if (ops.current() != null) {
115+
String baseLocation = location != null ? location : ops.current().location();
116+
Map<String, String> tableProperties = properties != null ? properties : Maps.newHashMap();
117+
metadata = ops.current().buildReplacement(schema, spec, baseLocation, tableProperties);
118+
} else {
119+
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
120+
Map<String, String> tableProperties = properties != null ? properties : Maps.newHashMap();
121+
metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation, tableProperties);
122+
}
123+
116124
if (orCreate) {
117125
return Transactions.createOrReplaceTableTransaction(identifier.toString(), ops, metadata);
118126
} else {

core/src/main/java/org/apache/iceberg/TableMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
566566

567567
// The caller is responsible for passing an updatedPartitionSpec with correct partition field IDs
568568
public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
569-
Map<String, String> updatedProperties) {
569+
String newLocation, Map<String, String> updatedProperties) {
570570
ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec),
571571
"Spec does not use sequential IDs that are required in v1: %s", updatedPartitionSpec);
572572

@@ -602,7 +602,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
602602
newProperties.putAll(this.properties);
603603
newProperties.putAll(updatedProperties);
604604

605-
return new TableMetadata(null, formatVersion, uuid, location,
605+
return new TableMetadata(null, formatVersion, uuid, newLocation,
606606
lastSequenceNumber, System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
607607
specId, builder.build(), ImmutableMap.copyOf(newProperties),
608608
-1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));

core/src/test/java/org/apache/iceberg/TestTables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public static Transaction beginReplace(File temp, String name, Schema schema, Pa
7878

7979
TableMetadata metadata;
8080
if (current != null) {
81-
metadata = current.buildReplacement(schema, spec, properties);
81+
metadata = current.buildReplacement(schema, spec, current.location(), properties);
8282
return Transactions.replaceTableTransaction(name, ops, metadata);
8383
} else {
8484
metadata = newTableMetadata(schema, spec, temp.toString(), properties);

spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ protected void assertEquals(String context, List<Object[]> expectedRows, List<Ob
101101
for (int row = 0; row < expectedRows.size(); row += 1) {
102102
Object[] expected = expectedRows.get(row);
103103
Object[] actual = actualRows.get(row);
104+
Assert.assertEquals("Number of columns should match", expected.length, actual.length);
104105
for (int col = 0; col < actualRows.get(row).length; col += 1) {
105106
if (expected[col] != ANY) {
106107
Assert.assertEquals(context + ": row " + row + " col " + col + " contents should match",

spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,8 @@ public SparkCatalogTestBase(String catalogName, String implementation, Map<Strin
9797

9898
this.tableName = (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default.table";
9999
}
100+
101+
protected String tableName(String name) {
102+
return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default." + name;
103+
}
100104
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.spark.sql;
21+
22+
import com.google.common.collect.Iterables;
23+
import java.util.Map;
24+
import org.apache.iceberg.PartitionSpec;
25+
import org.apache.iceberg.Schema;
26+
import org.apache.iceberg.Table;
27+
import org.apache.iceberg.spark.SparkCatalogTestBase;
28+
import org.apache.iceberg.types.Types;
29+
import org.junit.After;
30+
import org.junit.Assert;
31+
import org.junit.Test;
32+
33+
public class TestCreateTableAsSelect extends SparkCatalogTestBase {
34+
35+
private final String sourceName;
36+
37+
public TestCreateTableAsSelect(String catalogName, String implementation, Map<String, String> config) {
38+
super(catalogName, implementation, config);
39+
this.sourceName = tableName("source");
40+
41+
sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) " +
42+
"USING iceberg PARTITIONED BY (truncate(id, 3))", sourceName);
43+
sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
44+
}
45+
46+
@After
47+
public void removeTables() {
48+
sql("DROP TABLE IF EXISTS %s", tableName);
49+
}
50+
51+
@Test
52+
public void testUnpartitionedCTAS() {
53+
sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName, sourceName);
54+
55+
Schema expectedSchema = new Schema(
56+
Types.NestedField.optional(1, "id", Types.LongType.get()),
57+
Types.NestedField.optional(2, "data", Types.StringType.get())
58+
);
59+
60+
Table ctasTable = validationCatalog.loadTable(tableIdent);
61+
62+
Assert.assertEquals("Should have expected nullable schema",
63+
expectedSchema.asStruct(), ctasTable.schema().asStruct());
64+
Assert.assertEquals("Should be an unpartitioned table",
65+
0, ctasTable.spec().fields().size());
66+
assertEquals("Should have rows matching the source table",
67+
sql("SELECT * FROM %s ORDER BY id", sourceName),
68+
sql("SELECT * FROM %s ORDER BY id", tableName));
69+
}
70+
71+
@Test
72+
public void testPartitionedCTAS() {
73+
sql("CREATE TABLE %s USING iceberg PARTITIONED BY (id) AS SELECT * FROM %s ORDER BY id", tableName, sourceName);
74+
75+
Schema expectedSchema = new Schema(
76+
Types.NestedField.optional(1, "id", Types.LongType.get()),
77+
Types.NestedField.optional(2, "data", Types.StringType.get())
78+
);
79+
80+
PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
81+
.identity("id")
82+
.build();
83+
84+
Table ctasTable = validationCatalog.loadTable(tableIdent);
85+
86+
Assert.assertEquals("Should have expected nullable schema",
87+
expectedSchema.asStruct(), ctasTable.schema().asStruct());
88+
Assert.assertEquals("Should be partitioned by id",
89+
expectedSpec, ctasTable.spec());
90+
assertEquals("Should have rows matching the source table",
91+
sql("SELECT * FROM %s ORDER BY id", sourceName),
92+
sql("SELECT * FROM %s ORDER BY id", tableName));
93+
}
94+
95+
@Test
96+
public void testRTAS() {
97+
sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName, sourceName);
98+
99+
assertEquals("Should have rows matching the source table",
100+
sql("SELECT * FROM %s ORDER BY id", sourceName),
101+
sql("SELECT * FROM %s ORDER BY id", tableName));
102+
103+
sql("REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
104+
"SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
105+
"FROM %s ORDER BY 3, 1", tableName, sourceName);
106+
107+
// spark_catalog does not use an atomic replace, so the table history and old spec is dropped
108+
// the other catalogs do use atomic replace, so the spec id is incremented
109+
boolean isAtomic = !"spark_catalog".equals(catalogName);
110+
111+
Schema expectedSchema = new Schema(
112+
Types.NestedField.optional(1, "id", Types.LongType.get()),
113+
Types.NestedField.optional(2, "data", Types.StringType.get()),
114+
Types.NestedField.optional(3, "part", Types.StringType.get())
115+
);
116+
117+
int specId = isAtomic ? 1 : 0;
118+
PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
119+
.identity("part")
120+
.withSpecId(specId)
121+
.build();
122+
123+
Table rtasTable = validationCatalog.loadTable(tableIdent);
124+
125+
// the replacement table has a different schema and partition spec than the original
126+
Assert.assertEquals("Should have expected nullable schema",
127+
expectedSchema.asStruct(), rtasTable.schema().asStruct());
128+
Assert.assertEquals("Should be partitioned by part",
129+
expectedSpec, rtasTable.spec());
130+
131+
assertEquals("Should have rows matching the source table",
132+
sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
133+
"FROM %s ORDER BY id", sourceName),
134+
sql("SELECT * FROM %s ORDER BY id", tableName));
135+
136+
Assert.assertEquals("Table should have expected snapshots",
137+
isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
138+
}
139+
140+
@Test
141+
public void testCreateRTAS() {
142+
sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
143+
"SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
144+
"FROM %s ORDER BY 3, 1", tableName, sourceName);
145+
146+
assertEquals("Should have rows matching the source table",
147+
sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
148+
"FROM %s ORDER BY id", sourceName),
149+
sql("SELECT * FROM %s ORDER BY id", tableName));
150+
151+
sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
152+
"SELECT 2 * id as id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
153+
"FROM %s ORDER BY 3, 1", tableName, sourceName);
154+
155+
// spark_catalog does not use an atomic replace, so the table history is dropped
156+
boolean isAtomic = !"spark_catalog".equals(catalogName);
157+
158+
Schema expectedSchema = new Schema(
159+
Types.NestedField.optional(1, "id", Types.LongType.get()),
160+
Types.NestedField.optional(2, "data", Types.StringType.get()),
161+
Types.NestedField.optional(3, "part", Types.StringType.get())
162+
);
163+
164+
PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
165+
.identity("part")
166+
.withSpecId(0) // the spec is identical and should be reused
167+
.build();
168+
169+
Table rtasTable = validationCatalog.loadTable(tableIdent);
170+
171+
// the replacement table has a different schema and partition spec than the original
172+
Assert.assertEquals("Should have expected nullable schema",
173+
expectedSchema.asStruct(), rtasTable.schema().asStruct());
174+
Assert.assertEquals("Should be partitioned by part",
175+
expectedSpec, rtasTable.spec());
176+
177+
assertEquals("Should have rows matching the source table",
178+
sql("SELECT 2 * id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
179+
"FROM %s ORDER BY id", sourceName),
180+
sql("SELECT * FROM %s ORDER BY id", tableName));
181+
182+
Assert.assertEquals("Table should have expected snapshots",
183+
isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
184+
}
185+
}

0 commit comments

Comments
 (0)