Skip to content

feat(kv): enhance namespaced key-value store operations and handlers #2613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import com.automq.shell.metrics.S3MetricsExporter;
import com.automq.stream.api.KeyValue;
import com.automq.stream.api.KeyValue.ValueAndEpoch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -95,6 +96,32 @@ public KeyValue.Value getKV(String key) throws IOException {
throw code.exception();
}

public ValueAndEpoch getKV(String key, String namespace) throws IOException {

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ClientKVClient]: Get KV: {} Namespace: {}", key, namespace);
}

GetKVsRequestData data = new GetKVsRequestData()
.setGetKeyRequests(List.of(new GetKVsRequestData.GetKVRequest().setKey(key).setNamespace(namespace)));

long now = Time.SYSTEM.milliseconds();
ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
new GetKVsRequest.Builder(data), now, true, 3000, null);

ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
GetKVsResponseData responseData = (GetKVsResponseData) response.responseBody().data();

Errors code = Errors.forCode(responseData.errorCode());
if (Objects.requireNonNull(code) == Errors.NONE) {
return ValueAndEpoch.of(
responseData.getKVResponses().get(0).value(),
responseData.getKVResponses().get(0).epoch());
}

throw code.exception();
}

public KeyValue.Value putKV(String key, byte[] value) throws IOException {
long now = Time.SYSTEM.milliseconds();

Expand All @@ -119,6 +146,32 @@ public KeyValue.Value putKV(String key, byte[] value) throws IOException {
throw code.exception();
}

public ValueAndEpoch putKV(String key, byte[] value, String namespace, long epoch) throws IOException {
long now = Time.SYSTEM.milliseconds();

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ClientKVClient]: put KV: {}", key);
}

PutKVsRequestData data = new PutKVsRequestData()
.setPutKVRequests(List.of(new PutKVsRequestData.PutKVRequest().setKey(key).setValue(value).setNamespace(namespace).setEpoch(epoch)));

ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
new PutKVsRequest.Builder(data), now, true, 3000, null);

ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
PutKVsResponseData responseData = (PutKVsResponseData) response.responseBody().data();

Errors code = Errors.forCode(responseData.errorCode());
if (Objects.requireNonNull(code) == Errors.NONE) {
return ValueAndEpoch.of(
responseData.putKVResponses().get(0).value(),
responseData.putKVResponses().get(0).epoch());
}

throw code.exception();
}

public KeyValue.Value deleteKV(String key) throws IOException {
long now = Time.SYSTEM.milliseconds();

Expand All @@ -142,4 +195,30 @@ public KeyValue.Value deleteKV(String key) throws IOException {

throw code.exception();
}

public ValueAndEpoch deleteKV(String key, String namespace, long epoch) throws IOException {
long now = Time.SYSTEM.milliseconds();

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[ClientKVClient]: Delete KV: {}", key);
}

DeleteKVsRequestData data = new DeleteKVsRequestData()
.setDeleteKVRequests(List.of(new DeleteKVsRequestData.DeleteKVRequest().setKey(key).setNamespace(namespace).setEpoch(epoch)));

ClientRequest clientRequest = networkClient.newClientRequest(String.valueOf(bootstrapServer.id()),
new DeleteKVsRequest.Builder(data), now, true, 3000, null);

ClientResponse response = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, Time.SYSTEM);
DeleteKVsResponseData responseData = (DeleteKVsResponseData) response.responseBody().data();

Errors code = Errors.forCode(responseData.errorCode());
if (Objects.requireNonNull(code) == Errors.NONE) {
return ValueAndEpoch.of(
responseData.deleteKVResponses().get(0).value(),
responseData.deleteKVResponses().get(0).epoch());
}

throw code.exception();
}
}
42 changes: 42 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,48 @@ default ListClientMetricsResourcesResult listClientMetricsResources() {
* @return {@link UpdateGroupResult}
*/
UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, UpdateGroupOptions options);

GetNamespacedKVResult getNamespacedKV(
Optional<Set<TopicPartition>> partitions,
String namespace,
String key,
GetNamespacedKVOptions options
);

/**
* Put a key-value pair in the namespaced KV store.
* @param partitions
* @param namespace
* @param key
* @param value
* @param options
* @return
*/
PutNamespacedKVResult putNamespacedKV(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the parameter "partitions" exist?

Optional<Set<TopicPartition>> partitions,
String namespace,
String key,
String value,
PutNamespacedKVOptions options
);

/**
* Delete a key-value pair in the namespaced KV store.
* @param partitions
* @param namespace
* @param key
* @param options
* @return
*/
DeleteNamespacedKVResult deleteNamespacedKV(
Optional<Set<TopicPartition>> partitions,
String namespace,
String key,
DeleteNamespacedKVOptions options
);



// AutoMQ inject end

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.kafka.clients.admin;

