Skip to content

Commit

Permalink
[apache#3856] feat(core): Support embedded JDBC storage backend (apac…
Browse files Browse the repository at this point in the history
…he#3922)

### What changes were proposed in this pull request?

Add support for an embedded JDBC storage backend.

### Why are the changes needed?

We plan to replace the RocksDB KV backend with an embedded JDBC backend as
the default storage backend.

Fix: apache#3856

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Covered by the existing unit tests (UT).
  • Loading branch information
yuqi1129 authored Jun 27, 2024
1 parent ec3a555 commit f5f092f
Show file tree
Hide file tree
Showing 23 changed files with 496 additions and 82 deletions.
1 change: 1 addition & 0 deletions LICENSE.bin
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@
JTA
AOP Alliance Repackaged
OSGi Resource Locator
H2 Database Engine

This product bundles various third-party components also under the
Eclipse Public License 2.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package com.datastrato.gravitino.catalog.hadoop;

import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
import static com.datastrato.gravitino.catalog.hadoop.HadoopCatalog.CATALOG_PROPERTIES_META;
Expand Down Expand Up @@ -107,9 +107,9 @@ public static void setUp() {
Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv");
Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE);
Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto");
Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH);
Mockito.when(config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH);

Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH));
Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH));
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import static com.datastrato.gravitino.Catalog.Type.MESSAGING;
import static com.datastrato.gravitino.Configs.DEFAULT_ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.ENTITY_KV_STORE;
import static com.datastrato.gravitino.Configs.ENTITY_STORE;
import static com.datastrato.gravitino.Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH;
import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
Expand Down Expand Up @@ -100,9 +100,9 @@ public static void setUp() {
Mockito.when(config.get(ENTITY_STORE)).thenReturn("kv");
Mockito.when(config.get(ENTITY_KV_STORE)).thenReturn(DEFAULT_ENTITY_KV_STORE);
Mockito.when(config.get(Configs.ENTITY_SERDE)).thenReturn("proto");
Mockito.when(config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH);
Mockito.when(config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH)).thenReturn(ROCKS_DB_STORE_PATH);

Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH));
Assertions.assertEquals(ROCKS_DB_STORE_PATH, config.get(ENTITY_KV_ROCKSDB_BACKEND_PATH));
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L);

Expand Down
11 changes: 10 additions & 1 deletion core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation(libs.commons.io)
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.h2db)
implementation(libs.mybatis)
implementation(libs.protobuf.java.util) {
exclude("com.google.guava", "guava")
Expand All @@ -36,10 +37,18 @@ dependencies {
testCompileOnly(libs.lombok)

testImplementation(libs.awaitility)
testImplementation(libs.h2db)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.mockito.core)

testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks.test {
  // GRAVITINO_HOME is handed to the test JVM. The optional `testMode` project property
  // (defaulting to "embedded") selects between the repository root and the packaged
  // distribution directory.
  val gravitinoHome =
    when (project.properties["testMode"] as? String ?: "embedded") {
      "embedded" -> project.rootDir.path
      else -> project.rootDir.path + "/distribution/package"
    }
  environment("GRAVITINO_HOME", gravitinoHome)
}
36 changes: 30 additions & 6 deletions core/src/main/java/com/datastrato/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ private Configs() {}
public static final String ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD_KEY =
"gravitino.entity.store.relational.jdbcPassword";

// Configuration key for the on-disk storage path of the relational (JDBC) entity store.
public static final String ENTITY_RELATIONAL_JDBC_BACKEND_STORAGE_PATH_KEY =
"gravitino.entity.store.relational.storagePath";

public static final String ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY =
"gravitino.entity.store.kv.rocksdbPath";

Expand Down Expand Up @@ -66,6 +69,17 @@ private Configs() {}
public static final String DEFAULT_KV_ROCKSDB_BACKEND_PATH =
String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb");

// Default storage directory of the JDBC backend: `${GRAVITINO_HOME}/data/jdbc`.
// NOTE: String.join renders a null element as the text "null", so when the GRAVITINO_HOME
// environment variable is unset this constant starts with the literal "null".
public static final String DEFAULT_RELATIONAL_JDBC_BACKEND_PATH =
String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "jdbc");

