Skip to content

Commit 375d69c

Browse files
committed
Support force option on spark registerTable procedure
1 parent d397ae3 commit 375d69c

File tree

15 files changed

+137
-28
lines changed

15 files changed

+137
-28
lines changed

.palantir/revapi.yml

+6
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ acceptedBreaks:
109109
- code: "java.method.addedToInterface"
110110
new: "method org.apache.iceberg.ReplacePartitions org.apache.iceberg.ReplacePartitions::validateNoConflictingDeletes()"
111111
justification: "Accept all changes prior to introducing API compatibility checks"
112+
- code: "java.method.numberOfParametersChanged"
113+
old: "method org.apache.iceberg.Table org.apache.iceberg.catalog.Catalog::registerTable(org.apache.iceberg.catalog.TableIdentifier,\
114+
\ java.lang.String)"
115+
new: "method org.apache.iceberg.Table org.apache.iceberg.catalog.Catalog::registerTable(org.apache.iceberg.catalog.TableIdentifier,\
116+
\ java.lang.String, boolean)"
117+
justification: "Add an optional 'force' flag to allow registering over an existing table"
112118
- code: "java.method.numberOfParametersChanged"
113119
old: "method void org.apache.iceberg.events.IncrementalScanEvent::<init>(java.lang.String,\
114120
\ long, long, org.apache.iceberg.expressions.Expression, org.apache.iceberg.Schema)"

