Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.oxia.client.api.options.defs;

import io.oxia.client.api.options.PutOption;

/**
* @hidden
*/
public record OptionOverrideModificationsCount(long overrideModificationsCount)
implements PutOption {
public OptionOverrideModificationsCount {
if (overrideModificationsCount < 0) {
throw new IllegalArgumentException(
"overrideModificationsCount cannot be less than 0 - was: " + overrideModificationsCount);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.oxia.client.api.options.defs;

import io.oxia.client.api.options.PutOption;

/**
* @hidden
*/
public record OptionOverrideVersionId(long overrideVersionId) implements PutOption {
public OptionOverrideVersionId {
if (overrideVersionId < 0) {
throw new IllegalArgumentException(
"overrideVersionId cannot be less than 0 - was: " + overrideVersionId);
}
}
}
12 changes: 9 additions & 3 deletions client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -313,6 +313,8 @@ private CompletableFuture<PutResult> internalPut(
var versionId = OptionsUtils.getVersionId(options);
var sequenceKeysDeltas = OptionsUtils.getSequenceKeysDeltas(options);
var secondaryIndexes = OptionsUtils.getSecondaryIndexes(options);
var overrideVersionId = OptionsUtils.getOverrideVersionId(options);
var overrideModificationsCount = OptionsUtils.getOverrideModificationsCount(options);

CompletableFuture<PutResult> future = new CompletableFuture<>();

Expand All @@ -327,7 +329,9 @@ private CompletableFuture<PutResult> internalPut(
versionId,
OptionalLong.empty(),
Optional.empty(),
secondaryIndexes);
secondaryIndexes,
overrideVersionId,
overrideModificationsCount);
writeBatchManager.getBatcher(shardId).add(op);
} else {
// The put operation is trying to write an ephemeral record. We need to have a valid session
Expand All @@ -346,7 +350,9 @@ private CompletableFuture<PutResult> internalPut(
versionId,
OptionalLong.of(session.getSessionId()),
Optional.of(clientIdentifier),
secondaryIndexes);
secondaryIndexes,
overrideVersionId,
overrideModificationsCount);
writeBatchManager.getBatcher(shardId).add(op);
})
.exceptionally(
Expand Down
44 changes: 43 additions & 1 deletion client/src/main/java/io/oxia/client/OptionsUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
import io.oxia.client.api.options.GetOption;
import io.oxia.client.api.options.defs.OptionComparisonType;
import io.oxia.client.api.options.defs.OptionEphemeral;
import io.oxia.client.api.options.defs.OptionOverrideModificationsCount;
import io.oxia.client.api.options.defs.OptionOverrideVersionId;
import io.oxia.client.api.options.defs.OptionPartitionKey;
import io.oxia.client.api.options.defs.OptionSecondaryIndex;
import io.oxia.client.api.options.defs.OptionSecondaryIndexName;
Expand Down Expand Up @@ -157,6 +159,46 @@ public static List<OptionSecondaryIndex> getSecondaryIndexes(Set<?> options) {
return res != null ? res : Collections.emptyList();
}

public static OptionalLong getOverrideVersionId(Set<?> options) {
if (options == null || options.isEmpty()) {
return OptionalLong.empty();
}

OptionalLong overrideVersionId = OptionalLong.empty();
for (var o : options) {
if (o instanceof OptionOverrideVersionId e) {
if (overrideVersionId.isPresent()) {
throw new IllegalArgumentException(
"OverrideVersionId cannot be passed multiple times: " + options);
}

overrideVersionId = OptionalLong.of(e.overrideVersionId());
}
}

return overrideVersionId;
}

public static OptionalLong getOverrideModificationsCount(Set<?> options) {
if (options == null || options.isEmpty()) {
return OptionalLong.empty();
}

OptionalLong overrideModificationsCount = OptionalLong.empty();
for (var o : options) {
if (o instanceof OptionOverrideModificationsCount e) {
if (overrideModificationsCount.isPresent()) {
throw new IllegalArgumentException(
"OverrideModificationsCount cannot be passed multiple times: " + options);
}

overrideModificationsCount = OptionalLong.of(e.overrideModificationsCount());
}
}

return overrideModificationsCount;
}

public static Optional<String> getSecondaryIndexName(Set<?> options) {
if (options == null || options.isEmpty()) {
return Optional.empty();
Expand Down
8 changes: 6 additions & 2 deletions client/src/main/java/io/oxia/client/batch/Operation.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,7 +96,9 @@ record PutOperation(
@NonNull OptionalLong expectedVersionId,
OptionalLong sessionId,
Optional<String> clientIdentifier,
List<OptionSecondaryIndex> secondaryIndexes)
List<OptionSecondaryIndex> secondaryIndexes,
@NonNull OptionalLong overrideVersionId,
@NonNull OptionalLong overrideModificationsCount)
implements WriteOperation<PutResult> {

public PutOperation {
Expand Down Expand Up @@ -136,6 +138,8 @@ PutRequest toProto() {
.build();
});
}
overrideVersionId.ifPresent(builder::setOverrideVersionId);
overrideModificationsCount.ifPresent(builder::setOverrideModificationsCount);
return builder.build();
}

Expand Down
7 changes: 6 additions & 1 deletion client/src/main/proto/io/streamnative/oxia/client.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright © 2022-2025 The Oxia Authors
// Copyright © 2022-2026 The Oxia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -267,6 +267,11 @@ message PutRequest {
repeated uint64 sequence_key_delta = 7;

repeated SecondaryIndex secondary_indexes = 8;

// Optional overrides for version metadata, used during data migration.
// When set, the server will use these values instead of auto-generating them.
optional int64 override_version_id = 9;
optional int64 override_modifications_count = 10;
}