// Default JDBC URL. This is only the H2 scheme prefix, not a complete URL;
// H2Database#constructH2URI expands it into a `jdbc:h2:file:` URL using the storage path.
public static final String DEFAULT_RELATIONAL_JDBC_BACKEND_URL = "jdbc:h2";

// Default JDBC driver class name (the H2 driver).
public static final String DEFAULT_RELATIONAL_JDBC_BACKEND_DRIVER = "org.h2.Driver";

// Default user name used to connect to the JDBC backend.
public static final String DEFAULT_RELATIONAL_JDBC_BACKEND_USERNAME = "gravitino";

// Default password used to connect to the JDBC backend.
public static final String DEFAULT_RELATIONAL_JDBC_BACKEND_PASSWORD = "gravitino";

public static final int GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT = 100;
public static final long MAX_NODE_IN_MEMORY = 100000L;

Expand Down Expand Up @@ -101,33 +115,43 @@ private Configs() {}
.version(ConfigConstants.VERSION_0_5_0)
.stringConf()
.checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
.create();
.createWithDefault(DEFAULT_RELATIONAL_JDBC_BACKEND_URL);

public static final ConfigEntry<String> ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER =
new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER_KEY)
.doc("Driver Name of `JDBCBackend`")
.version(ConfigConstants.VERSION_0_5_0)
.stringConf()
.checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
.create();
.createWithDefault(DEFAULT_RELATIONAL_JDBC_BACKEND_DRIVER);

public static final ConfigEntry<String> ENTITY_RELATIONAL_JDBC_BACKEND_USER =
new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_USER_KEY)
.doc("Username of `JDBCBackend`")
.version(ConfigConstants.VERSION_0_5_0)
.stringConf()
.checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
.create();
.createWithDefault(DEFAULT_RELATIONAL_JDBC_BACKEND_USERNAME);

public static final ConfigEntry<String> ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD =
new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD_KEY)
.doc("Password of `JDBCBackend`")
.version(ConfigConstants.VERSION_0_5_0)
.stringConf()
.checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
.create();
.createWithDefault(DEFAULT_RELATIONAL_JDBC_BACKEND_PASSWORD);

/**
 * Storage path of the JDBC storage backend, defaulting to
 * {@link #DEFAULT_RELATIONAL_JDBC_BACKEND_PATH}.
 *
 * <p>Unlike the other JDBC entries, no not-blank check is attached here; a blank value is replaced
 * by the default inside {@code H2Database#getStoragePath}, which also resolves relative values
 * against {@code GRAVITINO_HOME}.
 *
 * <p>NOTE(review): the doc text below spells the placeholder as "PATH_YOU_HAVA_SET" (likely meant
 * "HAVE"); left untouched here because it is a runtime string.
 */
public static final ConfigEntry<String> ENTITY_RELATIONAL_JDBC_BACKEND_PATH =
new ConfigBuilder(ENTITY_RELATIONAL_JDBC_BACKEND_STORAGE_PATH_KEY)
.doc(
"The storage path for JDBC storage implementation. It supports both absolute and"
+ " relative path, if the value is a relative path, the final path is "
+ "`${GRAVITINO_HOME}/${PATH_YOU_HAVA_SET}`, default value is "
+ "`${GRAVITINO_HOME}/data/jdbc`")
.version(ConfigConstants.VERSION_0_6_0)
.stringConf()
.createWithDefault(DEFAULT_RELATIONAL_JDBC_BACKEND_PATH);

