Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#121] feat(storage): Implement RocksDB storage backend for EntityStore #125

Merged
merged 17 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ dependencies {
implementation(libs.commons.lang3)
implementation(libs.commons.io)
implementation(libs.caffeine)
implementation(libs.rocksdbjni)

compileOnly(libs.lombok)
compileOnly(libs.lombok)
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
annotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)
testAnnotationProcessor(libs.lombok)
Expand Down
27 changes: 24 additions & 3 deletions core/src/main/java/com/datastrato/graviton/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,35 @@
import com.datastrato.graviton.config.ConfigEntry;

public interface Configs {
String DEFUALT_ENTITY_STORE = "kv";
String ENTITY_STORE_KEY = "graviton.entity.store";

String DEFUALT_ENTITY_KV_STORE = "rocksdb";
String ENTITY_KV_STORE_KEY = "graviton.entity.store.kv";

String ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY = "graviton.entity.store.kv.rocskdb.path";
String DEFAULT_KV_ROCKSDB_BACKEND_PATH = "/tmp/graviton";

ConfigEntry<String> ENTITY_STORE =
new ConfigBuilder("graviton.entity.store")
new ConfigBuilder(ENTITY_STORE_KEY)
.doc("The entity store to use")
.version("0.1.0")
.stringConf()
// TODO. Change this when we have a EntityStore implementation. @Jerry
.createWithDefault("in-memory");
.createWithDefault(DEFUALT_ENTITY_STORE);

ConfigEntry<String> ENTITY_KV_STORE =
new ConfigBuilder(ENTITY_KV_STORE_KEY)
.doc("The kv entity store to use")
.version("0.1.0")
.stringConf()
.createWithDefault(DEFUALT_ENTITY_KV_STORE);

ConfigEntry<String> ENTRY_KV_ROCKSDB_BACKEND_PATH =
new ConfigBuilder(ENTITY_KV_ROCKSDB_BACKEND_PATH_KEY)
.doc("The RocksDB backend path for entity store")
.version("0.1.0")
.stringConf()
.createWithDefault(DEFAULT_KV_ROCKSDB_BACKEND_PATH);

ConfigEntry<String> ENTITY_SERDE =
new ConfigBuilder("graviton.entity.serde")
Expand Down
36 changes: 34 additions & 2 deletions core/src/main/java/com/datastrato/graviton/EntityStoreFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
*/
package com.datastrato.graviton;

import com.google.common.collect.ImmutableMap;
import static com.datastrato.graviton.Configs.ENTITY_KV_STORE;

import com.datastrato.graviton.storage.kv.KvBackend;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -15,7 +19,19 @@ public class EntityStoreFactory {

// Register EntityStore's short name to its full qualified class name in the map. So that user
// don't need to specify the full qualified class name when creating an EntityStore.
private static final Map<String, String> ENTITY_STORES = ImmutableMap.of();
private static final Map<String, String> ENTITY_STORES =
new HashMap<String, String>() {
{
put("kv", "com.datastrato.graviton.storage.kv.KvEntityStore");
}
};

public static final Map<String, String> KV_BACKENDS =
new HashMap<String, String>() {
{
put("rocksdb", "com.datastrato.graviton.storage.kv.RocksDBKvBackend");
}
};
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved

private EntityStoreFactory() {}

Expand All @@ -32,4 +48,20 @@ public static EntityStore createEntityStore(Config config) {
throw new RuntimeException("Failed to create and initialize EntityStore: " + name, e);
}
}

public static KvBackend createKvEntityBackend(Config config) {
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
String backendName = config.get(ENTITY_KV_STORE);
String className = KV_BACKENDS.get(backendName);
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
if (Objects.isNull(className)) {
throw new RuntimeException("Unsupported backend type..." + backendName);
}

try {
return (KvBackend) Class.forName(className).newInstance();
} catch (Exception e) {
LOG.error("Failed to create and initialize KvBackend by name '{}'.", backendName, e);
throw new RuntimeException(
"Failed to create and initialize KvBackend by name: " + backendName, e);
}
}
}
27 changes: 25 additions & 2 deletions core/src/main/java/com/datastrato/graviton/meta/CatalogEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
import com.datastrato.graviton.Field;
import com.datastrato.graviton.HasIdentifier;
import com.datastrato.graviton.Namespace;
import com.google.common.base.Objects;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