/**
Expand Down
53 changes: 52 additions & 1 deletion client/src/test/java/io/oxia/client/api/PutOptionTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@

import io.oxia.client.OptionsUtils;
import io.oxia.client.api.options.PutOption;
import io.oxia.client.api.options.defs.OptionOverrideModificationsCount;
import io.oxia.client.api.options.defs.OptionOverrideVersionId;
import io.oxia.client.api.options.defs.OptionVersionId;
import java.util.Collections;
import java.util.Set;
Expand Down Expand Up @@ -108,4 +110,53 @@ void isEphemeral() {
assertThat(OptionsUtils.isEphemeral(Set.of(PutOption.IfVersionIdEquals(5)))).isFalse();
assertThat(OptionsUtils.isEphemeral(Collections.emptySet())).isFalse();
}

@Nested
@DisplayName("OverrideVersionId tests")
class OverrideVersionIdTests {
@Test
void overrideVersionId() {
assertThat(new OptionOverrideVersionId(5L)).isInstanceOf(OptionOverrideVersionId.class);
}

@Test
void overrideVersionIdLessThanZero() {
assertThatNoException().isThrownBy(() -> new OptionOverrideVersionId(0L));
assertThatThrownBy(() -> new OptionOverrideVersionId(-1L))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void getOverrideVersionId() {
assertThat(OptionsUtils.getOverrideVersionId(Collections.emptySet())).isEmpty();
assertThat(OptionsUtils.getOverrideVersionId(Set.of(new OptionOverrideVersionId(5L))))
.hasValue(5L);
}
}

@Nested
@DisplayName("OverrideModificationsCount tests")
class OverrideModificationsCountTests {
@Test
void overrideModificationsCount() {
assertThat(new OptionOverrideModificationsCount(3L))
.isInstanceOf(OptionOverrideModificationsCount.class);
}

@Test
void overrideModificationsCountLessThanZero() {
assertThatNoException().isThrownBy(() -> new OptionOverrideModificationsCount(0L));
assertThatThrownBy(() -> new OptionOverrideModificationsCount(-1L))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void getOverrideModificationsCount() {
assertThat(OptionsUtils.getOverrideModificationsCount(Collections.emptySet())).isEmpty();
assertThat(
OptionsUtils.getOverrideModificationsCount(
Set.of(new OptionOverrideModificationsCount(3L))))
.hasValue(3L);
}
}
}
10 changes: 7 additions & 3 deletions client/src/test/java/io/oxia/client/batch/BatchTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -207,7 +207,9 @@ class WriteBatchTests {
OptionalLong.of(1),
OptionalLong.empty(),
Optional.empty(),
Collections.emptyList());
Collections.emptyList(),
OptionalLong.empty(),
OptionalLong.empty());
PutOperation putEphemeral =
new PutOperation(
putEphemeralCallable,
Expand All @@ -218,7 +220,9 @@ class WriteBatchTests {
OptionalLong.of(1),
OptionalLong.of(1),
Optional.of("client-id"),
Collections.emptyList());
Collections.emptyList(),
OptionalLong.empty(),
OptionalLong.empty());
DeleteOperation delete = new DeleteOperation(deleteCallable, "", OptionalLong.of(1));
DeleteRangeOperation deleteRange = new DeleteRangeOperation(deleteRangeCallable, "a", "b");

Expand Down
6 changes: 4 additions & 2 deletions client/src/test/java/io/oxia/client/batch/BatcherTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -125,7 +125,9 @@ void addWhenNextDoesNotFit() {
OptionalLong.empty(),
OptionalLong.empty(),
Optional.empty(),
Collections.emptyList());
Collections.emptyList(),
OptionalLong.empty(),
OptionalLong.empty());
when(batchFactory.getBatch(shardId)).thenReturn(batch);
when(batch.size()).thenReturn(config.maxRequestsPerBatch(), 1);
when(batch.canAdd(any())).thenReturn(false);
Expand Down
Loading