Skip to content

Commit 7946c9e

Browse files
committed
Add resetTable procedure
1 parent 3d00780 commit 7946c9e

File tree

8 files changed

+281
-0
lines changed

8 files changed

+281
-0
lines changed

api/src/main/java/org/apache/iceberg/catalog/Catalog.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,17 @@ default Table registerTable(TableIdentifier identifier, String metadataFileLocat
347347
throw new UnsupportedOperationException("Registering tables is not supported");
348348
}
349349

350+
/**
351+
* Reset the table with a meta file.
352+
*
353+
* @param identifier a table identifier
354+
* @param metadataFileLocation the location of a metadata file
355+
* @return a Table instance
356+
*/
357+
default Table resetTable(TableIdentifier identifier, String metadataFileLocation) {
358+
throw new UnsupportedOperationException("Reset tables is not supported");
359+
}
360+
350361
/**
351362
* /** Instantiate a builder to either create a table or start a create/replace transaction.
352363
*

core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,21 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati
8686
return new BaseTable(ops, identifier.toString());
8787
}
8888

89+
@Override
90+
public Table resetTable(TableIdentifier identifier, String metadataFileLocation) {
91+
Preconditions.checkArgument(
92+
identifier != null && isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
93+
Preconditions.checkArgument(
94+
metadataFileLocation != null && !metadataFileLocation.isEmpty(),
95+
"Cannot reset table with an empty metadata file location");
96+
if (!tableExists(identifier)) {
97+
throw new NoSuchTableException("Table doesn't exist: %s", identifier);
98+
}
99+
TableOperations ops = newTableOps(identifier);
100+
ops.reset(metadataFileLocation);
101+
return new BaseTable(ops, identifier.toString());
102+
}
103+
89104
@Override
90105
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
91106
return new BaseMetastoreCatalogTableBuilder(identifier, schema);

core/src/main/java/org/apache/iceberg/CachingCatalog.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,13 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati
194194
return table;
195195
}
196196

197+
@Override
198+
public Table resetTable(TableIdentifier identifier, String metadataFileLocation) {
199+
Table table = catalog.resetTable(identifier, metadataFileLocation);
200+
invalidateTable(identifier);
201+
return table;
202+
}
203+
197204
private Iterable<TableIdentifier> metadataTableIdentifiers(TableIdentifier ident) {
198205
ImmutableList.Builder<TableIdentifier> builder = ImmutableList.builder();
199206

core/src/main/java/org/apache/iceberg/TableOperations.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,15 @@ public interface TableOperations {
6565
/** Returns a {@link FileIO} to read and write table data and metadata files. */
6666
FileIO io();
6767

68+
/**
 * Reset the table with a meta file.
 *
 * <p>Implementations that support this operation are expected to make the file at {@code
 * metadataFileLocation} the table's current metadata. The default implementation rejects the
 * operation.
 *
 * @param metadataFileLocation the location of a metadata file
 * @throws UnsupportedOperationException unless overridden by an implementation that supports reset
 */
