Merged
12 changes: 10 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -110,9 +110,17 @@ public Transaction newReplaceTableTransaction(
throw new NoSuchTableException("No such table: " + identifier);
}

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
Map<String, String> tableProperties = properties != null ? properties : Maps.newHashMap();
TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation, tableProperties);

TableMetadata metadata;
if (ops.current() != null) {
Contributor


I'm not sure this is the right behavior. Basically we're saying that a replace table will keep the existing location (as opposed to using the defaults). I suspect we don't have create-or-replace-with-location semantics, but this is assuming that a replacement table is somehow the same as the old one. If we were to go with an ID-based pathing convention, this wouldn't work.

I don't think this is an issue at this point, but it might make sense to push this down to the location provider.

Contributor Author


I think this is correct because the REPLACE TABLE doesn't completely wipe out the old table. In most ways, it is the same table.

Calling buildReplacement will replace the schema and partition spec so that the transaction can add a new snapshot. Table history, old snapshots, and existing table properties are kept so that you can inspect the table and don't need to add table configuration every time you run the SQL.

We could add a flag to turn off this behavior and wipe out the old by default, but I don't think that's what users really want. It makes sense for things like default format and other settings to persist across replace operations, so that table configuration and table operations are orthogonal.
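
To make that concrete, here's a minimal sketch (not code from this change; the catalog instance, identifier, and the write.format.default property are assumed for illustration) of a replace through the catalog API where the existing location and configuration carry over:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

class ReplaceKeepsTableConfig {
  // Assumes `catalog` is configured and `ident` points at an existing table.
  static String replace(Catalog catalog, TableIdentifier ident) {
    Schema newSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    // No location or properties are passed, so the existing table's location is
    // reused and its properties are carried forward into the replacement.
    Transaction replace = catalog.newReplaceTableTransaction(
        ident, newSchema, PartitionSpec.unpartitioned(), false /* orCreate */);
    replace.commitTransaction();

    // History, old snapshots, and configuration such as a default write format survive.
    Table table = catalog.loadTable(ident);
    return table.properties().get("write.format.default");
  }
}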

Contributor


OK, I guess that makes sense. I assumed the semantics were the same as a transactional drop and create, but based on a little searching, it's much less clear than that. For example, DB2's create or replace will actually retain all data (assuming it aligns with the new table definition) by default.

I agree that preserving table properties makes sense, but wouldn't this wipe out comments (which we may also want to preserve)?

Contributor Author


It would probably wipe out column comments. I'll have to fix that.

String baseLocation = location != null ? location : ops.current().location();
metadata = ops.current().buildReplacement(schema, spec, baseLocation, tableProperties);
} else {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation, tableProperties);
}

if (orCreate) {
return Transactions.createOrReplaceTableTransaction(identifier.toString(), ops, metadata);
} else {
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -566,7 +566,7 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {

// The caller is responsible for passing an updatedPartitionSpec with correct partition field IDs
public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
Map<String, String> updatedProperties) {
String newLocation, Map<String, String> updatedProperties) {
ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec),
"Spec does not use sequential IDs that are required in v1: %s", updatedPartitionSpec);

@@ -602,7 +602,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
newProperties.putAll(this.properties);
newProperties.putAll(updatedProperties);

return new TableMetadata(null, formatVersion, uuid, location,
return new TableMetadata(null, formatVersion, uuid, newLocation,
lastSequenceNumber, System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
specId, builder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
2 changes: 1 addition & 1 deletion core/src/test/java/org/apache/iceberg/TestTables.java
@@ -78,7 +78,7 @@ public static Transaction beginReplace(File temp, String name, Schema schema, Pa

TableMetadata metadata;
if (current != null) {
metadata = current.buildReplacement(schema, spec, properties);
metadata = current.buildReplacement(schema, spec, current.location(), properties);
return Transactions.replaceTableTransaction(name, ops, metadata);
} else {
metadata = newTableMetadata(schema, spec, temp.toString(), properties);
18 changes: 17 additions & 1 deletion hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -153,10 +153,12 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
if (!isValidIdentifier(from)) {
throw new NoSuchTableException("Invalid identifier: %s", from);
}

TableIdentifier to = removeCatalogName(originalTo);
Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);

String toDatabase = to.namespace().level(0);
@@ -347,6 +349,20 @@ protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
return tableIdentifier.namespace().levels().length == 1;
}

private TableIdentifier removeCatalogName(TableIdentifier to) {
if (isValidIdentifier(to)) {
return to;
}

// check if the identifier includes the catalog name and remove it
if (to.namespace().levels().length == 2 && name().equalsIgnoreCase(to.namespace().level(0))) {
return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name());
}

// return the original unmodified
return to;
}

private boolean isValidateNamespace(Namespace namespace) {
return namespace.levels().length == 1;
}
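
For illustration only, assuming a HiveCatalog whose name() is "hive" and using made-up table names (none of this is from the change itself), a destination identifier prefixed with the catalog's own name is now accepted and treated as a database-qualified name:

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

class RenameWithCatalogPrefix {
  // Assumes catalog.name() returns "hive"; the prefix is stripped before validation,
  // so the destination resolves to default.renamed_table.
  static void rename(HiveCatalog catalog) {
    TableIdentifier from = TableIdentifier.of("default", "source_table");
    TableIdentifier to = TableIdentifier.of("hive", "default", "renamed_table");
    catalog.renameTable(from, to);
  }
}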
@@ -155,7 +155,7 @@ public void testReplaceTableTxn() {
txn.commitTransaction();

Table table = catalog.loadTable(TABLE_IDENTIFIER);
Assert.assertEquals("Partition spec should match", PartitionSpec.unpartitioned(), table.spec());
Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
Contributor


Is any PartitionSpec with 0 fields the unpartitioned spec, or should it match a certain spec (like in the removed version)? It seems like the equals method includes both the fields and the specId in its contract.

Contributor Author


Any partition spec with 0 fields is considered unpartitioned.

Contributor Author


I should note that this needed to be updated because the spec's ID didn't match, causing the test to fail.

PartitionSpec.unpartitioned() doesn't necessarily have the right ID for a given table. We primarily use it when creating tables, where the spec gets rebuilt and assigned the right ID.
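
A minimal sketch of that check (the helper class is invented for illustration): the updated assertions look for an empty list of partition fields instead of comparing against PartitionSpec.unpartitioned(), whose spec ID may not match the table's.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

class UnpartitionedCheck {
  // A spec with no partition fields is unpartitioned, regardless of its spec ID.
  static boolean isUnpartitioned(Table table) {
    PartitionSpec spec = table.spec();
    return spec.fields().isEmpty();
  }
}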

Contributor


Yeah, I supposed that was the reason for the change (spec's ID not matching). Thanks for the context!

}

@Test
@@ -233,7 +233,7 @@ public void testCreateOrReplaceTableTxnTableExists() {
txn.commitTransaction();

Table table = catalog.loadTable(TABLE_IDENTIFIER);
Assert.assertEquals("Partition spec should match", PartitionSpec.unpartitioned(), table.spec());
Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
}

@Test
39 changes: 36 additions & 3 deletions spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -26,15 +26,20 @@
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;

public class SparkTestBase {

protected static final Object ANY = new Object();

private static TestHiveMetastore metastore = null;
private static HiveConf hiveConf = null;
protected static SparkSession spark = null;
@@ -48,6 +53,7 @@ public static void startMetastoreAndSpark() {

SparkTestBase.spark = SparkSession.builder()
.master("local[2]")
.config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
.enableHiveSupport()
.getOrCreate();
@@ -65,19 +71,46 @@ public static void stopMetastoreAndSpark() {
SparkTestBase.spark = null;
}

protected List<String[]> sql(String query, Object... args) {
protected List<Object[]> sql(String query, Object... args) {
List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
if (rows.size() < 1) {
return ImmutableList.of();
}

return rows.stream()
.map(row -> IntStream.range(0, row.size())
.mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos).toString())
.toArray(String[]::new)
.mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos))
.toArray(Object[]::new)
).collect(Collectors.toList());
}

protected Object scalarSql(String query, Object... args) {
List<Object[]> rows = sql(query, args);
Assert.assertEquals("Scalar SQL should return one row", 1, rows.size());
Object[] row = Iterables.getOnlyElement(rows);
Assert.assertEquals("Scalar SQL should return one value", 1, row.length);
return row[0];
}

protected Object[] row(Object... values) {
return values;
}

protected void assertEquals(String context, List<Object[]> expectedRows, List<Object[]> actualRows) {
Assert.assertEquals(context + ": number of results should match", expectedRows.size(), actualRows.size());
for (int row = 0; row < expectedRows.size(); row += 1) {
Object[] expected = expectedRows.get(row);
Object[] actual = actualRows.get(row);
Assert.assertEquals("Number of columns should match", expected.length, actual.length);
for (int col = 0; col < actualRows.get(row).length; col += 1) {
if (expected[col] != ANY) {
Assert.assertEquals(context + ": row " + row + " col " + col + " contents should match",
expected[col], actual[col]);
}
}
}
}

protected static String dbPath(String dbName) {
return metastore.getDatabasePath(dbName);
}
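
Here's a hypothetical sketch of how the new helpers compose in a test (the table, columns, and values are invented, and it assumes the parameterized constructor from SparkCatalogTestBase):

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;

public class TestRowHelpersExample extends SparkCatalogTestBase {
  public TestRowHelpersExample(String catalogName, String implementation, Map<String, String> config) {
    super(catalogName, implementation, config);
  }

  @Test
  public void testRowHelpers() {
    sql("CREATE TABLE IF NOT EXISTS %s (id bigint, data string) USING iceberg", tableName);
    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);

    // rows come back as Object[]; ANY masks a column whose exact value is not under test
    assertEquals("Should return inserted rows",
        ImmutableList.of(row(1L, "a"), row(2L, ANY)),
        sql("SELECT id, data FROM %s ORDER BY id", tableName));

    // scalarSql asserts a single row and column and returns the value
    Assert.assertEquals(2L, scalarSql("SELECT count(*) FROM %s", tableName));
  }
}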
@@ -28,7 +28,7 @@ public class SimpleRecord {
public SimpleRecord() {
}

SimpleRecord(Integer id, String data) {
public SimpleRecord(Integer id, String data) {
this.id = id;
this.data = data;
}
@@ -23,7 +23,9 @@
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
@@ -76,6 +78,8 @@ public static Object[][] parameters() {
protected final String catalogName;
protected final Catalog validationCatalog;
protected final SupportsNamespaces validationNamespaceCatalog;
protected final TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
protected final String tableName;

public SparkCatalogTestBase(String catalogName, String implementation, Map<String, String> config) {
this.catalogName = catalogName;
@@ -90,5 +94,13 @@ public SparkCatalogTestBase(String catalogName, String implementation, Map<Strin
if (config.get("type").equalsIgnoreCase("hadoop")) {
spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse);
}

this.tableName = (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default.table";

sql("CREATE NAMESPACE IF NOT EXISTS default");
}

protected String tableName(String name) {
return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default." + name;
}
}