Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ private CompletableFuture<List<String>> internalShardlist(
var stub = stubManager.getStub(leader);
var requestBuilder =
ListRequest.newBuilder()
.setShardId(shardId)
.setShard(shardId)
.setStartInclusive(startKeyInclusive)
.setEndExclusive(endKeyExclusive);
secondaryIndexName.ifPresent(requestBuilder::setSecondaryIndexName);
Expand Down Expand Up @@ -740,7 +740,7 @@ private void internalShardRangeScan(
var stub = stubManager.getStub(leader);
var requestBuilder =
RangeScanRequest.newBuilder()
.setShardId(shardId)
.setShard(shardId)
.setStartInclusive(startKeyInclusive)
.setEndExclusive(endKeyExclusive);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -97,7 +97,7 @@ public void onCompleted() {
@NonNull
ReadRequest toProto() {
return ReadRequest.newBuilder()
.setShardId(getShardId())
.setShard(getShardId())
.addAllGets(gets.stream().map(Operation.ReadOperation.GetOperation::toProto).toList())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -134,7 +134,7 @@ public void handleError(Throwable batchError) {
@NonNull
WriteRequest toProto() {
return WriteRequest.newBuilder()
.setShardId(getShardId())
.setShard(getShardId())
.addAllPuts(
puts.stream().map(Operation.WriteOperation.PutOperation::toProto).collect(toList()))
.addAllDeletes(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,7 +70,7 @@ public class ShardNotificationReceiver implements Closeable, StreamObserver<Noti
}

void start() {
var request = NotificationsRequest.newBuilder().setShardId(shardId);
var request = NotificationsRequest.newBuilder().setShard(shardId);
offset.ifPresent(request::setStartOffsetExclusive);
stub.async().getNotifications(request.build(), this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -87,7 +87,7 @@ public class Session implements StreamObserver<KeepAliveResponse> {
this.sessionId = sessionId;
this.clientIdentifier = config.clientIdentifier();
this.heartbeat =
SessionHeartbeat.newBuilder().setShardId(shardId).setSessionId(sessionId).build();
SessionHeartbeat.newBuilder().setShard(shardId).setSessionId(sessionId).build();
this.listener = listener;

log.info(
Expand Down Expand Up @@ -179,7 +179,7 @@ public CompletableFuture<Void> close() {
heartbeatFuture.cancel(true);
var stub = stubProvider.getStubForShard(shardId);
var request =
CloseSessionRequest.newBuilder().setShardId(shardId).setSessionId(sessionId).build();
CloseSessionRequest.newBuilder().setShard(shardId).setSessionId(sessionId).build();

CompletableFuture<Void> result = new CompletableFuture<>();
stub.async()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,7 +45,7 @@ CompletableFuture<Session> create(long shardId) {
var request =
CreateSessionRequest.newBuilder()
.setSessionTimeoutMs((int) config.sessionTimeout().toMillis())
.setShardId(shardId)
.setShard(shardId)
.setClientIdentity(config.clientIdentifier())
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,6 @@ public boolean overlaps(@NonNull Shard other) {
}

static @NonNull Shard fromProto(@NonNull ShardAssignment s) {
return new Shard(s.getShardId(), s.getLeader(), HashRange.fromProto(s.getInt32HashRange()));
return new Shard(s.getShard(), s.getLeader(), HashRange.fromProto(s.getInt32HashRange()));
}
}
22 changes: 11 additions & 11 deletions client/src/main/proto/io/streamnative/oxia/client.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright © 2022-2024 StreamNative Inc.
// Copyright © 2022-2025 StreamNative Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -148,7 +148,7 @@ message NamespaceShardsAssignment {
*/
message ShardAssignment {
// The shard id
int64 shard_id = 1;
int64 shard = 1;

// The shard leader, e.g. `host:port`
string leader = 2;
Expand Down Expand Up @@ -184,7 +184,7 @@ message Int32HashRange {
message WriteRequest {
// The shard id. This is optional allow for support for server-side hashing
// and proxying in the future.
optional int64 shard_id = 1;
optional int64 shard = 1;
// The put requests
repeated PutRequest puts = 2;
// The delete requests
Expand Down Expand Up @@ -212,7 +212,7 @@ message WriteResponse {
message ReadRequest {
// The shard id. This is optional allow for support for server-side hashing
// and proxying in the future.
optional int64 shard_id = 1;
optional int64 shard = 1;
// The get requests
repeated GetRequest gets = 2;
}
Expand Down Expand Up @@ -365,7 +365,7 @@ message DeleteRangeResponse {
message ListRequest {
// The shard id. This is optional allow for support for server-side hashing
// and proxying in the future.
optional int64 shard_id = 1;
optional int64 shard = 1;
// The start of the range, inclusive
string start_inclusive = 2;
// The end of the range, exclusive
Expand All @@ -388,7 +388,7 @@ message ListResponse {
message RangeScanRequest {
// The shard id. This is optional allow for support for server-side hashing
// and proxying in the future.
optional int64 shard_id = 1;
optional int64 shard = 1;
// The start of the range, inclusive
string start_inclusive = 2;
// The end of the range, exclusive
Expand Down Expand Up @@ -442,7 +442,7 @@ enum Status {
}

message CreateSessionRequest {
int64 shard_id = 1;
int64 shard = 1;
uint32 session_timeout_ms = 2;
string client_identity = 3;
}
Expand All @@ -452,14 +452,14 @@ message CreateSessionResponse {
}

message SessionHeartbeat {
int64 shard_id = 1;
int64 shard = 1;
int64 session_id = 2;
}

message KeepAliveResponse {}

message CloseSessionRequest {
int64 shard_id = 1;
int64 shard = 1;
int64 session_id = 2;
}

Expand All @@ -473,13 +473,13 @@ enum NotificationType {
}

message NotificationsRequest {
int64 shard_id = 1;
int64 shard = 1;

optional int64 start_offset_exclusive = 2;
}

message NotificationBatch {
int64 shard_id = 1;
int64 shard = 1;
int64 offset = 2;
fixed64 timestamp = 3;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testOxiaReconnectBackoff(BackoffType type) throws Exception {
private static CompletableFuture<Void> sendMessage(OxiaStub stub) {
final var readRequest =
ReadRequest.newBuilder()
.setShardId(0)
.setShard(0)
.addGets(GetRequest.newBuilder().setKey("test").build())
.build();
final CompletableFuture<Void> f = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void start() throws Exception {
.containsOnly(
SessionHeartbeat.newBuilder()
.setSessionId(sessionId)
.setShardId(shardId)
.setShard(shardId)
.build());
});
session.close().join();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,7 @@ public class ModelFactory {
static @NonNull ShardAssignment newShardAssignment(
long id, int min, int max, @NonNull String leader) {
return ShardAssignment.newBuilder()
.setShardId(id)
.setShard(id)
.setLeader(leader)
.setInt32HashRange(newHashRange(min, max))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -215,7 +215,7 @@ ShardAssignment assignment(int shardId, int min, int max) {
var hashRange =
Int32HashRange.newBuilder().setMinHashInclusive(min).setMaxHashInclusive(max).build();
return ShardAssignment.newBuilder()
.setShardId(shardId)
.setShard(shardId)
.setLeader("leader" + shardId)
.setInt32HashRange(hashRange)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -163,7 +163,7 @@ void cleanup() {

@Test
void start() {
var assignment = ShardAssignment.newBuilder().setShardId(0).setLeader("leader0").build();
var assignment = ShardAssignment.newBuilder().setShard(0).setLeader("leader0").build();
var nsAssignment = NamespaceShardsAssignment.newBuilder().addAssignments(assignment).build();
when(stub.async()).thenReturn(async);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,7 @@ void shardFromProto() {
var shard =
Shard.fromProto(
ShardAssignment.newBuilder()
.setShardId(1)
.setShard(1)
.setInt32HashRange(
Int32HashRange.newBuilder()
.setMinHashInclusive(2)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,7 @@ class StaticShardStrategy implements ShardStrategy {
}

private static boolean isEquivalent(Shard shard, ShardAssignment assignment) {
return shard.id() == assignment.getShardId()
return shard.id() == assignment.getShard()
&& shard.hashRange().minInclusive() == assignment.getInt32HashRange().getMinHashInclusive()
&& shard.hashRange().maxInclusive() == assignment.getInt32HashRange().getMaxHashInclusive();
}
Expand Down