default void reset(String metadataFileLocation) {
  throw new UnsupportedOperationException("Reset table operation is not supported.");
}
76+
6877
/**
6978
* Returns a {@link org.apache.iceberg.encryption.EncryptionManager} to encrypt and decrypt data
7079
* files.

hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
345345
"Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
346346
}
347347

348+
@Override
public void reset(String metadataFileLocation) {
  // Repoints the HMS table's metadata location property directly at the given file,
  // bypassing the normal commit path.
  Optional<Long> lockId = Optional.empty();
  // Process-local per-table lock (shared with the commit path via commitLockCache) serializes
  // metastore updates within this JVM; fair ordering to avoid starvation.
  ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
  tableLevelMutex.lock();
  try {
    // Metastore-level lock guards against concurrent writers in other processes.
    lockId = Optional.of(acquireLock());
    Table tbl = loadHmsTable();
    validateTableIsIceberg(tbl, fullName);
    // Overwrites the current metadata pointer only.
    // NOTE(review): PREVIOUS_METADATA_LOCATION is not updated here, unlike a normal commit —
    // confirm whether that is intentional for reset.
    tbl.getParameters().put(METADATA_LOCATION_PROP, metadataFileLocation);
    persistTable(tbl, true); // true: alter the existing HMS table rather than create a new one
  } catch (TException | UnknownHostException e) {
    throw new RuntimeException(
        String.format("Metastore operation failed for %s.%s", database, tableName), e);
  } catch (InterruptedException e) {
    // Restore the interrupt flag before converting to an unchecked exception.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted during reset", e);
  } finally {
    // Release both locks regardless of success or failure.
    unlock(lockId);
    tableLevelMutex.unlock();
  }
  LOG.info("Reset table {} with the new metadata location {}", fullName, metadataFileLocation);
}
371+
348372
@VisibleForTesting
349373
void persistTable(Table hmsTable, boolean updateHiveTable)
350374
throws TException, InterruptedException {
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark.extensions;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import org.apache.iceberg.HasTableOperations;
24+
import org.apache.iceberg.Table;
25+
import org.apache.iceberg.hive.HiveTableOperations;
26+
import org.apache.iceberg.spark.Spark3Util;
27+
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
28+
import org.apache.spark.sql.catalyst.parser.ParseException;
29+
import org.apache.spark.sql.functions;
30+
import org.apache.spark.sql.types.DataTypes;
31+
import org.junit.After;
32+
import org.junit.Assert;
33+
import org.junit.Assume;
34+
import org.junit.Rule;
35+
import org.junit.Test;
36+
import org.junit.rules.TemporaryFolder;
37+
38+
/**
 * Tests for the {@code reset_table} stored procedure.
 *
 * <p>Creates a source table, registers a second table from an intermediate metadata file, then
 * resets the registered table to a newer metadata file and verifies the visible row counts.
 */
public class TestResetTableProcedure extends SparkExtensionsTestBase {

  // Identifier of the secondary table that is registered and then reset.
  private final String targetName;

  public TestResetTableProcedure(
      String catalogName, String implementation, Map<String, String> config) {
    super(catalogName, implementation, config);
    targetName = tableName("reset_table");
  }

  @Rule public TemporaryFolder temp = new TemporaryFolder();

  @After
  public void dropTables() {
    // Clean up both the source table and the registered/reset target table.
    sql("DROP TABLE IF EXISTS %s", tableName);
    sql("DROP TABLE IF EXISTS %s", targetName);
  }

  @Test
  public void testResetTable() throws NoSuchTableException, ParseException {
    // Reset relies on Hive-specific table operations; skip on other catalog types.
    Assume.assumeTrue(
        "Register/Reset only implemented on Hive Catalogs",
        spark.conf().get("spark.sql.catalog." + catalogName + ".type").equals("hive"));

    long numRows1 = 100;
    long numRows2 = 200;
    sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);

    // First append: produces the metadata file used for register_table.
    spark
        .range(0, numRows1)
        .withColumn("data", functions.col("id").cast(DataTypes.StringType))
        .writeTo(tableName)
        .append();
    Table table = Spark3Util.loadIcebergTable(spark, tableName);
    String metaLocation =
        ((HiveTableOperations) (((HasTableOperations) table).operations()))
            .currentMetadataLocation();
    // Second append: produces the newer metadata file used for reset_table.
    spark
        .range(0, numRows2)
        .withColumn("data", functions.col("id").cast(DataTypes.StringType))
        .writeTo(tableName)
        .append();
    table = Spark3Util.loadIcebergTable(spark, tableName);
    String newMetalocation =
        ((HiveTableOperations) (((HasTableOperations) table).operations()))
            .currentMetadataLocation();

