Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Implement BaseMetastoreCatalog.registerTable() #5037

Merged
merged 12 commits into from
Jul 22, 2022
21 changes: 21 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -65,6 +66,26 @@ public Table loadTable(TableIdentifier identifier) {
return result;
}

/**
 * Registers an existing Iceberg table in this catalog by pointing a new identifier at an
 * existing metadata file.
 *
 * @param identifier the identifier under which the table is registered; must be non-null and
 *     valid for this catalog
 * @param metadataFileLocation location of an existing table metadata file; must be non-empty
 * @return a {@link Table} backed by the registered metadata
 * @throws AlreadyExistsException if a table with the given identifier already exists
 */
@Override
public Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
  // AlreadyExistsException is already imported and this class lives in org.apache.iceberg,
  // so the previously fully-qualified names are unnecessary.
  Preconditions.checkArgument(
      identifier != null && isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
  Preconditions.checkArgument(metadataFileLocation != null && !metadataFileLocation.isEmpty(),
      "Cannot register an empty metadata file location as a table");

  // Throw an exception if this table already exists in the catalog.
  if (tableExists(identifier)) {
    throw new AlreadyExistsException("Table already exists: %s", identifier);
  }

  // Read the metadata through the table's own FileIO and commit it as the initial state
  // (base == null signals a brand-new entry to the operations implementation).
  TableOperations ops = newTableOps(identifier);
  InputFile metadataFile = ops.io().newInputFile(metadataFileLocation);
  TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
  ops.commit(null, metadata);

  return new BaseTable(ops, identifier.toString());
}

@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return new BaseMetastoreCatalogTableBuilder(identifier, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -47,6 +50,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -589,4 +593,31 @@ public void testTablePropsDefinedAtCatalogLevel() throws IOException {
"table-key5",
table.properties().get("key5"));
}

@Test
public void testRegisterTable() throws IOException {
  TableIdentifier sourceIdent = TableIdentifier.of("a", "t1");
  TableIdentifier targetIdent = TableIdentifier.of("a", "t2");
  HadoopCatalog catalog = hadoopCatalog();
  catalog.createTable(sourceIdent, SCHEMA);

  // Capture the current metadata file of the source table to register it under a new name.
  Table source = catalog.loadTable(sourceIdent);
  HadoopTableOperations sourceOps =
      (HadoopTableOperations) ((HasTableOperations) source).operations();
  String metadataFile = sourceOps.current().metadataFileLocation();

  // Registration must succeed and the registered table must be loadable afterwards.
  Assertions.assertThat(catalog.registerTable(targetIdent, metadataFile)).isNotNull();
  Assertions.assertThat(catalog.loadTable(targetIdent)).isNotNull();
}

@Test
public void testRegisterExistingTable() throws IOException {
  TableIdentifier ident = TableIdentifier.of("a", "t1");
  HadoopCatalog catalog = hadoopCatalog();
  catalog.createTable(ident, SCHEMA);

  // Grab the live table's current metadata file so we can attempt to re-register it.
  Table existing = catalog.loadTable(ident);
  HadoopTableOperations existingOps =
      (HadoopTableOperations) ((HasTableOperations) existing).operations();
  String metadataFile = existingOps.current().metadataFileLocation();

  // Registering under an identifier that is already taken must fail.
  Assertions.assertThatThrownBy(() -> catalog.registerTable(ident, metadataFile))
      .isInstanceOf(AlreadyExistsException.class)
      .hasMessage("Table already exists: a.t1");
}
}
27 changes: 27 additions & 0 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -58,6 +60,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -639,4 +642,28 @@ public void testConversions() {
Assert.assertEquals(ns, JdbcUtil.stringToNamespace(nsString));
}

