Skip to content

Commit

Permalink
test: add txn test
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Mar 11, 2024
1 parent 0f718d1 commit 05fecda
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 10 deletions.
54 changes: 50 additions & 4 deletions jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;

import cloud.xline.jxline.Txn;
import cloud.xline.jxline.kv.TxnResponse;
import com.xline.protobuf.Command;
import com.xline.protobuf.RequestWithToken;
import com.xline.protobuf.TxnRequest;
import com.xline.protobuf.*;
import io.etcd.jetcd.ByteSequence;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -108,8 +107,55 @@ private Command toTxnRequest() {
requestBuilder.addFailure(o.toRequestOp(namespace));
}

TxnRequest txnReq = requestBuilder.build();

return Command.newBuilder()
.setRequest(RequestWithToken.newBuilder().setTxnRequest(requestBuilder).build())
.addAllKeys(getTxnReqKeyRanges(txnReq))
.setRequest(RequestWithToken.newBuilder().setTxnRequest(txnReq).build())
.build();
}

private static List<KeyRange> getTxnReqKeyRanges(TxnRequest req) {
List<KeyRange> keyRanges = new ArrayList<>();
req.getCompareList()
.forEach(
cmp ->
keyRanges.add(
KeyRange.newBuilder()
.setKey(cmp.getKey())
.setRangeEnd(cmp.getRangeEnd())
.build()));
Stream.concat(req.getSuccessList().stream(), req.getFailureList().stream())
.forEach(
op -> {
if (op.hasRequestRange()) {
keyRanges.add(
KeyRange.newBuilder()
.setKey(op.getRequestRange().getKey())
.setRangeEnd(op.getRequestRange().getRangeEnd())
.build());
return;
}
if (op.hasRequestPut()) {
keyRanges.add(
KeyRange.newBuilder()
.setKey(op.getRequestPut().getKey())
.build());
return;
}
if (op.hasRequestDeleteRange()) {
keyRanges.add(
KeyRange.newBuilder()
.setKey(op.getRequestDeleteRange().getKey())
.setRangeEnd(
op.getRequestDeleteRange().getRangeEnd())
.build());
return;
}
if (op.hasRequestTxn()) {
keyRanges.addAll(getTxnReqKeyRanges(op.getRequestTxn()));
}
});
return keyRanges;
}
}
30 changes: 24 additions & 6 deletions jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static Command mapPutCommand(
ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) {
PutRequest req = mapPutRequest(key, value, option, namespace);
return Command.newBuilder()
.addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build())
.addKeys(KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace)))
.setRequest(
RequestWithToken.newBuilder().setPutRequest(req).build()) // TODO: add token
.build();
Expand All @@ -51,11 +51,20 @@ public static RangeRequest.Builder mapRangeRequest(
public static Command mapRangeCommand(
ByteSequence key, GetOption option, ByteSequence namespace) {
RangeRequest.Builder builder = mapRangeRequest(key, option, namespace);
KeyRange.Builder keyRange =
KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace));

