Skip to content

Add Delete functionality #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/src/main/java/org/vss/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ public interface KVStore {

PutObjectResponse put(PutObjectRequest request);

DeleteObjectResponse delete(DeleteObjectRequest request);

ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request);
}
34 changes: 34 additions & 0 deletions app/src/main/java/org/vss/api/DeleteObjectApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.vss.api;

import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.vss.DeleteObjectRequest;
import org.vss.DeleteObjectResponse;
import org.vss.KVStore;

@Path(VssApiEndpoint.DELETE_OBJECT)
@Slf4j
public class DeleteObjectApi extends AbstractVssApi {
@Inject
public DeleteObjectApi(KVStore kvstore) {
super(kvstore);
}

@POST
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response execute(byte[] payload) {
try {
DeleteObjectRequest request = DeleteObjectRequest.parseFrom(payload);
DeleteObjectResponse response = kvStore.delete(request);
return toResponse(response);
} catch (Exception e) {
log.error("Exception in DeleteObjectApi: ", e);
return toErrorResponse(e);
}
}
}
1 change: 1 addition & 0 deletions app/src/main/java/org/vss/api/VssApiEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
public class VssApiEndpoint {
public static final String GET_OBJECT = "/getObject";
public static final String PUT_OBJECTS = "/putObjects";
public static final String DELETE_OBJECT = "/deleteObject";
public static final String LIST_KEY_VERSIONS = "/listKeyVersions";
}
38 changes: 34 additions & 4 deletions app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.jooq.Insert;
import org.jooq.Query;
import org.jooq.Update;
import org.vss.DeleteObjectRequest;
import org.vss.DeleteObjectResponse;
import org.vss.GetObjectRequest;
import org.vss.GetObjectResponse;
import org.vss.KVStore;
Expand Down Expand Up @@ -66,23 +68,31 @@ public PutObjectResponse put(PutObjectRequest request) {

String storeId = request.getStoreId();

List<VssDbRecord> vssRecords = new ArrayList<>(request.getTransactionItemsList().stream()
List<VssDbRecord> vssPutRecords = new ArrayList<>(request.getTransactionItemsList().stream()
.map(kv -> buildVssRecord(storeId, kv)).toList());

List<VssDbRecord> vssDeleteRecords = new ArrayList<>(request.getDeleteItemsList().stream()
.map(kv -> buildVssRecord(storeId, kv)).toList());

if (request.hasGlobalVersion()) {
VssDbRecord globalVersionRecord = buildVssRecord(storeId,
KeyValue.newBuilder()
.setKey(GLOBAL_VERSION_KEY)
.setVersion(request.getGlobalVersion())
.setValue(ByteString.EMPTY)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there different behavior if this is omitted?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really,
it was just some inconcstency that i noticed and just to make it more explicit.

.build());

vssRecords.add(globalVersionRecord);
vssPutRecords.add(globalVersionRecord);
}

context.transaction((ctx) -> {
DSLContext dsl = ctx.dsl();
List<Query> batchQueries = vssRecords.stream()
.map(vssRecord -> buildPutObjectQuery(dsl, vssRecord)).toList();
List<Query> batchQueries = new ArrayList<>();

batchQueries.addAll(vssPutRecords.stream()
.map(vssRecord -> buildPutObjectQuery(dsl, vssRecord)).toList());
batchQueries.addAll(vssDeleteRecords.stream()
.map(vssRecord -> buildDeleteObjectQuery(dsl, vssRecord)).toList());
Comment on lines +92 to +95
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does there need to be a check for distinct keys across both lists?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will implement request validation as part of api class and not db-implementation.
Currently we assume all requests are valid and there are couple of pending request validations to be added.
For example, we are also not checking if keys in put_items are unique currently or size or number of keys. (will work on this separately)


int[] batchResult = dsl.batch(batchQueries).execute();

Expand All @@ -97,6 +107,12 @@ public PutObjectResponse put(PutObjectRequest request) {
return PutObjectResponse.newBuilder().build();
}

private Query buildDeleteObjectQuery(DSLContext dsl, VssDbRecord vssRecord) {
return dsl.deleteFrom(VSS_DB).where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
.and(VSS_DB.KEY.eq(vssRecord.getKey()))
.and(VSS_DB.VERSION.eq(vssRecord.getVersion())));
}

private Query buildPutObjectQuery(DSLContext dsl, VssDbRecord vssRecord) {
return vssRecord.getVersion() == 0 ? buildInsertRecordQuery(dsl, vssRecord)
: buildUpdateRecordQuery(dsl, vssRecord);
Expand Down Expand Up @@ -126,6 +142,20 @@ private VssDbRecord buildVssRecord(String storeId, KeyValue kv) {
.setVersion(kv.getVersion());
}

@Override
public DeleteObjectResponse delete(DeleteObjectRequest request) {
String storeId = request.getStoreId();
VssDbRecord vssDbRecord = buildVssRecord(storeId, request.getKeyValue());

context.transaction((ctx) -> {
DSLContext dsl = ctx.dsl();
Query deleteObjectQuery = buildDeleteObjectQuery(dsl, vssDbRecord);
dsl.execute(deleteObjectQuery);
});

return DeleteObjectResponse.newBuilder().build();
}

@Override
public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) {
String storeId = request.getStoreId();
Expand Down
57 changes: 54 additions & 3 deletions app/src/main/proto/vss.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
syntax = "proto3";
package vss;
option java_multiple_files = true;
package org.vss;
option java_package = "org.vss";

// Request payload to be used for `GetObject` API call to server.
message GetObjectRequest {

// store_id is a keyspace identifier.
Expand All @@ -24,12 +26,14 @@ message GetObjectRequest {
string key = 2;
}

// Server response for `GetObject` API.
message GetObjectResponse {

// Fetched value and version along with the corresponding key in the request.
KeyValue value = 2;
}

// Request payload to be used for `PutObject` API call to server.
message PutObjectRequest {

// store_id is a keyspace identifier.
Expand Down Expand Up @@ -65,9 +69,9 @@ message PutObjectRequest {
// Clients can choose to encrypt the keys client-side in order to obfuscate their usage patterns.
// If the write is successful, the previous value corresponding to the key will be overwritten.
//
// Multiple items in transaction_items of a single PutObjectRequest are written in
// Multiple items in transaction_items and delete_items of a single PutObjectRequest are written in
// a database-transaction in an all-or-nothing fashion.
// Items in a single PutObjectRequest must have distinct keys.
// All Items in a single PutObjectRequest must have distinct keys.
//
// Clients are expected to store a version against every key.
// The write will succeed if the current DB version against the key is the same as in the request.
Expand All @@ -93,11 +97,56 @@ message PutObjectRequest {
// All PutObjectRequests are strongly consistent i.e. they provide read-after-write and
// read-after-update consistency guarantees.
repeated KeyValue transaction_items = 3;

// Items to be deleted as a result of this PutObjectRequest.
//
// Each item in the `delete_items` field consists of a key and its corresponding version.
// The version is used to perform a version check before deleting the item.
// The delete will only succeed if the current database version against the key is the same as the version
// specified in the request.
//
// Fails with `CONFLICT_EXCEPTION` as the ErrorCode if:
// * The requested item does not exist.
// * The requested item does exist but there is a version-number mismatch with the one in the database.
//
// Multiple items in the `delete_items` field, along with the `transaction_items`, are written in a
// database transaction in an all-or-nothing fashion.
//
// All items within a single `PutObjectRequest` must have distinct keys.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do so in this PR, but we should be consistent with using ticks for identifiers. Or possibly not use them at all, for that matter. In Rust, they are used to format the markdown. But that's not the case for proto docs, presumably.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opening another PR to consistently add ticks around all identifiers.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of #16

repeated KeyValue delete_items = 4;
}

// Server response for `PutObject` API.
message PutObjectResponse {
}

// Request payload to be used for `DeleteObject` API call to server.
message DeleteObjectRequest {
// store_id is a keyspace identifier.
// Ref: https://en.wikipedia.org/wiki/Keyspace_(distributed_data_store)
// All APIs operate within a single store_id.
// It is up to clients to use single or multiple stores for their use-case.
// This can be used for client-isolation/ rate-limiting / throttling on the server-side.
// Authorization and billing can also be performed at the store_id level.
string store_id = 1;

// Item to be deleted as a result of this DeleteObjectRequest.
//
// An item consists of a key and its corresponding version.
// The item is only deleted if the current database version against the key is the same as the version
// specified in the request.
// This operation is idempotent, that is, multiple delete calls for the same item will not fail.
//
// If the requested item does not exist, this operation will not fail.
// If you wish to perform stricter checks while deleting an item, consider using PutObject API.
KeyValue key_value = 2;
}

// Server response for `DeleteObject` API.
message DeleteObjectResponse{
}

// Request payload to be used for `ListKeyVersions` API call to server.
message ListKeyVersionsRequest {

// store_id is a keyspace identifier.
Expand Down Expand Up @@ -133,6 +182,7 @@ message ListKeyVersionsRequest {
optional string page_token = 4;
}

// Server response for `ListKeyVersions` API.
message ListKeyVersionsResponse {

// Fetched keys and versions.
Expand Down Expand Up @@ -206,6 +256,7 @@ enum ErrorCode {
INTERNAL_SERVER_EXCEPTION = 3;
}

// Represents KeyValue pair to be stored or retrieved.
message KeyValue {

// Key against which the value is stored.
Expand Down
79 changes: 79 additions & 0 deletions app/src/test/java/org/vss/AbstractKVStoreIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,66 @@ void putShouldSucceedWhenNoGlobalVersionIsGiven() {
assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(0L));
}

@Test
void putAndDeleteShouldSucceedAsAtomicTransaction() {
assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0))));
// Put and Delete succeeds
assertDoesNotThrow(() -> putAndDeleteObjects(null, List.of(kv("k2", "k2v1", 0)), List.of(kv("k1", "", 1))));

KeyValue response = getObject("k2");
assertThat(response.getKey(), is("k2"));
assertThat(response.getVersion(), is(1L));
assertThat(response.getValue().toStringUtf8(), is("k2v1"));

assertTrue(getObject("k1").getValue().isEmpty());

// Delete fails (and hence put as well) due to mismatched version for the deleted item.
assertThrows(ConflictException.class, () -> putAndDeleteObjects(null, List.of(kv("k3", "k3v1", 0)), List.of(kv("k2", "", 3))));

assertTrue(getObject("k3").getValue().isEmpty());
assertFalse(getObject("k2").getValue().isEmpty());

// Put fails (and hence delete as well) due to mismatched version for the put item.
assertThrows(ConflictException.class, () -> putAndDeleteObjects(null, List.of(kv("k3", "k3v1", 1)), List.of(kv("k2", "", 1))));

assertTrue(getObject("k3").getValue().isEmpty());
assertFalse(getObject("k2").getValue().isEmpty());

// Put and delete both fail due to mismatched global version.
assertThrows(ConflictException.class, () -> putAndDeleteObjects(2L, List.of(kv("k3", "k3v1", 0)), List.of(kv("k2", "", 1))));

assertTrue(getObject("k3").getValue().isEmpty());
assertFalse(getObject("k2").getValue().isEmpty());

assertThat(getObject(KVStore.GLOBAL_VERSION_KEY).getVersion(), is(0L));
}

@Test
void deleteShouldSucceedWhenItemExists() {
assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0))));
assertDoesNotThrow(() -> deleteObject(kv("k1", "", 1)));

KeyValue response = getObject("k1");
assertThat(response.getKey(), is("k1"));
assertTrue(response.getValue().isEmpty());
}

@Test
void deleteShouldSucceedWhenItemDoesNotExist() {
assertDoesNotThrow(() -> deleteObject(kv("non_existent_key", "", 0)));
}

@Test
void deleteShouldBeIdempotent() {
assertDoesNotThrow(() -> putObjects(null, List.of(kv("k1", "k1v1", 0))));
assertDoesNotThrow(() -> deleteObject(kv("k1", "", 1)));
assertDoesNotThrow(() -> deleteObject(kv("k1", "", 1)));

KeyValue response = getObject("k1");
assertThat(response.getKey(), is("k1"));
assertTrue(response.getValue().isEmpty());
}

@Test
void getShouldReturnEmptyResponseWhenKeyDoesNotExist() {
KeyValue response = getObject("non_existent_key");
Expand Down Expand Up @@ -370,6 +430,25 @@ private void putObjects(@Nullable Long globalVersion, List<KeyValue> keyValues)
this.kvStore.put(putObjectRequestBuilder.build());
}

private void putAndDeleteObjects(@Nullable Long globalVersion, List<KeyValue> putKeyValues, List<KeyValue> deleteKeyValues) {
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.newBuilder()
.setStoreId(STORE_ID)
.addAllTransactionItems(putKeyValues)
.addAllDeleteItems(deleteKeyValues);

if (Objects.nonNull(globalVersion)) {
putObjectRequestBuilder.setGlobalVersion(globalVersion);
}

this.kvStore.put(putObjectRequestBuilder.build());
}

private void deleteObject(KeyValue keyValue) {
DeleteObjectRequest request = DeleteObjectRequest.newBuilder()
.setStoreId(STORE_ID).setKeyValue(keyValue).build();
this.kvStore.delete(request);
}

private ListKeyVersionsResponse list(@Nullable String nextPageToken, @Nullable Integer pageSize,
@Nullable String keyPrefix) {
ListKeyVersionsRequest.Builder listRequestBuilder = ListKeyVersionsRequest.newBuilder()
Expand Down
Loading