@Test
public void testRegisterTable() {
  TableIdentifier ident = TableIdentifier.of("a", "t1");
  catalog.createTable(ident, SCHEMA);

  // Load the table, then drop it from the catalog while keeping its metadata files on disk.
  Table detached = catalog.loadTable(ident);
  catalog.dropTable(ident, false);
  JdbcTableOperations detachedOps =
      (JdbcTableOperations) ((HasTableOperations) detached).operations();
  String metadataFile = detachedOps.currentMetadataLocation();

  // Re-registering the orphaned metadata must succeed and the table must be loadable again.
  Assertions.assertThat(catalog.registerTable(ident, metadataFile)).isNotNull();
  Assertions.assertThat(catalog.loadTable(ident)).isNotNull();
}

@Test
public void testRegisterExistingTable() {
  TableIdentifier ident = TableIdentifier.of("a", "t1");
  catalog.createTable(ident, SCHEMA);

  // Capture the current metadata location while the table is still registered.
  Table existing = catalog.loadTable(ident);
  JdbcTableOperations existingOps =
      (JdbcTableOperations) ((HasTableOperations) existing).operations();
  String metadataFile = existingOps.currentMetadataLocation();

  // Registering over an identifier that is still present must fail.
  Assertions.assertThatThrownBy(() -> catalog.registerTable(ident, metadataFile))
      .isInstanceOf(AlreadyExistsException.class)
      .hasMessage("Table already exists: a.t1");
}
}
30 changes: 30 additions & 0 deletions dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.dell.mock.ecs.EcsS3MockRule;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -35,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -170,4 +175,29 @@ public void testRenameTable() {
Assert.assertFalse("Old table does not exist", ecsCatalog.tableExists(TableIdentifier.of("a", "t1")));
Assert.assertTrue("New table exists", ecsCatalog.tableExists(TableIdentifier.of("b", "t2")));
}

@Test
public void testRegisterTable() {
  TableIdentifier ident = TableIdentifier.of("a", "t1");
  ecsCatalog.createTable(ident, SCHEMA);

  // Load the table, then drop the catalog entry while keeping its metadata files.
  Table detached = ecsCatalog.loadTable(ident);
  ecsCatalog.dropTable(ident, false);
  EcsTableOperations detachedOps =
      (EcsTableOperations) ((HasTableOperations) detached).operations();
  String metadataFile = detachedOps.currentMetadataLocation();

  // Re-registering the orphaned metadata must succeed and the table must be loadable again.
  Assertions.assertThat(ecsCatalog.registerTable(ident, metadataFile)).isNotNull();
  Assertions.assertThat(ecsCatalog.loadTable(ident)).isNotNull();
}

@Test
public void testRegisterExistingTable() {
  TableIdentifier ident = TableIdentifier.of("a", "t1");
  ecsCatalog.createTable(ident, SCHEMA);

  // Capture the current metadata location while the table is still registered.
  Table existing = ecsCatalog.loadTable(ident);
  EcsTableOperations existingOps =
      (EcsTableOperations) ((HasTableOperations) existing).operations();
  String metadataFile = existingOps.currentMetadataLocation();

  // Registering over an identifier that is still present must fail.
  Assertions.assertThatThrownBy(() -> ecsCatalog.registerTable(ident, metadataFile))
      .isInstanceOf(AlreadyExistsException.class)
      .hasMessage("Table already exists: a.t1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -51,7 +49,6 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -234,23 +231,6 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
}
}

// NOTE(review): this Hive-specific implementation is shown as DELETED in this diff; the same
// logic is moved up to BaseMetastoreCatalog.registerTable() so every metastore-backed catalog
// shares it. Annotated here only for reading the diff — do not resurrect this copy.
@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
  Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);

  // Throw an exception if this table already exists in the catalog.
  if (tableExists(identifier)) {
    throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
  }

  // Read the metadata file through the catalog-level FileIO, then commit it as the table's
  // initial state (base == null).
  TableOperations ops = newTableOps(identifier);
  InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
  TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
  ops.commit(null, metadata);

  return new BaseTable(ops, identifier.toString());
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> meta) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
}
}

String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
String newMetadataLocation = (base == null) && (metadata.metadataFileLocation() != null) ?
metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);