api/src/main/java/org/apache/iceberg/catalog/Catalog.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,11 @@ default void invalidateTable(TableIdentifier identifier) {
356356
*
357357
* @param identifier a table identifier
358358
* @param metadataFileLocation the location of a metadata file
359+
* @param force if true, register the table even if it already exists; if false (the default), an AlreadyExistsException is thrown when the table exists
359360
* @return a Table instance
360361
* @throws AlreadyExistsException if the table already exists in the catalog.
361362
*/
362-
default Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
363+
default Table registerTable(TableIdentifier identifier, String metadataFileLocation, boolean force) {
363364
throw new UnsupportedOperationException("Registering tables is not supported");
364365
}
365366

api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,11 @@ public Map<String, String> properties() {
147147
* @param context session context
148148
* @param ident a table identifier
149149
* @param metadataFileLocation the location of a metadata file
150+
* @param force if true, register the table even if it already exists; if false (the default), an AlreadyExistsException is thrown when the table exists
150151
* @return a Table instance
151152
* @throws AlreadyExistsException if the table already exists in the catalog.
152153
*/
153-
Table registerTable(SessionContext context, TableIdentifier ident, String metadataFileLocation);
154+
Table registerTable(SessionContext context, TableIdentifier ident, String metadataFileLocation, boolean force);
154155

155156
/**
156157
* Check whether table exists.

core/src/main/java/org/apache/iceberg/CachingCatalog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ public void invalidateTable(TableIdentifier ident) {
183183
}
184184

185185
@Override
186-
public Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
187-
Table table = catalog.registerTable(identifier, metadataFileLocation);
186+
public Table registerTable(TableIdentifier identifier, String metadataFileLocation, boolean force) {
187+
Table table = catalog.registerTable(identifier, metadataFileLocation, force);
188188
invalidateTable(identifier);
189189
return table;
190190
}

core/src/main/java/org/apache/iceberg/TableMetadata.java

+9
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class TableMetadata implements Serializable {
5858
static final int INITIAL_SCHEMA_ID = 0;
5959

6060
private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1);
61+
private boolean registerFlag = false;
6162

6263
public static TableMetadata newTableMetadata(Schema schema,
6364
PartitionSpec spec,
@@ -714,6 +715,14 @@ private Map<Integer, Schema> indexSchemas() {
714715
return builder.build();
715716
}
716717

718+
public boolean isForRegister() {
719+
return this.registerFlag;
720+
}
721+
722+
public void markForRegister() {
723+
this.registerFlag = true;
724+
}
725+
717726
private static Map<Integer, PartitionSpec> indexSpecs(List<PartitionSpec> specs) {
718727
ImmutableMap.Builder<Integer, PartitionSpec> builder = ImmutableMap.builder();
719728
for (PartitionSpec spec : specs) {

core/src/main/java/org/apache/iceberg/catalog/BaseSessionCatalog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ public TableBuilder buildTable(TableIdentifier ident, Schema schema) {
8787
}
8888

8989
@Override
90-
public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
91-
return BaseSessionCatalog.this.registerTable(context, ident, metadataFileLocation);
90+
public Table registerTable(TableIdentifier ident, String metadataFileLocation, boolean force) {
91+
return BaseSessionCatalog.this.registerTable(context, ident, metadataFileLocation, force);
9292
}
9393

9494
@Override

core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
182182
}
183183

184184
@Override
185-
public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
186-
return delegate.registerTable(ident, metadataFileLocation);
185+
public Table registerTable(TableIdentifier ident, String metadataFileLocation, boolean force) {
186+
return delegate.registerTable(ident, metadataFileLocation, force);
187187
}
188188

189189
@Override

core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ public void invalidateTable(SessionContext context, TableIdentifier ident) {
268268
}
269269

270270
@Override
271-
public Table registerTable(SessionContext context, TableIdentifier ident, String metadataFileLocation) {
271+
public Table registerTable(SessionContext context, TableIdentifier ident,
272+
String metadataFileLocation, boolean force) {
272273
throw new UnsupportedOperationException("Register table is not supported");
273274
}
274275

hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -235,19 +235,21 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
235235
}
236236

237237
@Override
238-
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
238+
public org.apache.iceberg.Table registerTable(TableIdentifier identifier,
239+
String metadataFileLocation, boolean force) {
239240
Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
240241

241-
// Throw an exception if this table already exists in the catalog.
242-
if (tableExists(identifier)) {
242+
// Throw an exception if this table already exists in the catalog and the force option is not set.
243+
if (tableExists(identifier) && !force) {
243244
throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
244245
}
246+
HiveTableOperations ops = (HiveTableOperations) newTableOps(identifier);
247+
TableMetadata base = ops.current();
245248

246-
TableOperations ops = newTableOps(identifier);
247249
InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
248250
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
249-
ops.commit(null, metadata);
250-
251+
metadata.markForRegister();
252+
ops.commit(base, metadata);
251253
return new BaseTable(ops, identifier.toString());
252254
}
253255

hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,9 @@ protected void doRefresh() {
215215
@SuppressWarnings("checkstyle:CyclomaticComplexity")
216216
@Override
217217
protected void doCommit(TableMetadata base, TableMetadata metadata) {
218-
String newMetadataLocation = base == null && metadata.metadataFileLocation() != null ?
219-
metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);
218+
boolean isTableForRegister = metadata.isForRegister();
219+
String newMetadataLocation = isTableForRegister ? metadata.metadataFileLocation()
220+
: writeNewMetadata(metadata, currentVersion() + 1);
220221
boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
221222
boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);
222223

@@ -309,7 +310,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
309310
throw new RuntimeException("Interrupted during commit", e);
310311

311312
} finally {
312-
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
313+
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex, isTableForRegister);
313314
}
314315

315316
LOG.info("Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
@@ -571,9 +572,9 @@ long acquireLock() throws UnknownHostException, TException, InterruptedException
571572
}
572573

573574
private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId,
574-
ReentrantLock tableLevelMutex) {
575+
ReentrantLock tableLevelMutex, boolean isTableForRegister) {
575576
try {
576-
if (commitStatus == CommitStatus.FAILURE) {
577+
if (!isTableForRegister && commitStatus == CommitStatus.FAILURE) {
577578
// If we are sure the commit failed, clean up the uncommitted metadata file
578579
io().deleteFile(metadataLocation);
579580
}

hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ public void testRegisterTable() throws TException {
380380
List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
381381
Assert.assertEquals(1, metadataVersionFiles.size());
382382

383-
catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0));
383+
catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0), false);
384384

385385
org.apache.hadoop.hive.metastore.api.Table newTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
386386

@@ -424,7 +424,7 @@ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TExceptio
424424

425425
// register the table to hive catalog using the latest metadata file
426426
String latestMetadataFile = ((BaseTable) table).operations().current().metadataFileLocation();
427-
catalog.registerTable(identifier, "file:" + latestMetadataFile);
427+
catalog.registerTable(identifier, "file:" + latestMetadataFile, false);
428428
Assert.assertNotNull(metastoreClient.getTable(DB_NAME, "table1"));
429429

430430
// load the table in hive catalog
@@ -484,7 +484,7 @@ public void testRegisterExistingTable() throws TException {
484484
AssertHelpers.assertThrows(
485485
"Should complain that the table already exists", AlreadyExistsException.class,
486486
"Table already exists",
487-
() -> catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0)));
487+
() -> catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0), false));
488488
}
489489

490490
@Test

spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java

+39
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import org.apache.iceberg.HasTableOperations;
2525
import org.apache.iceberg.Table;
26+
import org.apache.iceberg.exceptions.AlreadyExistsException;
2627
import org.apache.iceberg.hive.HiveTableOperations;
2728
import org.apache.iceberg.spark.Spark3Util;
2829
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -86,4 +87,42 @@ public void testRegisterTable() throws NoSuchTableException, ParseException {
8687
Assert.assertEquals("Should have the right datafile count in the procedure result",
8788
originalFileCount, result.get(0)[2]);
8889
}
90+
91+
@Test
92+
public void testForceRegisterTable() throws NoSuchTableException, ParseException {
93+
Assume.assumeTrue("Register only implemented on Hive Catalogs",
94+
spark.conf().get("spark.sql.catalog." + catalogName + ".type").equals("hive"));
95+
96+
long numRows1 = 100;
97+
long numRows2 = 200;
98+
sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);
99+
100+
spark.range(0, numRows1)
101+
.withColumn("data", functions.col("id").cast(DataTypes.StringType))
102+
.writeTo(tableName)
103+
.append();
104+
Table table = Spark3Util.loadIcebergTable(spark, tableName);
105+
String metaLocation = ((HiveTableOperations) (((HasTableOperations) table).operations())).currentMetadataLocation();
106+
spark.range(0, numRows2)
107+
.withColumn("data", functions.col("id").cast(DataTypes.StringType))
108+
.writeTo(tableName)
109+
.append();
110+
table = Spark3Util.loadIcebergTable(spark, tableName);
111+
String newMetalocation = ((HiveTableOperations) (((HasTableOperations) table).operations()))
112+
.currentMetadataLocation();
113+
114+
115+
sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metaLocation);
116+
List<Object[]> oldResults = sql("SELECT * FROM %s", targetName);
117+
sql("CALL %s.system.register_table('%s', '%s', 'force')", catalogName, targetName, newMetalocation);
118+
List<Object[]> newResults = sql("SELECT * FROM %s", targetName);
119+
120+
Assert.assertEquals("Should have the right row count in the procedure result",
121+
numRows1, oldResults.size());
122+
Assert.assertEquals("Should have the right row count in the procedure result",
123+
numRows1 + numRows2, newResults.size());
124+
Assert.assertThrows("Can't repeat the register table without the force option.",
125+
AlreadyExistsException.class,
126+
() -> sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, newMetalocation));
127+
}
89128
}

spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
class RegisterTableProcedure extends BaseProcedure {
4040
private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
4141
ProcedureParameter.required("table", DataTypes.StringType),
42-
ProcedureParameter.required("metadata_file", DataTypes.StringType)
42+
ProcedureParameter.required("metadata_file", DataTypes.StringType),
43+
ProcedureParameter.optional("force", DataTypes.StringType)
4344
};
4445

4546
private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -75,13 +76,17 @@ public StructType outputType() {
7576
public InternalRow[] call(InternalRow args) {
7677
TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
7778
String metadataFile = args.getString(1);
79+
boolean forceOption = false;
80+
if (!args.isNullAt(2) && args.getString(2).equalsIgnoreCase("force")) {
81+
forceOption = true;
82+
}
7883
Preconditions.checkArgument(tableCatalog() instanceof HasIcebergCatalog,
7984
"Cannot use Register Table in a non-Iceberg catalog");
8085
Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
8186
"Cannot handle an empty argument metadata_file");
8287

8388
Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
84-
Table table = icebergCatalog.registerTable(tableName, metadataFile);
89+
Table table = icebergCatalog.registerTable(tableName, metadataFile, forceOption);
8590
Long currentSnapshotId = null;
8691
Long totalDataFiles = null;
8792
Long totalRecords = null;

spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRegisterTableProcedure.java

+39
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import org.apache.iceberg.HasTableOperations;
2525
import org.apache.iceberg.Table;
26+
import org.apache.iceberg.exceptions.AlreadyExistsException;
2627
import org.apache.iceberg.hive.HiveTableOperations;
2728
import org.apache.iceberg.spark.Spark3Util;
2829
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -86,4 +87,42 @@ public void testRegisterTable() throws NoSuchTableException, ParseException {
8687
Assert.assertEquals("Should have the right datafile count in the procedure result",
8788
originalFileCount, result.get(0)[2]);
8889
}
90+
91+
@Test
92+
public void testForceRegisterTable() throws NoSuchTableException, ParseException {
93+
Assume.assumeTrue("Register only implemented on Hive Catalogs",
94+
spark.conf().get("spark.sql.catalog." + catalogName + ".type").equals("hive"));
95+
96+
long numRows1 = 100;
97+
long numRows2 = 200;
98+
sql("CREATE TABLE %s (id int, data string) using ICEBERG", tableName);
99+
100+
spark.range(0, numRows1)
101+
.withColumn("data", functions.col("id").cast(DataTypes.StringType))
102+
.writeTo(tableName)
103+
.append();
104+
Table table = Spark3Util.loadIcebergTable(spark, tableName);
105+
String metaLocation = ((HiveTableOperations) (((HasTableOperations) table).operations())).currentMetadataLocation();
106+
spark.range(0, numRows2)
107+
.withColumn("data", functions.col("id").cast(DataTypes.StringType))
108+
.writeTo(tableName)
109+
.append();
110+
table = Spark3Util.loadIcebergTable(spark, tableName);
111+
String newMetalocation = ((HiveTableOperations) (((HasTableOperations) table).operations()))
112+
.currentMetadataLocation();
113+
114+
115+
sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, metaLocation);
116+
List<Object[]> oldResults = sql("SELECT * FROM %s", targetName);
117+
sql("CALL %s.system.register_table('%s', '%s', 'force')", catalogName, targetName, newMetalocation);
118+
List<Object[]> newResults = sql("SELECT * FROM %s", targetName);
119+
120+
Assert.assertEquals("Should have the right row count in the procedure result",
121+
numRows1, oldResults.size());
122+
Assert.assertEquals("Should have the right row count in the procedure result",
123+
numRows1 + numRows2, newResults.size());
124+
Assert.assertThrows("Can't repeat the register table without the force option.",
125+
AlreadyExistsException.class,
126+
() -> sql("CALL %s.system.register_table('%s', '%s')", catalogName, targetName, newMetalocation));
127+
}
89128
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@
3838

3939
class RegisterTableProcedure extends BaseProcedure {
4040
private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
41-
ProcedureParameter.required("table", DataTypes.StringType),
42-
ProcedureParameter.required("metadata_file", DataTypes.StringType)
41+
ProcedureParameter.required("table", DataTypes.StringType),
42+
ProcedureParameter.required("metadata_file", DataTypes.StringType),
43+
ProcedureParameter.optional("force", DataTypes.StringType)
4344
};
4445

4546
private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -75,13 +76,17 @@ public StructType outputType() {
7576
public InternalRow[] call(InternalRow args) {
7677
TableIdentifier tableName = Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), "table"));
7778
String metadataFile = args.getString(1);
79+
boolean forceOption = false;
80+
if (!args.isNullAt(2) && args.getString(2).equalsIgnoreCase("force")) {
81+
forceOption = true;
82+
}
7883
Preconditions.checkArgument(tableCatalog() instanceof HasIcebergCatalog,
7984
"Cannot use Register Table in a non-Iceberg catalog");
8085
Preconditions.checkArgument(metadataFile != null && !metadataFile.isEmpty(),
8186
"Cannot handle an empty argument metadata_file");
8287

8388
Catalog icebergCatalog = ((HasIcebergCatalog) tableCatalog()).icebergCatalog();
84-
Table table = icebergCatalog.registerTable(tableName, metadataFile);
89+
Table table = icebergCatalog.registerTable(tableName, metadataFile, forceOption);
8590
Long currentSnapshotId = null;
8691
Long totalDataFiles = null;
8792
Long totalRecords = null;

0 commit comments

Comments
 (0)