public class DeleteNamespacedKVOptions extends AbstractOptions<DeleteNamespacedKVOptions> {

private long ifMatchEpoch = 0L;

public DeleteNamespacedKVOptions ifMatchEpoch(long epoch) {
this.ifMatchEpoch = epoch;
return this;
}

public long ifMatchEpoch() {
return ifMatchEpoch;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class DeleteNamespacedKVResult extends AbstractOptions<DeleteNamespacedKVResult> {

private final Map<TopicPartition, KafkaFuture<Void>> futures;

public DeleteNamespacedKVResult(Map<TopicPartition, KafkaFuture<Void>> futures) {
this.futures = futures;
}

public KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> all() {
return KafkaFuture.completedFuture(futures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,5 +320,20 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec,
return delegate.updateGroup(groupId, groupSpec, options);
}

@Override
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
return delegate.getNamespacedKV(partitions, namespace, key, options);
}

@Override
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
return delegate.putNamespacedKV(partitions, namespace, key, value, options);
}

@Override
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {
return delegate.deleteNamespacedKV(partitions, namespace, key, options);
}

// AutoMQ inject end
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.apache.kafka.clients.admin;

public class GetNamespacedKVOptions extends AbstractOptions<GetNamespacedKVOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse;

import java.util.Map;
import java.util.concurrent.ExecutionException;

public class GetNamespacedKVResult {

private final Map<TopicPartition, KafkaFuture<GetKVResponse>> futures;

public GetNamespacedKVResult(Map<TopicPartition, KafkaFuture<GetKVResponse>> futures) {
this.futures = futures;
}

public KafkaFuture<Map<TopicPartition, KafkaFuture<GetKVResponse>>> all() throws ExecutionException, InterruptedException {
return KafkaFuture.completedFuture(futures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,19 @@
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteNamespacedKVHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.GetNamespacedKVHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler;
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler;
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToGet;
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToPut;
import org.apache.kafka.clients.admin.internals.PutNamespacedKVHandler;
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler;
import org.apache.kafka.clients.admin.internals.UpdateGroupHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
Expand Down Expand Up @@ -132,6 +137,7 @@
import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
import org.apache.kafka.common.message.DeleteKVsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
Expand All @@ -152,6 +158,8 @@
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.GetKVsRequestData;
import org.apache.kafka.common.message.GetKVsResponseData;
import org.apache.kafka.common.message.GetNextNodeIdRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
Expand All @@ -160,6 +168,8 @@
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.PutKVsRequestData;
import org.apache.kafka.common.message.PutKVsResponseData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
Expand Down Expand Up @@ -266,6 +276,7 @@

import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
Expand Down Expand Up @@ -4866,6 +4877,86 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec,
return new UpdateGroupResult(future.get(CoordinatorKey.byGroupId(groupId)));
}

@Override
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add AutoMQ extended code in the last of file and wrap the code with AutoMQ inject start / end.

public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) {
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
new IllegalArgumentException("Partitions cannot be empty")
);

NamespacedKVRecordsToGet.Builder recordsToGetBuilder = NamespacedKVRecordsToGet.newBuilder();
for (TopicPartition tp : targetPartitions) {
GetKVsRequestData.GetKVRequest kvRequest = new GetKVsRequestData.GetKVRequest()
.setKey(key)
.setNamespace(namespace);

recordsToGetBuilder.addRecord(tp, kvRequest);
}

NamespacedKVRecordsToGet recordsToGet = recordsToGetBuilder.build();
GetNamespacedKVHandler handler = new GetNamespacedKVHandler(logContext, recordsToGet);
SimpleAdminApiFuture<TopicPartition, GetKVsResponseData.GetKVResponse> future = GetNamespacedKVHandler.newFuture(targetPartitions);

invokeDriver(handler, future, options.timeoutMs);

return new GetNamespacedKVResult(future.all());
}

@Override
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) {
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
new IllegalArgumentException("Partitions cannot be empty")
);

NamespacedKVRecordsToPut.Builder recordsToPutBuilder = NamespacedKVRecordsToPut.newBuilder();
for (TopicPartition tp : targetPartitions) {
PutKVsRequestData.PutKVRequest kvRequest = new PutKVsRequestData.PutKVRequest()
.setKey(key)
.setValue(value.getBytes(StandardCharsets.UTF_8))
.setNamespace(namespace)
.setOverwrite(options.overwrite())
.setEpoch(options.ifMatchEpoch());

recordsToPutBuilder.addRecord(tp, kvRequest);
}

NamespacedKVRecordsToPut recordsToPut = recordsToPutBuilder.build();

PutNamespacedKVHandler handler = new PutNamespacedKVHandler(logContext, recordsToPut);
SimpleAdminApiFuture<TopicPartition, PutKVsResponseData.PutKVResponse> future = PutNamespacedKVHandler.newFuture(targetPartitions);

invokeDriver(handler, future, options.timeoutMs);

return new PutNamespacedKVResult(future.all());
}


@Override
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) {

Set<TopicPartition> targetPartitions = partitions.orElseThrow(() ->
new IllegalArgumentException("Partitions cannot be empty")
);

NamespacedKVRecordsToDelete.Builder recordsToDeleteBuilder = NamespacedKVRecordsToDelete.newBuilder();
for (TopicPartition tp : targetPartitions) {
DeleteKVsRequestData.DeleteKVRequest kvRequest = new DeleteKVsRequestData.DeleteKVRequest()
.setKey(key)
.setNamespace(namespace)
.setEpoch(options.ifMatchEpoch());

recordsToDeleteBuilder.addRecord(tp, kvRequest);
}

NamespacedKVRecordsToDelete recordsToDelete = recordsToDeleteBuilder.build();

DeleteNamespacedKVHandler handler = new DeleteNamespacedKVHandler(logContext, recordsToDelete);
SimpleAdminApiFuture<TopicPartition, Void> future = DeleteNamespacedKVHandler.newFuture(targetPartitions);

invokeDriver(handler, future, options.timeoutMs);

return new DeleteNamespacedKVResult(future.all());
}

private <K, V> void invokeDriver(
AdminApiHandler<K, V> handler,
AdminApiFuture<K, V> future,
Expand Down
Loading
Loading