boolean delete = true;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
Expand All @@ -56,6 +58,7 @@
import org.projectnessie.model.ImmutableTableReference;
import org.projectnessie.model.LogResponse.LogEntry;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Tag;

import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.types.Types.NestedField.optional;
Expand Down Expand Up @@ -385,6 +388,81 @@ public void testDropTable() throws IOException {
verifyCommitMetadata();
}

private void testRegister(TableIdentifier identifier, String metadataVersionFiles) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we separate these tests for Nessie specific and general ones?
Could we run the "general" tests on every catalog, and move the testRegisterTable, and testRegisterExistingTable from HiveTableTest to this general place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test cases are specific to Nessie.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the PR moves the HiveCatalog.registerTable to the BaseMetastoreCatalog so it will be available for every Catalog which is inheriting from BaseMetastoreCatalog. So we should have a test for testing all of the catalogs that implement the registerTable that the method is working as expected.
I understand that there are some Nessie specific test cases, but I think the basic functionality should be available and tested for all of the catalogs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add the said test-cases and would ping you then, thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are still missing:

  • DynamoDbCatalog
  • GlueCatalog

I do not like the duplicated test code, and I think it would be good to have a general Catalog API test for checking the functionality of the new Catalog implementations. Something like TestCatalog but for all/most of the catalog API methods. We can do it in a different PR, but it would be good to have them. The test for registerTable() is a good candidate for these common tests.

Copy link
Contributor Author

@Mehul2500 Mehul2500 Jul 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have put in test cases for Glue and DynamoDb as "Ignore" because I currently do not have an AWS account for testing them. I am working on getting one.
Also, I would address the general Catalog API test in a follow-up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got my AWS account setup, and have amended the test cases in Glue and DynamoDb Catalog, @pvary please have a look over the test cases.

Copy link
Contributor

@kbendick kbendick Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not like the duplicated test code, and I think it would be good to have a general Catalog API test for checking the functionality of the new Catalog implementations. Something like TestCatalog but for all/most of the catalog API methods. We can do it in a different PR, but it would be good to have them. The test for registerTable() is a good candidate for these common tests.

We do have CatalogTests, which is what TestRESTCatalog is built off of. It’s nice in that it has some methods for subclasses to declare if the catalog being tested supports certain features, so individual tests can exit early if they don’t support an optional feature (similar to JUnit Assume).

For example if the catalog in question doesn’t support namespace properties.

Might be good to see if that can be made to fit this need or can otherwise get some ideas from it. I believe Nessie also implements these tests.

Copy link
Contributor Author

@Mehul2500 Mehul2500 Jul 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good suggestion of using CatalogTests, but the duplicated test code referred to under these comments above shall not be resolved using CatalogTests.
Reason:
The duplicated test code is with two of the test cases, i.e., testRegisterTable() and testRegisterExistingTable() in three of the catalog test files.

  • Jdbc Catalog
  • Ecs Catalog
  • Hadoop Catalog

Out of these only the Jdbc catalog is one which can use CatalogTests, others do not use CatalogTests.
Thus, it won't be helpful to consider CatalogTests under this comment.

@pvary has also mentioned that we should consider a Catalog API test similar to CatalogTests, but this time covering all the catalogs.
As decided, that will be addressed in a follow-up PR; here, let's focus only on the registerTable functionality in BaseMetastoreCatalog.

Assertions.assertThat(catalog.registerTable(identifier, "file:" + metadataVersionFiles)).isNotNull();
Table newTable = catalog.loadTable(identifier);
Assertions.assertThat(newTable).isNotNull();
TableOperations ops = ((HasTableOperations) newTable).operations();
String metadataLocation = ((NessieTableOperations) ops).currentMetadataLocation();
Assertions.assertThat("file:" + metadataVersionFiles).isEqualTo(metadataLocation);
Assertions.assertThat(catalog.dropTable(identifier, false)).isTrue();
}

