Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#209]feat(catalog): Hive table entity serde and store support #233

Merged
merged 2 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.datastrato.graviton.rel.TableChange;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -402,26 +403,33 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
EntityStore store = GravitonEnv.getInstance().entityStore();
Namespace schemaNamespace =
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name()));
List<BaseTable> tables = Lists.newArrayList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary initialization

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed: addAll is called on this list at line 416.

if (!cascade) {
if (listTables(schemaNamespace).length > 0) {
throw new NonEmptySchemaException(
String.format(
"Hive schema (database) %s is not empty. One or more tables exist in Hive metastore.",
ident.name()));
}
// TODO(minghuang): check if there are tables in Graviton store after we support hive table
// serde

try {
tables.addAll(store.list(schemaNamespace, BaseTable.class, TABLE));
} catch (IOException e) {
throw new RuntimeException("Failed to list table from Graviton store", e);
}
if (!tables.isEmpty()) {
throw new NonEmptySchemaException(
String.format(
"Hive schema (database) %s is not empty. One or more tables exist in Graviton store.",
ident.name()));
}
}

try {
store.executeInTransaction(
() -> {
store.delete(ident, SCHEMA);
for (BaseTable t :
store.list(
Namespace.of(ArrayUtils.add(ident.namespace().levels(), ident.name())),
HiveTable.class,
TABLE)) {
for (BaseTable t : tables) {
store.delete(NameIdentifier.of(schemaNamespace, t.name()), TABLE);
}
clientPool.run(
Expand Down Expand Up @@ -531,38 +539,37 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
String.format(
"Cannot support invalid namespace in Hive Metastore: %s", schemaIdent.namespace()));

HiveSchema schema = loadSchema(schemaIdent);
try {
org.apache.hadoop.hive.metastore.api.Table hiveTable =
clientPool.run(c -> c.getTable(schemaIdent.name(), tableIdent.name()));
HiveTable.Builder builder = new HiveTable.Builder();

// TODO: We should also fetch the customized HiveTable entity fields from our own
// underlying storage, like id, auditInfo, etc.
EntityStore store = GravitonEnv.getInstance().entityStore();
BaseTable baseTable = store.get(tableIdent, TABLE, BaseTable.class);

builder =
builder
.withId(1L /* TODO: Fetch id from underlying storage */)
.withSchemaId((Long) schema.fields().get(BaseSchema.ID))
.withId(baseTable.getId())
.withSchemaId(baseTable.getSchemaId())
.withName(tableIdent.name())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(
/* TODO: Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build());
.withAuditInfo(baseTable.auditInfo());
HiveTable table = HiveTable.fromInnerTable(hiveTable, builder);

LOG.info("Loaded Hive table {} from Hive Metastore ", tableIdent.name());

return table;
} catch (TException e) {
} catch (NoSuchObjectException e) {
throw new NoSuchTableException(
String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e);

} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (InterruptedException | TException e) {
throw new RuntimeException(
"Failed to load Hive table " + tableIdent.name() + " from Hive metastore", e);

} catch (IOException e) {
throw new RuntimeException(
"Failed to load Hive table " + tableIdent.name() + " from Graviton store", e);
}
}

Expand Down Expand Up @@ -593,40 +600,47 @@ public Table createTable(
try {
HiveSchema schema = loadSchema(schemaIdent);

HiveTable table =
new HiveTable.Builder()
.withId(1L /* TODO: Use ID generator*/)
.withSchemaId((Long) schema.fields().get(BaseSchema.ID))
.withName(tableIdent.name())
.withNameSpace(tableIdent.namespace())
.withColumns(columns)
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.build();
clientPool.run(
c -> {
c.createTable(table.toInnerTable());
return null;
});

// TODO. We should also store the customized HiveTable entity fields into our own
// underlying storage, like id, auditInfo, etc.
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveTable hiveTable =
store.executeInTransaction(
() -> {
HiveTable createdTable =
new HiveTable.Builder()
.withId(1L /* TODO: Use ID generator*/)
.withSchemaId(schema.getId())
.withName(tableIdent.name())
.withNameSpace(tableIdent.namespace())
.withColumns(columns)
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build())
.build();
store.put(createdTable, false);
clientPool.run(
c -> {
c.createTable(createdTable.toInnerTable());
return null;
});
return createdTable;
});

LOG.info("Created Hive table {} in Hive Metastore", tableIdent.name());

return table;
return hiveTable;

} catch (AlreadyExistsException e) {
} catch (AlreadyExistsException | EntityAlreadyExistsException e) {
throw new TableAlreadyExistsException("Table already exists: " + tableIdent.name(), e);
} catch (TException e) {
} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to create Hive table " + tableIdent.name() + " in Hive Metastore", e);
} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to create Hive table " + tableIdent.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -704,36 +718,49 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
}
}