defineRangeRequestEnd(
key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd);
key,
option.getEndKey(),
option.isPrefix(),
namespace,
endKey -> {
builder.setRangeEnd(endKey);
keyRange.setRangeEnd(endKey);
});
return Command.newBuilder()
.addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build())
.addKeys(keyRange)
.setRequest(
RequestWithToken.newBuilder()
.setRangeRequest(builder.build())
Expand All @@ -73,11 +82,20 @@ public static DeleteRangeRequest.Builder mapDeleteRequest(
public static Command mapDeleteCommand(
ByteSequence key, DeleteOption option, ByteSequence namespace) {
DeleteRangeRequest.Builder builder = mapDeleteRequest(key, option, namespace);
defineRangeRequestEnd(
key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd);
KeyRange.Builder keyRange =
KeyRange.newBuilder().setKey(Util.prefixNamespace(key, namespace));

defineRangeRequestEnd(
key,
option.getEndKey(),
option.isPrefix(),
namespace,
endKey -> {
builder.setRangeEnd(endKey);
keyRange.setRangeEnd(endKey);
});
return Command.newBuilder()
.addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build())
.addKeys(keyRange)
.setRequest(
RequestWithToken.newBuilder()
.setDeleteRangeRequest(builder.build())
Expand Down
113 changes: 113 additions & 0 deletions jxline-core/src/test/java/KVTest.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import cloud.xline.jxline.Client;
import cloud.xline.jxline.KV;
import cloud.xline.jxline.Txn;
import cloud.xline.jxline.kv.DeleteResponse;
import cloud.xline.jxline.kv.GetResponse;
import cloud.xline.jxline.kv.PutResponse;
import cloud.xline.jxline.kv.TxnResponse;
import cloud.xline.jxline.op.Cmp;
import cloud.xline.jxline.op.CmpTarget;
import cloud.xline.jxline.op.Op;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.assertj.core.api.Assertions.*;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -121,4 +128,110 @@ public void testDelete() throws Exception {
DeleteResponse delResp = deleteFuture.get();
assertThat(delResp.getDeleted()).isEqualTo(resp.getKvs().size());
}

@Test
public void testGetSortedPrefix() throws Exception {
String prefix = randomString();
int numPrefix = 3;
putKeysWithPrefix(prefix, numPrefix);

GetOption option =
GetOption.builder()
.withSortField(GetOption.SortTarget.KEY)
.withSortOrder(GetOption.SortOrder.DESCEND)
.isPrefix(true)
.build();
CompletableFuture<GetResponse> getFeature = kvClient.get(bytesOf(prefix), option);
GetResponse response = getFeature.get();

assertThat(response.getKvs()).hasSize(numPrefix);
for (int i = 0; i < numPrefix; i++) {
assertThat(response.getKvs().get(i).getKey().toString(StandardCharsets.UTF_8))
.isEqualTo(prefix + (numPrefix - i - 1));
assertThat(response.getKvs().get(i).getValue().toString(StandardCharsets.UTF_8))
.isEqualTo(String.valueOf(numPrefix - i - 1));
}
}

@Test
public void testGetAndDeleteWithPrefix() throws Exception {
String prefix = randomString();
ByteSequence key = bytesOf(prefix);
int numPrefixes = 10;

putKeysWithPrefix(prefix, numPrefixes);

// verify get withPrefix.
CompletableFuture<GetResponse> getFuture =
kvClient.get(key, GetOption.builder().isPrefix(true).build());
GetResponse getResp = getFuture.get();
assertThat(getResp.getCount()).isEqualTo(numPrefixes);

// verify del withPrefix.
DeleteOption deleteOpt = DeleteOption.builder().isPrefix(true).build();
CompletableFuture<DeleteResponse> delFuture = kvClient.delete(key, deleteOpt);
DeleteResponse delResp = delFuture.get();
assertThat(delResp.getDeleted()).isEqualTo(numPrefixes);
}

private static void putKeysWithPrefix(String prefix, int numPrefixes)
throws ExecutionException, InterruptedException {
for (int i = 0; i < numPrefixes; i++) {
ByteSequence key = bytesOf(prefix + i);
ByteSequence value = bytesOf("" + i);
kvClient.put(key, value).get();
}
}

@Test
public void testTxn() throws Exception {
ByteSequence sampleKey = bytesOf("txn_key");
ByteSequence sampleValue = bytesOf("xyz");
ByteSequence cmpValue = bytesOf("abc");
ByteSequence putValue = bytesOf("XYZ");
ByteSequence putValueNew = bytesOf("ABC");
// put the original txn key value pair
kvClient.put(sampleKey, sampleValue).get();

// construct txn operation
Txn txn = kvClient.txn();
Cmp cmp = new Cmp(sampleKey, Cmp.Op.GREATER, CmpTarget.value(cmpValue));
CompletableFuture<TxnResponse> txnResp =
txn.If(cmp)
.Then(Op.put(sampleKey, putValue, PutOption.DEFAULT))
.Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT))
.commit();
txnResp.get();
// get the value
GetResponse getResp = kvClient.get(sampleKey).get();
assertThat(getResp.getKvs()).hasSize(1);
assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8))
.isEqualTo(putValue.toString(StandardCharsets.UTF_8));
}

@Test
public void testTxnForCmpOpNotEqual() throws Exception {
ByteSequence sampleKey = bytesOf("txn_key");
ByteSequence sampleValue = bytesOf("xyz");
ByteSequence cmpValue = bytesOf("abc");
ByteSequence putValue = bytesOf("XYZ");
ByteSequence putValueNew = bytesOf("ABC");
// put the original txn key value pair
kvClient.put(sampleKey, sampleValue).get();

// construct txn operation
Txn txn = kvClient.txn();
Cmp cmp = new Cmp(sampleKey, Cmp.Op.NOT_EQUAL, CmpTarget.value(cmpValue));
CompletableFuture<TxnResponse> txnResp =
txn.If(cmp)
.Then(Op.put(sampleKey, putValue, PutOption.DEFAULT))
.Else(Op.put(sampleKey, putValueNew, PutOption.DEFAULT))
.commit();
txnResp.get();
// get the value
GetResponse getResp = kvClient.get(sampleKey).get();
assertThat(getResp.getKvs()).hasSize(1);
assertThat(getResp.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8))
.isEqualTo(putValue.toString(StandardCharsets.UTF_8));
}
}

0 comments on commit 05fecda

Please sign in to comment.