[#1529] feat(spark-connector): support table operation (#2133)
### What changes were proposed in this pull request?
1. Support basic table DDL and DML operations (ALTER TABLE is not
supported).
2. Add basic table operations to GravitinoCatalog.
3. Add a GravitinoBaseTable class, which gets the table schema from
Gravitino and delegates IO to the corresponding table in the internal Spark
catalog; a sketch of this pattern follows.
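
A minimal sketch of the delegation pattern in item 3, assuming Spark's DataSource V2 `Table` interfaces (`SupportsRead`/`SupportsWrite`); the class name and the `loadGravitinoSchema` helper are illustrative, not the PR's actual implementation:

```java
import java.util.Set;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Illustrative only: the schema is answered from Gravitino, while IO is
// delegated to the table loaded from the internal Spark catalog (e.g. Hive).
abstract class GravitinoBaseTableSketch implements Table, SupportsRead, SupportsWrite {
  private final Table internalSparkTable;

  GravitinoBaseTableSketch(Table internalSparkTable) {
    this.internalSparkTable = internalSparkTable;
  }

  // Hypothetical helper: builds a StructType from Gravitino column metadata.
  abstract StructType loadGravitinoSchema();

  @Override
  public String name() {
    return internalSparkTable.name();
  }

  @Override
  public StructType schema() {
    // Metadata comes from Gravitino, not from the underlying Spark table.
    return loadGravitinoSchema();
  }

  @Override
  public Set<TableCapability> capabilities() {
    return internalSparkTable.capabilities();
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    // SELECT goes straight to the internal catalog's table.
    return ((SupportsRead) internalSparkTable).newScanBuilder(options);
  }

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    // INSERT likewise delegates to the internal catalog's table.
    return ((SupportsWrite) internalSparkTable).newWriteBuilder(info);
  }
}
```

Delegating reads and writes this way keeps Gravitino as the source of truth for table metadata while reusing the underlying catalog's data paths.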

### Why are the changes needed?

Fix: #1529 

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added Spark SQL integration tests.
FANNG1 authored Mar 6, 2024
1 parent 26911d1 commit bb8946f
Showing 17 changed files with 1,069 additions and 53 deletions.
@@ -97,6 +97,12 @@ private void initSparkEnv() {
.config("spark.plugins", GravitinoSparkPlugin.class.getName())
.config(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
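// Point spark.sql.warehouse.dir at the Hive container's HDFS so tables
// created in the tests get a location reachable from the test JVM (see the
// note about the default database location in SparkIT).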
.config(
"spark.sql.warehouse.dir",
String.format(
"hdfs://%s:%d/user/hive/warehouse",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT))
.enableHiveSupport()
.getOrCreate();
}
@@ -4,10 +4,23 @@
*/
package com.datastrato.gravitino.integration.test.spark;

import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.StringType$;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -19,9 +32,33 @@
@TestInstance(Lifecycle.PER_CLASS)
public class SparkIT extends SparkEnvIT {

private static final String SELECT_ALL_TEMPLATE = "SELECT * FROM %s";
private static final String INSERT_WITHOUT_PARTITION_TEMPLATE = "INSERT INTO %s VALUES (%s)";

// Maps each column type to a literal value used to generate test data for table write & read checks.
private static final Map<DataType, String> typeConstant =
ImmutableMap.of(IntegerType$.MODULE$, "2", StringType$.MODULE$, "'gravitino_it_test'");

// Use a custom database instead of the default database, because SparkIT cannot read or
// write data in tables under the default database. The root cause: the default database
// location is determined by `hive.metastore.warehouse.dir` in hive-site.xml, which is a
// local HDFS address rather than the real one, so tables created under the default
// database get a location like hdfs://localhost:9000/xxx that SparkIT cannot read from
// or write to. Switch back to the default database once the Spark connector supports the
// `ALTER DATABASE ... SET LOCATION` command.
@BeforeAll
void initDefaultDatabase() {
sql("USE " + hiveCatalogName);
createDatabaseIfNotExists(getDefaultDatabase());
}

@BeforeEach
void init() {
sql("USE " + hiveCatalogName);
sql("USE " + getDefaultDatabase());
}

private String getDefaultDatabase() {
return "default_db";
}

@Test
@@ -33,6 +70,7 @@ void testLoadCatalogs() {
@Test
void testCreateAndLoadSchema() {
String testDatabaseName = "t_create1";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
Map<String, String> databaseMeta = getDatabaseMetadata(testDatabaseName);
Assertions.assertFalse(databaseMeta.containsKey("Comment"));
@@ -42,6 +80,7 @@ void testCreateAndLoadSchema() {
Assertions.assertTrue(StringUtils.isBlank(properties));

testDatabaseName = "t_create2";
dropDatabaseIfExists(testDatabaseName);
String testDatabaseLocation = "/tmp/" + testDatabaseName;
sql(
String.format(
@@ -92,4 +131,189 @@ void testDropSchema() {
Assertions.assertThrowsExactly(
NoSuchNamespaceException.class, () -> sql("DROP DATABASE notExists"));
}

@Test
void testCreateSimpleTable() {
String tableName = "simple_table";
dropTableIfExists(tableName);
createSimpleTable(tableName);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getSimpleTableColumn())
.withComment(null);
checker.check(tableInfo);

checkTableReadWrite(tableInfo);
}

@Test
void testCreateTableWithDatabase() {
// test db.table as table identifier
String databaseName = "db1";
String tableName = "table1";
createDatabaseIfNotExists(databaseName);
String tableIdentifier = String.join(".", databaseName, tableName);

createSimpleTable(tableIdentifier);
SparkTableInfo tableInfo = getTableInfo(tableIdentifier);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create().withName(tableName).withColumns(getSimpleTableColumn());
checker.check(tableInfo);
checkTableReadWrite(tableInfo);

// use the database, then create the table using only the table name
databaseName = "db2";
tableName = "table2";
createDatabaseIfNotExists(databaseName);

sql("USE " + databaseName);
createSimpleTable(tableName);
tableInfo = getTableInfo(tableName);
checker =
SparkTableInfoChecker.create().withName(tableName).withColumns(getSimpleTableColumn());
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}

@Test
void testCreateTableWithComment() {
String tableName = "comment_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
String tableComment = "tableComment";
createTableSql = String.format("%s COMMENT '%s'", createTableSql, tableComment);
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getSimpleTableColumn())
.withComment(tableComment);
checker.check(tableInfo);

checkTableReadWrite(tableInfo);
}

@Test
void testDropTable() {
String tableName = "drop_table";
createSimpleTable(tableName);
Assertions.assertTrue(tableExists(tableName));

dropTableIfExists(tableName);
Assertions.assertFalse(tableExists(tableName));

Assertions.assertThrowsExactly(NoSuchTableException.class, () -> sql("DROP TABLE not_exists"));
}

@Test
void testRenameTable() {
String tableName = "rename1";
String newTableName = "rename2";
dropTableIfExists(tableName);
dropTableIfExists(newTableName);

createSimpleTable(tableName);
Assertions.assertTrue(tableExists(tableName));
Assertions.assertFalse(tableExists(newTableName));

sql(String.format("ALTER TABLE %s RENAME TO %s", tableName, newTableName));
Assertions.assertTrue(tableExists(newTableName));
Assertions.assertFalse(tableExists(tableName));

// rename to an existing table
createSimpleTable(tableName);
Assertions.assertThrows(
RuntimeException.class,
() -> sql(String.format("ALTER TABLE %s RENAME TO %s", tableName, newTableName)));

// rename a non-existent table
Assertions.assertThrowsExactly(
AnalysisException.class, () -> sql("ALTER TABLE not_exists1 RENAME TO not_exist2"));
}

@Test
void testListTable() {
String table1 = "list1";
String table2 = "list2";
createSimpleTable(table1);
createSimpleTable(table2);
Set<String> tables = listTableNames();
Assertions.assertTrue(tables.contains(table1));
Assertions.assertTrue(tables.contains(table2));

// show tables from a database other than the current one
String database = "db_list";
String table3 = "list3";
String table4 = "list4";
createDatabaseIfNotExists(database);
createSimpleTable(String.join(".", database, table3));
createSimpleTable(String.join(".", database, table4));
tables = listTableNames(database);

Assertions.assertTrue(tables.contains(table3));
Assertions.assertTrue(tables.contains(table4));

Assertions.assertThrows(NoSuchNamespaceException.class, () -> listTableNames("not_exists_db"));
}

// Inserts one row generated from typeConstant into the table, reads it back, and
// verifies the round trip.
private void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
String insertValues =
table.getColumns().stream()
.map(columnInfo -> typeConstant.get(columnInfo.getType()))
.map(Object::toString)
.collect(Collectors.joining(","));

sql(String.format(INSERT_WITHOUT_PARTITION_TEMPLATE, name, insertValues));

// strip the surrounding "'" from values, e.g. 'a' becomes a
String checkValues =
table.getColumns().stream()
.map(columnInfo -> typeConstant.get(columnInfo.getType()))
.map(Object::toString)
.map(
s -> {
String tmp = org.apache.commons.lang3.StringUtils.removeEnd(s, "'");
tmp = org.apache.commons.lang3.StringUtils.removeStart(tmp, "'");
return tmp;
})
.collect(Collectors.joining(","));

List<String> queryResult =
sql(String.format(SELECT_ALL_TEMPLATE, name)).stream()
.map(
line ->
Arrays.stream(line)
.map(Object::toString)
.collect(Collectors.joining(",")))
.collect(Collectors.toList());
Assertions.assertEquals(
1, queryResult.size(), "Should have exactly one row, table content: " + queryResult);
Assertions.assertEquals(checkValues, queryResult.get(0));
}

private String getCreateSimpleTableString(String tableName) {
return String.format(
"CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', age INT)",
tableName);
}

private List<SparkColumnInfo> getSimpleTableColumn() {
return Arrays.asList(
SparkColumnInfo.of("id", IntegerType$.MODULE$, "id comment"),
SparkColumnInfo.of("name", StringType$.MODULE$, ""),
SparkColumnInfo.of("age", IntegerType$.MODULE$, null));
}

// Helper method to create a simple table; use the corresponding
// getSimpleTableColumn to check the table columns.
private void createSimpleTable(String identifier) {
String createTableSql = getCreateSimpleTableString(identifier);
sql(createTableSql);
}
}
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.integration.test.util.spark;

import com.datastrato.gravitino.spark.ConnectorConstants;
import com.datastrato.gravitino.spark.table.SparkBaseTable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.types.DataType;
import org.junit.jupiter.api.Assertions;

/** SparkTableInfo is used to check the results in tests. */
@Data
public class SparkTableInfo {
private String tableName;
private String database;
private String comment;
private List<SparkColumnInfo> columns;
private Map<String, String> tableProperties;
private List<String> unknownItems = new ArrayList<>();

public SparkTableInfo() {}

public String getTableName() {
return tableName;
}

// Includes both the database name and the table name.
public String getTableIdentifier() {
if (StringUtils.isNotBlank(database)) {
return String.join(".", database, tableName);
} else {
return tableName;
}
}

static SparkTableInfo create(SparkBaseTable baseTable) {
SparkTableInfo sparkTableInfo = new SparkTableInfo();
String identifier = baseTable.name();
String[] items = identifier.split("\\.");
Assertions.assertTrue(
items.length == 2, "Table name format should be $db.$table, but is: " + identifier);
sparkTableInfo.tableName = items[1];
sparkTableInfo.database = items[0];
sparkTableInfo.columns =
Arrays.stream(baseTable.schema().fields())
.map(
sparkField ->
new SparkColumnInfo(
sparkField.name(),
sparkField.dataType(),
sparkField.getComment().isDefined() ? sparkField.getComment().get() : null,
sparkField.nullable()))
.collect(Collectors.toList());
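// Assumes properties() returns the same mutable map on each call: removing the
// comment entry here leaves only the real table properties for the assignment below.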
sparkTableInfo.comment = baseTable.properties().remove(ConnectorConstants.COMMENT);
sparkTableInfo.tableProperties = baseTable.properties();
return sparkTableInfo;
}

@Data
public static class SparkColumnInfo {
private String name;
private DataType type;
private String comment;
private boolean isNullable;

private SparkColumnInfo(String name, DataType type, String comment, boolean isNullable) {
this.name = name;
this.type = type;
this.comment = comment;
this.isNullable = isNullable;
}

public static SparkColumnInfo of(String name, DataType type) {
return of(name, type, null);
}

public static SparkColumnInfo of(String name, DataType type, String comment) {
return new SparkColumnInfo(name, type, comment, true);
}

public static SparkColumnInfo of(
String name, DataType type, String comment, boolean isNullable) {
return new SparkColumnInfo(name, type, comment, isNullable);
}
}
}