clientPool.run(
c -> {
c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable);
return null;
});
EntityStore store = GravitonEnv.getInstance().entityStore();
HiveTable updatedTable =
store.executeInTransaction(
() -> {
HiveTable.Builder builder = new HiveTable.Builder();
builder =
builder
.withId(table.getId())
.withSchemaId(table.getSchemaId())
.withName(alteredHiveTable.getTableName())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(
new AuditInfo.Builder()
.withCreator(table.auditInfo().creator())
.withCreateTime(table.auditInfo().createTime())
.withLastModifier(currentUser())
.withLastModifiedTime(Instant.now())
.build());
HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder);
store.delete(tableIdent, TABLE);
store.put(alteredTable, false);
clientPool.run(
c -> {
c.alter_table(schemaIdent.name(), tableIdent.name(), alteredHiveTable);
return null;
});
return alteredTable;
});

// TODO(@Minghuang). We should also update the customized HiveTable entity fields into our own
// if necessary
HiveTable.Builder builder = new HiveTable.Builder();
builder =
builder
.withId((Long) table.fields().get(BaseTable.ID))
.withSchemaId((Long) table.fields().get(BaseTable.SCHEMA_ID))
.withName(alteredHiveTable.getTableName())
.withNameSpace(tableIdent.namespace())
.withAuditInfo(
/* TODO(@Minghuang): Fetch audit info from underlying storage */
new AuditInfo.Builder()
.withCreator(currentUser())
.withCreateTime(Instant.now())
.build());
HiveTable alteredTable = HiveTable.fromInnerTable(alteredHiveTable, builder);
LOG.info("Altered Hive table {} in Hive Metastore", tableIdent.name());

return alteredTable;
return updatedTable;

} catch (NoSuchObjectException e) {
throw new NoSuchTableException(
String.format("Hive table does not exist: %s in Hive Metastore", tableIdent.name()), e);
} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to alter Hive table " + tableIdent.name() + " in Hive metastore", e);
} catch (IOException e) {
throw new RuntimeException(
"Failed to alter Hive table " + tableIdent.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -886,24 +913,31 @@ private boolean dropHiveTable(NameIdentifier tableIdent, boolean deleteData, boo
"Cannot support invalid namespace in Hive Metastore: %s", schemaIdent.namespace()));

try {
clientPool.run(
c -> {
c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge);
EntityStore store = GravitonEnv.getInstance().entityStore();
store.executeInTransaction(
() -> {
store.delete(tableIdent, TABLE);
clientPool.run(
c -> {
c.dropTable(schemaIdent.name(), tableIdent.name(), deleteData, false, ifPurge);
return null;
});
return null;
});

// TODO. we should also delete the Hive table from our own underlying storage

LOG.info("Dropped Hive table {}", tableIdent.name());
return true;

} catch (NoSuchObjectException e) {
LOG.warn("Hive table {} does not exist in Hive Metastore", tableIdent.name());
return false;
} catch (TException e) {
} catch (TException | InterruptedException e) {
throw new RuntimeException(
"Failed to drop Hive table " + tableIdent.name() + " in Hive Metastore", e);
} catch (InterruptedException e) {
} catch (IOException e) {
throw new RuntimeException(
"Failed to drop Hive table " + tableIdent.name() + " in Graviton store", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand Down
Loading