Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.core.operation.Operation;
import tech.ydb.core.settings.BaseRequestSettings;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicClient;
Expand Down Expand Up @@ -50,6 +49,7 @@
import tech.ydb.topic.settings.PartitioningSettings;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.TopicClientOperationSettings;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.ProtoUtils;
import tech.ydb.topic.write.AsyncWriter;
Expand Down Expand Up @@ -86,9 +86,10 @@ public static Builder newClient(TopicRpc rpc) {
return new TopicClientBuilderImpl(rpc);
}

private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) {
private GrpcRequestSettings makeGrpcRequestSettings(TopicClientOperationSettings settings) {
return GrpcRequestSettings.newBuilder()
.withDeadline(settings.getRequestTimeout())
.withPreferReadyChannel(settings.isPreferReadyChannel())
.build();
}

Expand Down Expand Up @@ -401,10 +402,7 @@ public CompletableFuture<Status> commitOffset(String path, CommitOffsetSettings
request.setReadSessionId(settings.getReadSessionId());
}

GrpcRequestSettings grpcRequestSettings = GrpcRequestSettings.newBuilder()
.withDeadline(settings.getRequestTimeout())
.withPreferReadyChannel(true)
.build();
final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings);
return topicRpc.commitOffset(request.build(), grpcRequestSettings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import tech.ydb.core.settings.OperationSettings;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.MeteringMode;
import tech.ydb.topic.description.SupportedCodecs;

/**
* @author Nikolay Perfilov
*/
public class AlterTopicSettings extends OperationSettings {
public class AlterTopicSettings extends TopicClientOperationSettings {
@Nullable
private final AlterPartitioningSettings alterPartitioningSettings;
@Nullable
Expand Down Expand Up @@ -113,7 +112,7 @@ public MeteringMode getMeteringMode() {
/**
* BUILDER
*/
public static class Builder extends OperationBuilder<Builder> {
public static class Builder extends TopicClientOperationBuilder<Builder> {
private AlterPartitioningSettings alterPartitioningSettings = null;
private Duration retentionPeriod = null;
private Long retentionStorageMb = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package tech.ydb.topic.settings;

import tech.ydb.core.settings.OperationSettings;

/**
* @author Nikolay Perfilov
*/
public class CommitOffsetSettings extends OperationSettings {
public class CommitOffsetSettings extends TopicClientOperationSettings {
private final long partitionId;
private final String consumer;
private final long offset;
Expand Down Expand Up @@ -42,7 +40,7 @@ public String getReadSessionId() {
/*
* BUILDER
*/
public static class Builder extends OperationBuilder<Builder> {
public static class Builder extends TopicClientOperationBuilder<Builder> {
private long partitionId = -1;
private String consumer = null;
private long offset = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import tech.ydb.core.settings.OperationSettings;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.MeteringMode;
import tech.ydb.topic.description.SupportedCodecs;
Expand All @@ -21,7 +20,7 @@
/**
* @author Nikolay Perfilov
*/
public class CreateTopicSettings extends OperationSettings {
public class CreateTopicSettings extends TopicClientOperationSettings {
@Nullable
private final PartitioningSettings partitioningSettings;
@Nullable
Expand Down Expand Up @@ -94,7 +93,7 @@ public MeteringMode getMeteringMode() {
/**
* BUILDER
*/
public static class Builder extends OperationBuilder<Builder> {
public static class Builder extends TopicClientOperationBuilder<Builder> {
private PartitioningSettings partitioningSettings = null;
private Duration retentionPeriod = null;
private long retentionStorageMb = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package tech.ydb.topic.settings;

import tech.ydb.core.settings.OperationSettings;

/**
*
* @author Aleksandr Gorshenin
*/
public class DescribeConsumerSettings extends OperationSettings {
public class DescribeConsumerSettings extends TopicClientOperationSettings {
private final boolean includeStats;
private final boolean includeLocation;

Expand All @@ -28,7 +26,7 @@ public static Builder newBuilder() {
return new Builder();
}

public static class Builder extends OperationBuilder<Builder> {
public static class Builder extends TopicClientOperationBuilder<Builder> {
private boolean includeStats = false;
private boolean includeLocation = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package tech.ydb.topic.settings;

import tech.ydb.core.settings.OperationSettings;

/**
* @author Nikolay Perfilov
*/
public class DescribeTopicSettings extends OperationSettings {
public class DescribeTopicSettings extends TopicClientOperationSettings {
private final boolean includeStats;

private DescribeTopicSettings(Builder builder) {
Expand All @@ -21,7 +19,7 @@ public static Builder newBuilder() {
return new Builder();
}

public static class Builder extends OperationBuilder<Builder> {
public static class Builder extends TopicClientOperationBuilder<Builder> {
private boolean includeStats = false;

public Builder withIncludeStats(boolean includeStats) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package tech.ydb.topic.settings;

import tech.ydb.core.settings.OperationSettings;

/**
* @author Nikolay Perfilov
*/
public class DropTopicSettings extends OperationSettings {
public class DropTopicSettings extends TopicClientOperationSettings {
private DropTopicSettings(Builder builder) {
super(builder);
}
Expand All @@ -14,7 +12,7 @@ public static Builder newBuilder() {
return new Builder();
}

public static class Builder extends OperationBuilder<Builder> {
public static class Builder extends TopicClientOperationBuilder<Builder> {
@Override
public DropTopicSettings build() {
return new DropTopicSettings(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package tech.ydb.topic.settings;

import tech.ydb.core.settings.OperationSettings;

public class TopicClientOperationSettings extends OperationSettings {
private final boolean preferReadyChannel;

protected TopicClientOperationSettings(TopicClientOperationBuilder<?> builder) {
super(builder);
this.preferReadyChannel = builder.preferReadyChannel;
}

public boolean isPreferReadyChannel() {
return preferReadyChannel;
}

public static class TopicClientOperationBuilder<Self extends TopicClientOperationBuilder<?>>
extends OperationBuilder<Self> {
private boolean preferReadyChannel = false;

public Self withPreferReadyChannel(boolean value) {
this.preferReadyChannel = value;
return self();
}

@Override
public TopicClientOperationSettings build() {
return new TopicClientOperationSettings(this);
}
}
}
161 changes: 161 additions & 0 deletions topic/src/test/java/tech/ydb/topic/impl/TopicClientImplTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package tech.ydb.topic.impl;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.junit.Test;
import org.mockito.ArgumentCaptor;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcRequestSettings;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.topic.settings.CommitOffsetSettings;
import tech.ydb.topic.settings.CreateTopicSettings;
import tech.ydb.topic.settings.DescribeConsumerSettings;
import tech.ydb.topic.settings.DescribeTopicSettings;
import tech.ydb.topic.settings.DropTopicSettings;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TopicClientImplTest {

@Test
public void testEnablePreferReadyChannelSetting() {
TopicRpc mock = mock(TopicRpc.class);
when(mock.createTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.dropTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.alterTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.commitOffset(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.describeTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));
when(mock.describeConsumer(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));

final String topic = "topic";
final String consumer = "consumer";
final String sessionId = "sessionId";

ArgumentCaptor<GrpcRequestSettings> requestCaptor = ArgumentCaptor.forClass(GrpcRequestSettings.class);
TopicClient client = TopicClientImpl.newClient(mock).build();

// createTopic
client.createTopic(topic, CreateTopicSettings.newBuilder()
.withPreferReadyChannel(true)
.build());

verify(mock).createTopic(any(), requestCaptor.capture());
assertTrue(requestCaptor.getValue().isPreferReadyChannel());

// dropTopic
client.dropTopic(topic, DropTopicSettings.newBuilder()
.withPreferReadyChannel(true)
.build());

verify(mock).dropTopic(any(), requestCaptor.capture());
assertTrue(requestCaptor.getValue().isPreferReadyChannel());

// alterTopic
client.alterTopic(topic, AlterTopicSettings.newBuilder()
.withPreferReadyChannel(true)
.build());

verify(mock).alterTopic(any(), requestCaptor.capture());
assertTrue(requestCaptor.getValue().isPreferReadyChannel());

// commitOffset
client.commitOffset(topic, CommitOffsetSettings.newBuilder()
.setReadSessionId(sessionId)
.withPreferReadyChannel(true)
.setConsumer(consumer)
.setPartitionId(0)
.build());

verify(mock).commitOffset(any(), requestCaptor.capture());
assertTrue(requestCaptor.getValue().isPreferReadyChannel());

// describeTopic
client.describeTopic(topic, DescribeTopicSettings.newBuilder()
.withPreferReadyChannel(true)
.build());

verify(mock).describeTopic(any(), requestCaptor.capture());
assertTrue(requestCaptor.getValue().isPreferReadyChannel());

// describeConsumer
client.describeConsumer(topic, consumer, DescribeConsumerSettings.newBuilder()
.withPreferReadyChannel(true)
.build());

verify(mock).describeConsumer(any(), requestCaptor.capture());
assertTrue(requestCaptor.getValue().isPreferReadyChannel());
}

@Test
public void testDefaultPreferReadyChannelSetting() {
TopicRpc mock = mock(TopicRpc.class);
when(mock.createTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.dropTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.alterTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.commitOffset(any(), any())).thenReturn(CompletableFuture.completedFuture(Status.SUCCESS));
when(mock.describeTopic(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));
when(mock.describeConsumer(any(), any())).thenReturn(CompletableFuture.completedFuture(mock(Result.class)));

final String topic = "topic";
final String consumer = "consumer";
final String sessionId = "sessionId";

ArgumentCaptor<GrpcRequestSettings> requestCaptor = ArgumentCaptor.forClass(GrpcRequestSettings.class);
TopicClient client = TopicClientImpl.newClient(mock).build();

// createTopic
client.createTopic(topic, CreateTopicSettings.newBuilder()
.build());

verify(mock).createTopic(any(), requestCaptor.capture());
assertFalse(requestCaptor.getValue().isPreferReadyChannel());

// dropTopic
client.dropTopic(topic, DropTopicSettings.newBuilder()
.build());

verify(mock).dropTopic(any(), requestCaptor.capture());
assertFalse(requestCaptor.getValue().isPreferReadyChannel());

// alterTopic
client.alterTopic(topic, AlterTopicSettings.newBuilder()
.build());

verify(mock).alterTopic(any(), requestCaptor.capture());
assertFalse(requestCaptor.getValue().isPreferReadyChannel());

// commitOffset
client.commitOffset(topic, CommitOffsetSettings.newBuilder()
.setReadSessionId(sessionId)
.setConsumer(consumer)
.setPartitionId(0)
.build());

verify(mock).commitOffset(any(), requestCaptor.capture());
assertFalse(requestCaptor.getValue().isPreferReadyChannel());

// describeTopic
client.describeTopic(topic, DescribeTopicSettings.newBuilder()
.build());

verify(mock).describeTopic(any(), requestCaptor.capture());
assertFalse(requestCaptor.getValue().isPreferReadyChannel());

// describeConsumer
client.describeConsumer(topic, consumer, DescribeConsumerSettings.newBuilder()
.build());

verify(mock).describeConsumer(any(), requestCaptor.capture());
assertFalse(requestCaptor.getValue().isPreferReadyChannel());
}
}
Loading