@EqualsAndHashCode
@ToString
public class CatalogEntity implements Entity, Auditable, HasIdentifier {

Expand Down Expand Up @@ -135,4 +134,28 @@ public CatalogEntity build() {
return catalog;
}
}

/** Attention, this method ignores the namespace field in the comparison */
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CatalogEntity that = (CatalogEntity) o;
return Objects.equal(id, that.id)
&& Objects.equal(metalakeId, that.metalakeId)
&& Objects.equal(name, that.name)
&& type == that.type
&& Objects.equal(comment, that.comment)
&& Objects.equal(properties, that.properties)
&& Objects.equal(auditInfo, that.auditInfo);
}

@Override
public int hashCode() {
return Objects.hashCode(id, metalakeId, name, type, comment, properties, auditInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.proto;

import com.datastrato.graviton.meta.AuditInfo;
import com.datastrato.graviton.meta.CatalogEntity;

public class CatalogEntitySerDe implements ProtoSerDe<CatalogEntity, Catalog> {
@Override
public Catalog serialize(CatalogEntity catalogEntity) {
Catalog.Builder builder =
Catalog.newBuilder()
.setId(catalogEntity.getId())
.setName(catalogEntity.name())
.setAuditInfo(new AuditInfoSerDe().serialize((AuditInfo) catalogEntity.auditInfo()));

if (catalogEntity.getComment() != null) {
builder.setComment(catalogEntity.getComment());
}

if (catalogEntity.getProperties() != null && !catalogEntity.getProperties().isEmpty()) {
builder.putAllProperties(catalogEntity.getProperties());
}

com.datastrato.graviton.proto.Catalog.Type type =
com.datastrato.graviton.proto.Catalog.Type.valueOf(catalogEntity.getType().name());
builder.setType(type);
builder.setMetalakeId(catalogEntity.getMetalakeId());

// Attention we have ignored namespace field here
return builder.build();
}

@Override
public CatalogEntity deserialize(Catalog p) {
CatalogEntity.Builder builder = new CatalogEntity.Builder();
builder
.withId(p.getId())
.withName(p.getName())
.withAuditInfo(new AuditInfoSerDe().deserialize(p.getAuditInfo()));

if (p.hasComment()) {
builder.withComment(p.getComment());
}

if (p.getPropertiesCount() > 0) {
builder.withProperties(p.getPropertiesMap());
}

builder.withType(com.datastrato.graviton.Catalog.Type.valueOf(p.getType().name()));
builder.withMetalakeId(p.getMetalakeId());
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ public class ProtoEntitySerDe implements EntitySerDe {
"com.datastrato.graviton.meta.AuditInfo",
"com.datastrato.graviton.proto.AuditInfoSerDe",
"com.datastrato.graviton.meta.BaseMetalake",
"com.datastrato.graviton.proto.BaseMetalakeSerDe");
"com.datastrato.graviton.proto.BaseMetalakeSerDe",
"com.datastrato.graviton.meta.CatalogEntity",
"com.datastrato.graviton.proto.CatalogEntitySerDe");

private static final Map<String, String> ENTITY_TO_PROTO =
ImmutableMap.of(
"com.datastrato.graviton.meta.AuditInfo", "com.datastrato.graviton.proto.AuditInfo",
"com.datastrato.graviton.meta.BaseMetalake", "com.datastrato.graviton.proto.Metalake");
"com.datastrato.graviton.meta.BaseMetalake", "com.datastrato.graviton.proto.Metalake",
"com.datastrato.graviton.meta.CatalogEntity", "com.datastrato.graviton.proto.Catalog");

private final Map<Class<? extends Entity>, ProtoSerDe<? extends Entity, ? extends Message>>
entityToSerDe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,32 @@

package com.datastrato.graviton.storage.kv;

import static com.datastrato.graviton.EntityStoreFactory.createKvEntityBackend;

import com.datastrato.graviton.Config;
import com.datastrato.graviton.Configs;
import com.datastrato.graviton.Entity;
import com.datastrato.graviton.EntityAlreadyExistsException;
import com.datastrato.graviton.EntitySerDe;
import com.datastrato.graviton.EntitySerDeFactory;
import com.datastrato.graviton.EntityStore;
import com.datastrato.graviton.HasIdentifier;
import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
import com.datastrato.graviton.NoSuchEntityException;
import com.datastrato.graviton.util.Bytes;
import com.datastrato.graviton.util.Executable;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.tuple.Pair;

/**
* KV store to store entities. This means we can store entities in a key value store. i.e. RocksDB,
* Cassandra, etc. If you want to use a different backend, you can implement the {@link
* com.datastrato.graviton.storage.kv.KvBackend} interface
* Cassandra, etc. If you want to use a different backend, you can implement the {@link KvBackend}
* interface
*/
public class KvEntityStore implements EntityStore {
private KvBackend backend;
Expand All @@ -34,7 +42,17 @@ public class KvEntityStore implements EntityStore {

@Override
public void initialize(Config config) throws RuntimeException {
// TODO
try {
this.backend = createKvEntityBackend(config);
this.backend.initialize(config);

EntitySerDe serDe = EntitySerDeFactory.createEntitySerDe(config.get(Configs.ENTITY_SERDE));
this.setSerDe(serDe);
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
this.lock = new ReentrantLock();
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
this.entityKeyEncoder = new CustomEntityKeyEncoder();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
Expand All @@ -45,31 +63,72 @@ public void setSerDe(EntitySerDe entitySerDe) {
@Override
public <E extends Entity & HasIdentifier> List<E> list(Namespace namespace, Class<E> e)
throws IOException {
// TODO
return null;
byte[] startKey = entityKeyEncoder.encode(namespace);
byte[] endKey = Bytes.increment(Bytes.wrap(startKey)).get();
List<Pair<byte[], byte[]>> kvs =
backend.scan(
new KvRangeScan.KvRangeScanBuilder()
.start(startKey)
.end(endKey)
.startInclusive(true)
.endInclusive(false)
.limit(Integer.MAX_VALUE)
.build());

List<E> entities = Lists.newArrayList();
for (Pair<byte[], byte[]> pairs : kvs) {
entities.add(serDe.deserialize(pairs.getRight(), e));
}
// TODO (yuqi), if the list is too large, we need to do pagination or streaming
return entities;
}

@Override
public boolean exists(NameIdentifier ident) throws IOException {
return false;
return backend.get(entityKeyEncoder.encode(ident)) != null;
}

@Override
public <E extends Entity & HasIdentifier> void put(NameIdentifier ident, E e, boolean overwritten)
throws IOException, EntityAlreadyExistsException {
// TODO
// Simple implementation, just use the entity's identifier as the key
byte[] key = entityKeyEncoder.encode(ident);
byte[] value = serDe.serialize(e);

executeInTransaction(
() -> {
if (overwritten) {
backend.put(key, value);
} else {
byte[] origin = backend.get(key);
if (origin == null) {
backend.put(key, value);
} else {
throw new EntityAlreadyExistsException(ident.toString());
}
}
return null;
});
}

@Override
public <E extends Entity & HasIdentifier> E get(NameIdentifier ident, Class<E> type)
throws NoSuchEntityException, IOException {
// TODO
return null;
byte[] key = entityKeyEncoder.encode(ident);
byte[] value = backend.get(key);
if (value == null) {
throw new NoSuchEntityException(ident.toString());
}
return serDe.deserialize(value, type);
}

@Override
public boolean delete(NameIdentifier ident) throws IOException {
return false;
return executeInTransaction(
() -> {
byte[] key = entityKeyEncoder.encode(ident);
return backend.delete(key);
});
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Loading