/** Registers a table through an explicit "main" branch table reference. */
@Test
public void testRegisterTableWithGivenBranch() {
  // A freshly created table has exactly one metadata version file.
  // (fix: AssertJ takes assertThat(actual); the original swapped actual and expected)
  List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
  Assertions.assertThat(metadataVersionFiles).hasSize(1);
  ImmutableTableReference tableReference =
      ImmutableTableReference.builder().reference("main").name(TABLE_NAME).build();
  TableIdentifier identifier = TableIdentifier.of(DB_NAME, tableReference.toString());
  testRegister(identifier, metadataVersionFiles.get(0));
}

/**
 * Exercises the failure modes of registerTable(): unknown branch, duplicate table,
 * registering on a tag, a missing metadata file, and a null identifier.
 */
@Test
public void testRegisterTableNegativeScenarios() throws NessieConflictException, NessieNotFoundException {
  // (fix: AssertJ takes assertThat(actual); the original swapped actual and expected)
  List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
  Assertions.assertThat(metadataVersionFiles).hasSize(1);
  // Case 1: Branch does not exist
  Assertions.assertThatThrownBy(
      () -> catalog.registerTable(
          TableIdentifier.of(DB_NAME, "`" + TABLE_NAME + "`@default"),
          "file:" + metadataVersionFiles.get(0)))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("Nessie ref 'default' does not exist");
  // Case 2: Table Already Exists
  Assertions.assertThatThrownBy(() -> catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0)))
      .isInstanceOf(AlreadyExistsException.class)
      .hasMessage("Table already exists: db.tbl");
  // Case 3: Registering using a tag (tags are immutable, so mutation must be rejected)
  api.createReference().sourceRefName(BRANCH).reference(Tag.of("tag_1", catalog.currentHash())).create();
  Assertions.assertThatThrownBy(
      () -> catalog.registerTable(
          TableIdentifier.of(DB_NAME, "`" + TABLE_NAME + "`@tag_1"),
          "file:" + metadataVersionFiles.get(0)))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("You can only mutate tables when using a branch without a hash or timestamp.");
  // Case 4: non-null metadata path with null metadata location
  Assertions.assertThat(catalog.dropTable(TABLE_IDENTIFIER, false)).isTrue();
  Assertions.assertThatThrownBy(
      () -> catalog.registerTable(TABLE_IDENTIFIER, "file:" + metadataVersionFiles.get(0) + "invalidName"))
      .isInstanceOf(NotFoundException.class);
  // Case 5: null identifier
  Assertions.assertThatThrownBy(
      () -> catalog.registerTable(null, "file:" + metadataVersionFiles.get(0) + "invalidName"))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("Invalid identifier: null");
}

/** Registers a table on the default branch after dropping the original catalog entry. */
@Test
public void testRegisterTableWithDefaultBranch() {
  // (fix: AssertJ takes assertThat(actual); the original swapped actual and expected)
  List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
  Assertions.assertThat(metadataVersionFiles).hasSize(1);
  Assertions.assertThat(catalog.dropTable(TABLE_IDENTIFIER, false)).isTrue();
  testRegister(TABLE_IDENTIFIER, metadataVersionFiles.get(0));
}

/** Registers the same metadata first via an explicit branch reference, then on the default branch. */
@Test
public void testRegisterTableMoreThanOneBranch() {
  // (fix: AssertJ takes assertThat(actual); the original swapped actual and expected)
  List<String> metadataVersionFiles = metadataVersionFiles(TABLE_NAME);
  Assertions.assertThat(metadataVersionFiles).hasSize(1);
  ImmutableTableReference tableReference =
      ImmutableTableReference.builder().reference("main").name(TABLE_NAME).build();
  TableIdentifier identifier = TableIdentifier.of(DB_NAME, tableReference.toString());
  testRegister(identifier, metadataVersionFiles.get(0));
  // Drop and register again, this time without an explicit branch reference.
  Assertions.assertThat(catalog.dropTable(TABLE_IDENTIFIER, false)).isTrue();
  testRegister(TABLE_IDENTIFIER, metadataVersionFiles.get(0));
}

@Test
public void testExistingTableUpdate() {
Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
Expand Down