public static final ConfigEntry<String> ENTRY_KV_ROCKSDB_BACKEND_PATH =
public static final ConfigEntry<String> ENTITY_KV_ROCKSDB_BACKEND_PATH =
new ConfigBuilder(ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY)
.doc(
"The storage path for RocksDB storage implementation. It supports both absolute and"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private RocksDB initRocksDB(Config config) throws RocksDBException {

@VisibleForTesting
String getStoragePath(Config config) {
String dbPath = config.get(Configs.ENTRY_KV_ROCKSDB_BACKEND_PATH);
String dbPath = config.get(Configs.ENTITY_KV_ROCKSDB_BACKEND_PATH);
if (StringUtils.isBlank(dbPath)) {
return Configs.DEFAULT_KV_ROCKSDB_BACKEND_PATH;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.datastrato.gravitino.meta.TopicEntity;
import com.datastrato.gravitino.meta.UserEntity;
import com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory;
import com.datastrato.gravitino.storage.relational.database.H2Database;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.FilesetMetaService;
import com.datastrato.gravitino.storage.relational.service.GroupMetaService;
Expand All @@ -37,8 +38,10 @@
import com.datastrato.gravitino.storage.relational.service.TopicMetaService;
import com.datastrato.gravitino.storage.relational.service.UserMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
Expand All @@ -49,9 +52,17 @@
*/
public class JDBCBackend implements RelationalBackend {

// Maps each embedded backend type to the canonical class name of its JDBCDatabase
// implementation. startJDBCDatabaseIfNecessary instantiates that class reflectively through its
// no-argument constructor.
private static final Map<JDBCBackendType, String> EMBEDDED_JDBC_DATABASE_MAP =
ImmutableMap.of(JDBCBackendType.H2, H2Database.class.getCanonicalName());

// Embedded database started by this JDBCBackend during initialize(); stays null when the
// configured backend is not an embedded one (startJDBCDatabaseIfNecessary returns null then).
private JDBCDatabase jdbcDatabase;

/**
 * Initialize the jdbc backend instance.
 *
 * <p>Starts the embedded database first (when the configured JDBC URL denotes one), then
 * initializes the SQL session factory and the SQL exception converter from the same config.
 *
 * @param config The configuration used to start the embedded database and to initialize the
 *     session factory and exception converter.
 */
@Override
public void initialize(Config config) {
// Keep this call first: for H2, H2Database#startH2Database writes the final connection URL
// back into `config`, and the initializers below receive that same config object
// (presumably reading the JDBC URL from it -- confirm in SqlSessionFactoryHelper).
jdbcDatabase = startJDBCDatabaseIfNecessary(config);

SqlSessionFactoryHelper.getInstance().init(config);
SQLExceptionConverterFactory.initConverter(config);
}
Expand Down Expand Up @@ -279,5 +290,53 @@ public int deleteOldVersionData(Entity.EntityType entityType, long versionRetent
/**
 * Closes the SQL session factory and, if one was started by this backend, the embedded
 * database.
 *
 * <p>The embedded database is shut down even when closing the session factory fails, and the
 * reference to it is cleared so that a repeated call does not try to shut it down again.
 *
 * @throws IOException if closing the embedded database fails with an I/O error.
 */
@Override
public void close() throws IOException {
  try {
    SqlSessionFactoryHelper.getInstance().close();
  } finally {
    if (jdbcDatabase != null) {
      // Clear the field before closing so a failed shutdown is not retried on the next call.
      JDBCDatabase startedDatabase = jdbcDatabase;
      jdbcDatabase = null;
      startedDatabase.close();
    }
  }
}

/**
 * JDBC backend flavors recognized by this backend, each flagged as embedded (started and stopped
 * by this process) or not.
 */
enum JDBCBackendType {
  H2(true),
  MYSQL(false);

  // True when the database has to be started by this process before it can be used.
  private final boolean embedded;

  JDBCBackendType(boolean embedded) {
    this.embedded = embedded;
  }

  /**
   * Resolves the backend type from the prefix of a JDBC URI.
   *
   * @param jdbcURI The JDBC URI, e.g. {@code jdbc:h2} or {@code jdbc:mysql://host:3306/db}.
   * @return {@link #H2} for URIs starting with {@code jdbc:h2}, {@link #MYSQL} for URIs starting
   *     with {@code jdbc:mysql}.
   * @throws IllegalArgumentException if the URI is {@code null} or has any other prefix.
   */
  public static JDBCBackendType fromURI(String jdbcURI) {
    // Treat a missing URI like any other unrecognized one instead of failing with a bare NPE.
    if (jdbcURI == null) {
      throw new IllegalArgumentException("Unknown JDBC URI: " + jdbcURI);
    }

    if (jdbcURI.startsWith("jdbc:h2")) {
      return JDBCBackendType.H2;
    } else if (jdbcURI.startsWith("jdbc:mysql")) {
      return JDBCBackendType.MYSQL;
    } else {
      throw new IllegalArgumentException("Unknown JDBC URI: " + jdbcURI);
    }
  }
}

/**
 * Starts the embedded database that matches the configured JDBC URL, if that URL denotes an
 * embedded backend (for example H2).
 *
 * @param config The configuration holding the JDBC URL; it is also passed to the started
 *     database's {@code initialize}.
 * @return The started database, or {@code null} when the backend is not embedded.
 * @throws IllegalArgumentException if the JDBC URL matches no known backend type.
 * @throws RuntimeException if the embedded database cannot be instantiated or initialized.
 */
private static JDBCDatabase startJDBCDatabaseIfNecessary(Config config) {
  JDBCBackendType backendType =
      JDBCBackendType.fromURI(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL));

  if (backendType.embedded) {
    try {
      // Look up the implementation class and create it through its no-arg constructor.
      String databaseClassName = EMBEDDED_JDBC_DATABASE_MAP.get(backendType);
      Object instance = Class.forName(databaseClassName).getDeclaredConstructor().newInstance();

      JDBCDatabase embeddedDatabase = (JDBCDatabase) instance;
      embeddedDatabase.initialize(config);
      return embeddedDatabase;
    } catch (Exception e) {
      throw new RuntimeException("Failed to create and initialize JDBCBackend.", e);
    }
  }

  // Not an embedded database, nothing to start.
  return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relational;

import com.datastrato.gravitino.Config;
import java.io.Closeable;

/**
 * A database whose lifecycle is driven from a {@link Config}: it is started through
 * {@link #initialize(Config)} and released through {@link Closeable#close()}.
 */
public interface JDBCDatabase extends Closeable {

/**
 * Initializes the Relational database environment with the provided configuration.
 *
 * @param config The configuration for the database backend.
 * @throws RuntimeException Implementations may throw this when the database cannot be set up
 *     (the H2 implementation does so when creating its schema fails).
 */
void initialize(Config config);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relational.database;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.storage.relational.JDBCDatabase;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JDBCDatabase} implementation backed by an embedded H2 database.
 *
 * <p>{@link #initialize(Config)} builds the final H2 JDBC URL, runs the schema script located at
 * {@code ${GRAVITINO_HOME}/scripts/h2/schema-h2.sql} against it and writes the final URL back into
 * the supplied {@link Config}. {@link #close()} issues an H2 {@code SHUTDOWN} statement.
 */
public class H2Database implements JDBCDatabase {
  private static final Logger LOG = LoggerFactory.getLogger(H2Database.class);

  // Final JDBC URL of the started database; null until initialize() has succeeded.
  private String h2ConnectionUri;
  private String username;
  private String password;

  @Override
  public void initialize(Config config) {
    this.h2ConnectionUri = startH2Database(config);
  }

  /**
   * Opens (creating it if needed) the H2 database and executes the schema script on it.
   *
   * <p>As a side effect the user name and password are remembered for {@link #close()}, and the
   * final connection URL is stored back into {@code config} under
   * {@code Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL}.
   *
   * @param config The configuration providing the JDBC URL, credentials and storage path.
   * @return The final JDBC URL used to connect to the database.
   * @throws IllegalStateException if the {@code GRAVITINO_HOME} environment variable is not set.
   * @throws RuntimeException if connecting or executing the schema script fails.
   */
  public String startH2Database(Config config) {
    String gravitinoHome = System.getenv("GRAVITINO_HOME");
    // Both the schema script and relative storage paths are resolved against GRAVITINO_HOME.
    // Fail before touching the database rather than with an NPE or a "null/scripts/..." lookup.
    if (gravitinoHome == null) {
      throw new IllegalStateException(
          "Environment variable GRAVITINO_HOME must be set to start the embedded H2 database.");
    }

    String storagePath = getStoragePath(config, gravitinoHome);
    String originalJDBCUrl = config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL);
    this.username = config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER);
    this.password = config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD);

    String connectionUrl = constructH2URI(originalJDBCUrl, storagePath);

    try (Connection connection = DriverManager.getConnection(connectionUrl, username, password);
        Statement statement = connection.createStatement()) {
      String sqlContent =
          FileUtils.readFileToString(
              new File(gravitinoHome + "/scripts/h2/schema-h2.sql"), StandardCharsets.UTF_8);

      statement.execute(sqlContent);
    } catch (Exception e) {
      LOG.error("Failed to create table for H2 database.", e);
      throw new RuntimeException("Failed to create table for H2 database.", e);
    }

    // Publish the final URL so that later consumers of this config connect to the same database.
    config.set(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL, connectionUrl);

    return connectionUrl;
  }

  /**
   * Builds the final H2 URL: any URL that is not already a {@code :file:} URL is replaced by a
   * file URL pointing at {@code storagePath}, and the {@code DB_CLOSE_DELAY=-1} and
   * {@code MODE=MYSQL} settings are appended unless the URL already specifies them.
   */
  private static String constructH2URI(String originURI, String storagePath) {
    if (!originURI.contains(":file:")) {
      originURI = "jdbc:h2:file:" + storagePath;
    }

    if (!hasSetting(originURI, "DB_CLOSE_DELAY")) {
      originURI = originURI + ";DB_CLOSE_DELAY=-1";
    }

    if (!hasSetting(originURI, "MODE")) {
      originURI = originURI + ";MODE=MYSQL";
    }

    return originURI;
  }

  /**
   * Tells whether the settings section of an H2 URL (everything from the first {@code ;}) already
   * defines {@code key}. Only the settings section is inspected, so a storage path that merely
   * contains the key text (e.g. a directory named {@code MODELS}) is not mistaken for a setting.
   * The comparison ignores case.
   */
  private static boolean hasSetting(String uri, String key) {
    int settingsStart = uri.indexOf(';');
    if (settingsStart < 0) {
      return false;
    }
    return StringUtils.containsIgnoreCase(uri.substring(settingsStart), ";" + key + "=");
  }

  /**
   * Resolves the configured storage path: a blank value yields the default path, a relative value
   * is resolved against {@code gravitinoHome}, and an absolute value is returned unchanged.
   */
  private static String getStoragePath(Config config, String gravitinoHome) {
    String dbPath = config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PATH);
    if (StringUtils.isBlank(dbPath)) {
      return Configs.DEFAULT_RELATIONAL_JDBC_BACKEND_PATH;
    }

    Path path = Paths.get(dbPath);
    // Relative Path
    if (!path.isAbsolute()) {
      path = Paths.get(gravitinoHome, dbPath);
      return path.toString();
    }

    return dbPath;
  }

  /**
   * Shuts the H2 database down by executing {@code SHUTDOWN} on a fresh connection. Does nothing
   * when the database was never successfully initialized.
   *
   * @throws RuntimeException if connecting or executing the shutdown statement fails.
   */
  @Override
  public void close() throws IOException {
    // Nothing was started, so there is nothing to shut down (and no URL to connect to).
    if (h2ConnectionUri == null) {
      return;
    }

    try (Connection connection = DriverManager.getConnection(h2ConnectionUri, username, password);
        Statement statement = connection.createStatement()) {
      statement.execute("SHUTDOWN");
    } catch (Exception e) {
      LOG.error("Failed to shutdown H2 database.", e);
      throw new RuntimeException("Failed to shutdown H2 database.", e);
    }
  }
}
Loading

0 comments on commit f5f092f

Please sign in to comment.