    // Register at the older metadata, then reset to the newer metadata.
    sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metaLocation);
    List<Object[]> oldResults = sql("SELECT * FROM %s", targetName);
    sql("CALL %s.system.reset_table('%s', '%s')", catalogName, targetName, newMetalocation);
    List<Object[]> newResults = sql("SELECT * FROM %s", targetName);

    // Before reset: only the first append's rows; after reset: both appends' rows.
    Assert.assertEquals(
        "Should have the right row count in the procedure result", numRows1, oldResults.size());
    Assert.assertEquals(
        "Should have the right row count in the procedure result",
        numRows1 + numRows2,
        newResults.size());
    // Resetting a table that was never created/registered must fail.
    Assert.assertThrows(
        "Can't reset a nonexistent table.",
        org.apache.iceberg.exceptions.NoSuchTableException.class,
        () ->
            sql(
                "CALL %s.system.reset_table('%s', '%s')",
                catalogName, "nonExistTableName", newMetalocation));
  }
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark.procedures;
20+
21+
import org.apache.iceberg.Snapshot;
22+
import org.apache.iceberg.SnapshotSummary;
23+
import org.apache.iceberg.Table;
24+
import org.apache.iceberg.catalog.Catalog;
25+
import org.apache.iceberg.catalog.TableIdentifier;
26+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
27+
import org.apache.iceberg.spark.Spark3Util;
28+
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
29+
import org.apache.iceberg.spark.source.HasIcebergCatalog;
30+
import org.apache.spark.sql.catalyst.InternalRow;
31+
import org.apache.spark.sql.connector.catalog.TableCatalog;
32+
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
33+
import org.apache.spark.sql.types.DataTypes;
34+
import org.apache.spark.sql.types.Metadata;
35+
import org.apache.spark.sql.types.StructField;
36+
import org.apache.spark.sql.types.StructType;
37+
38+
class ResetTableProcedure extends BaseProcedure {
39+
private static final ProcedureParameter[] PARAMETERS =
40+
new ProcedureParameter[] {
41+
ProcedureParameter.required("table", DataTypes.StringType),
42+
ProcedureParameter.required("metadata_file", DataTypes.StringType)
43+
};
44+
45+
private static final StructType OUTPUT_TYPE =
46+
new StructType(
47+
new StructField[] {
48+
new StructField("current_snapshot_id", DataTypes.LongType, true, Metadata.empty()),
49+
new StructField("total_records_count", DataTypes.LongType, true, Metadata.empty()),
50+
new StructField("total_data_files_count", DataTypes.LongType, true, Metadata.empty())
51+
});
52+
53+
private ResetTableProcedure(TableCatalog tableCatalog) {
54+
super(tableCatalog);
55+
}
56+
57+
public static ProcedureBuilder builder() {
58+
return new Builder<ResetTableProcedure>() {
59+
@Override
60+
protected ResetTableProcedure doBuild() {
61+
return new ResetTableProcedure(tableCatalog());
62+
}
63+
};
64+
}
65+
66+
@Override
67+
public ProcedureParameter[] parameters() {
68+
return PARAMETERS;
69+
}
70+
71+
@Override
72+
public StructType outputType() {
73+
return OUTPUT_TYPE;
74+
}
75+
76+
@Override
77+
public InternalRow[] call(InternalRow args) {
78+
TableIdentifier tableName =
79+
Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
80+
String metadataFile = args.getString(1);
81+
Preconditions.checkArgument(
82+
tableCatalog() instanceof HasIcebergCatalog,
83+
"Cannot use Register Table in a non-Iceberg catalog");
84+
Preconditions.checkArgument(
85+
metadataFile != null && !metadataFile.isEmpty(),
86+
"Cannot handle an empty argument metadata_file");
87+
88+
Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
89+
Table table = icebergCatalog.resetTable(tableName, metadataFile);
90+
Long currentSnapshotId = null;
91+
Long totalDataFiles = null;
92+
Long totalRecords = null;
93+
94+
Snapshot currentSnapshot = table.currentSnapshot();
95+
if (currentSnapshot != null) {
96+
currentSnapshotId = currentSnapshot.snapshotId();
97+
totalDataFiles =
98+
Long.parseLong(currentSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
99+
totalRecords =
100+
Long.parseLong(currentSnapshot.summary().get(SnapshotSummary.TOTAL_RECORDS_PROP));
101+
}
102+
103+
return new InternalRow[] {newInternalRow(currentSnapshotId, totalRecords, totalDataFiles)};
104+
}
105+
106+
@Override
107+
public String description() {
108+
return "ResetTableProcedure";
109+
}
110+
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
5252
mapBuilder.put("add_files", AddFilesProcedure::builder);
5353
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
5454
mapBuilder.put("register_table", RegisterTableProcedure::builder);
55+
mapBuilder.put("reset_table", ResetTableProcedure::builder);
5556
mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
5657
return mapBuilder.build();
5758
}

0 commit comments

Comments
 (0)