-
Notifications
You must be signed in to change notification settings - Fork 459
feat(kv): enhance namespaced key-value store operations and handlers #2613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
1sonofqiu
wants to merge
8
commits into
main
Choose a base branch
from
kvrecord_enhancement
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
0ef0bed
feat(kv): enhance namespaced key-value store operations and handlers
1sonofqiu 9a3b426
fix(build): remove unnecessary featureControl parameter from NodeCont…
1sonofqiu ab077ec
feat(kv): simplify key handling in KVControlManager and add namespace…
1sonofqiu e709fb9
feat(kv): test put and delete epoch validation
1sonofqiu 28045f4
feat(kv): test put and delete epoch validation
1sonofqiu 36a804b
chore(kv): correct namespace spelling in KVControlManager and RemoveK…
1sonofqiu 9013068
feat(kv): refactor getNamespacedKV methods to remove value parameter …
1sonofqiu 546cf76
chore(kv): remove testPutKV_NewKey method from KafkaAdminClientTest
1sonofqiu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15 changes: 15 additions & 0 deletions
15
clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVOptions.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package org.apache.kafka.clients.admin; | ||
|
||
public class DeleteNamespacedKVOptions extends AbstractOptions<DeleteNamespacedKVOptions> { | ||
|
||
private long ifMatchEpoch = 0L; | ||
|
||
public DeleteNamespacedKVOptions ifMatchEpoch(long epoch) { | ||
this.ifMatchEpoch = epoch; | ||
return this; | ||
} | ||
|
||
public long ifMatchEpoch() { | ||
return ifMatchEpoch; | ||
} | ||
} |
19 changes: 19 additions & 0 deletions
19
clients/src/main/java/org/apache/kafka/clients/admin/DeleteNamespacedKVResult.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package org.apache.kafka.clients.admin; | ||
|
||
import org.apache.kafka.common.KafkaFuture; | ||
import org.apache.kafka.common.TopicPartition; | ||
|
||
import java.util.Map; | ||
|
||
public class DeleteNamespacedKVResult extends AbstractOptions<DeleteNamespacedKVResult> { | ||
|
||
private final Map<TopicPartition, KafkaFuture<Void>> futures; | ||
|
||
public DeleteNamespacedKVResult(Map<TopicPartition, KafkaFuture<Void>> futures) { | ||
this.futures = futures; | ||
} | ||
|
||
public KafkaFuture<Map<TopicPartition, KafkaFuture<Void>>> all() { | ||
return KafkaFuture.completedFuture(futures); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
4 changes: 4 additions & 0 deletions
4
clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVOptions.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
package org.apache.kafka.clients.admin; | ||
|
||
public class GetNamespacedKVOptions extends AbstractOptions<GetNamespacedKVOptions> { | ||
} |
21 changes: 21 additions & 0 deletions
21
clients/src/main/java/org/apache/kafka/clients/admin/GetNamespacedKVResult.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package org.apache.kafka.clients.admin; | ||
|
||
import org.apache.kafka.common.KafkaFuture; | ||
import org.apache.kafka.common.TopicPartition; | ||
import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ExecutionException; | ||
|
||
public class GetNamespacedKVResult { | ||
|
||
private final Map<TopicPartition, KafkaFuture<GetKVResponse>> futures; | ||
|
||
public GetNamespacedKVResult(Map<TopicPartition, KafkaFuture<GetKVResponse>> futures) { | ||
this.futures = futures; | ||
} | ||
|
||
public KafkaFuture<Map<TopicPartition, KafkaFuture<GetKVResponse>>> all() throws ExecutionException, InterruptedException { | ||
return KafkaFuture.completedFuture(futures); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,14 +47,19 @@ | |
import org.apache.kafka.clients.admin.internals.CoordinatorKey; | ||
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler; | ||
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler; | ||
import org.apache.kafka.clients.admin.internals.DeleteNamespacedKVHandler; | ||
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler; | ||
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler; | ||
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler; | ||
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler; | ||
import org.apache.kafka.clients.admin.internals.FenceProducersHandler; | ||
import org.apache.kafka.clients.admin.internals.GetNamespacedKVHandler; | ||
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler; | ||
import org.apache.kafka.clients.admin.internals.ListOffsetsHandler; | ||
import org.apache.kafka.clients.admin.internals.ListTransactionsHandler; | ||
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToGet; | ||
import org.apache.kafka.clients.admin.internals.NamespacedKVRecordsToPut; | ||
import org.apache.kafka.clients.admin.internals.PutNamespacedKVHandler; | ||
import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHandler; | ||
import org.apache.kafka.clients.admin.internals.UpdateGroupHandler; | ||
import org.apache.kafka.clients.consumer.OffsetAndMetadata; | ||
|
@@ -132,6 +137,7 @@ | |
import org.apache.kafka.common.message.DeleteAclsResponseData; | ||
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult; | ||
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl; | ||
import org.apache.kafka.common.message.DeleteKVsRequestData; | ||
import org.apache.kafka.common.message.DeleteTopicsRequestData; | ||
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState; | ||
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; | ||
|
@@ -152,6 +158,8 @@ | |
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName; | ||
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; | ||
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; | ||
import org.apache.kafka.common.message.GetKVsRequestData; | ||
import org.apache.kafka.common.message.GetKVsResponseData; | ||
import org.apache.kafka.common.message.GetNextNodeIdRequestData; | ||
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; | ||
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; | ||
|
@@ -160,6 +168,8 @@ | |
import org.apache.kafka.common.message.ListGroupsResponseData; | ||
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; | ||
import org.apache.kafka.common.message.MetadataRequestData; | ||
import org.apache.kafka.common.message.PutKVsRequestData; | ||
import org.apache.kafka.common.message.PutKVsResponseData; | ||
import org.apache.kafka.common.message.RemoveRaftVoterRequestData; | ||
import org.apache.kafka.common.message.RenewDelegationTokenRequestData; | ||
import org.apache.kafka.common.message.UnregisterBrokerRequestData; | ||
|
@@ -266,6 +276,7 @@ | |
|
||
import org.slf4j.Logger; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.security.InvalidKeyException; | ||
import java.security.NoSuchAlgorithmException; | ||
import java.time.Duration; | ||
|
@@ -4866,6 +4877,86 @@ public UpdateGroupResult updateGroup(String groupId, UpdateGroupSpec groupSpec, | |
return new UpdateGroupResult(future.get(CoordinatorKey.byGroupId(groupId))); | ||
} | ||
|
||
@Override | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add AutoMQ extended code in the last of file and wrap the code with AutoMQ inject start / end. |
||
public GetNamespacedKVResult getNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, GetNamespacedKVOptions options) { | ||
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() -> | ||
new IllegalArgumentException("Partitions cannot be empty") | ||
); | ||
|
||
NamespacedKVRecordsToGet.Builder recordsToGetBuilder = NamespacedKVRecordsToGet.newBuilder(); | ||
for (TopicPartition tp : targetPartitions) { | ||
GetKVsRequestData.GetKVRequest kvRequest = new GetKVsRequestData.GetKVRequest() | ||
.setKey(key) | ||
.setNamespace(namespace); | ||
|
||
recordsToGetBuilder.addRecord(tp, kvRequest); | ||
} | ||
|
||
NamespacedKVRecordsToGet recordsToGet = recordsToGetBuilder.build(); | ||
GetNamespacedKVHandler handler = new GetNamespacedKVHandler(logContext, recordsToGet); | ||
SimpleAdminApiFuture<TopicPartition, GetKVsResponseData.GetKVResponse> future = GetNamespacedKVHandler.newFuture(targetPartitions); | ||
|
||
invokeDriver(handler, future, options.timeoutMs); | ||
|
||
return new GetNamespacedKVResult(future.all()); | ||
} | ||
|
||
@Override | ||
public PutNamespacedKVResult putNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, String value, PutNamespacedKVOptions options) { | ||
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() -> | ||
new IllegalArgumentException("Partitions cannot be empty") | ||
); | ||
|
||
NamespacedKVRecordsToPut.Builder recordsToPutBuilder = NamespacedKVRecordsToPut.newBuilder(); | ||
for (TopicPartition tp : targetPartitions) { | ||
PutKVsRequestData.PutKVRequest kvRequest = new PutKVsRequestData.PutKVRequest() | ||
.setKey(key) | ||
.setValue(value.getBytes(StandardCharsets.UTF_8)) | ||
.setNamespace(namespace) | ||
.setOverwrite(options.overwrite()) | ||
.setEpoch(options.ifMatchEpoch()); | ||
|
||
recordsToPutBuilder.addRecord(tp, kvRequest); | ||
} | ||
|
||
NamespacedKVRecordsToPut recordsToPut = recordsToPutBuilder.build(); | ||
|
||
PutNamespacedKVHandler handler = new PutNamespacedKVHandler(logContext, recordsToPut); | ||
SimpleAdminApiFuture<TopicPartition, PutKVsResponseData.PutKVResponse> future = PutNamespacedKVHandler.newFuture(targetPartitions); | ||
|
||
invokeDriver(handler, future, options.timeoutMs); | ||
|
||
return new PutNamespacedKVResult(future.all()); | ||
} | ||
|
||
|
||
@Override | ||
public DeleteNamespacedKVResult deleteNamespacedKV(Optional<Set<TopicPartition>> partitions, String namespace, String key, DeleteNamespacedKVOptions options) { | ||
|
||
Set<TopicPartition> targetPartitions = partitions.orElseThrow(() -> | ||
new IllegalArgumentException("Partitions cannot be empty") | ||
); | ||
|
||
NamespacedKVRecordsToDelete.Builder recordsToDeleteBuilder = NamespacedKVRecordsToDelete.newBuilder(); | ||
for (TopicPartition tp : targetPartitions) { | ||
DeleteKVsRequestData.DeleteKVRequest kvRequest = new DeleteKVsRequestData.DeleteKVRequest() | ||
.setKey(key) | ||
.setNamespace(namespace) | ||
.setEpoch(options.ifMatchEpoch()); | ||
|
||
recordsToDeleteBuilder.addRecord(tp, kvRequest); | ||
} | ||
|
||
NamespacedKVRecordsToDelete recordsToDelete = recordsToDeleteBuilder.build(); | ||
|
||
DeleteNamespacedKVHandler handler = new DeleteNamespacedKVHandler(logContext, recordsToDelete); | ||
SimpleAdminApiFuture<TopicPartition, Void> future = DeleteNamespacedKVHandler.newFuture(targetPartitions); | ||
|
||
invokeDriver(handler, future, options.timeoutMs); | ||
|
||
return new DeleteNamespacedKVResult(future.all()); | ||
} | ||
|
||
private <K, V> void invokeDriver( | ||
AdminApiHandler<K, V> handler, | ||
AdminApiFuture<K, V> future, | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does the parameter "partitions" exist?