Skip to content

Conversation

a-TODO-rov
Copy link
Contributor

@a-TODO-rov a-TODO-rov commented Oct 5, 2025

Implementing request/response policies for search - version with API overrides
Version with ClusterWriter routing implementation - #3409
Part of #3447

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements request/response policies for RediSearch in cluster environments, transitioning from cursor ID-based routing to cursor object-based routing with API overrides. The implementation enables keyless RediSearch commands to route randomly across cluster nodes while respecting ReadFrom policies and maintaining cursor stickiness.

Key changes:

  • Replaced cursor ID parameters with Cursor objects containing both cursor ID and node ID for cluster routing
  • Added cluster-aware routing for keyless RediSearch commands that honor ReadFrom policies
  • Updated cursor lifecycle to support sticky routing in cluster mode

Reviewed Changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
AggregationReply.java Replaced cursor ID field with optional Cursor class containing cursor ID and node ID
AbstractRedisAsyncCommands.java Updated cursor method signatures to accept Cursor objects instead of cursor IDs
RedisAdvancedClusterAsyncCommandsImpl.java Added cluster routing implementation for keyless RediSearch commands
PooledClusterConnectionProvider.java Added random connection selection method for keyless command routing
Template and interface files Updated API signatures across sync/async/reactive interfaces to use Cursor objects
Test files Updated integration tests to use new cursor API and verify cluster routing behavior

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

import io.lettuce.core.search.arguments.SynUpdateArgs;
import reactor.core.publisher.Mono;

import java.util.Optional;
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import is placed after other package imports. Move it to the appropriate alphabetical position with other java.util imports around line 83.

Copilot uses AI. Check for mistakes.

Comment on lines +1352 to +1354
// RediSearch may either omit the cursor on the final page, or return a non-zero cursor
// that requires one more empty READ to return 0. Be tolerant across versions.
long effective = nextResult.getCursor().map(AggregationReply.Cursor::getCursorId).orElse(0L);
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider moving this comment closer to the conditional logic on line 1354 where it's actually used, or make the comment more specific about which versions exhibit this behavior.

Suggested change
// RediSearch may either omit the cursor on the final page, or return a non-zero cursor
// that requires one more empty READ to return 0. Be tolerant across versions.
long effective = nextResult.getCursor().map(AggregationReply.Cursor::getCursorId).orElse(0L);
long effective = nextResult.getCursor().map(AggregationReply.Cursor::getCursorId).orElse(0L);
// RediSearch (prior to v2.6) may either omit the cursor on the final page, or return a non-zero cursor
// that requires one more empty READ to return 0. Be tolerant across versions.

Copilot uses AI. Check for mistakes.

Comment on lines +796 to +805
private ConnectionIntent getConnectionIntent(ProtocolKeyword commandType) {
try {
RedisCommand probe = new Command(commandType, null);
boolean isReadOnly = getStatefulConnection().getOptions().getReadOnlyCommands().isReadOnly(probe);
return isReadOnly ? ConnectionIntent.READ : ConnectionIntent.WRITE;
} catch (Exception e) {
logger.error("Error while determining connection intent for " + commandType, e);
return ConnectionIntent.WRITE;
}
}
Copy link

Copilot AI Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is duplicated in both RedisAdvancedClusterReactiveCommandsImpl and RedisAdvancedClusterAsyncCommandsImpl. Consider extracting this to a shared utility class or abstract base class to avoid code duplication.

Copilot uses AI. Check for mistakes.

Copy link
Collaborator

@tishun tishun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nits only except the connection intent

Comment on lines +41 to +42
* Obtain a node-scoped connection for the given {@link ConnectionIntent} by first selecting a random upstream (master)
* shard to ensure even distribution across partitions and then delegating to the slot-based selection logic.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically, especially in an interface, you would not explain the implementation, but rather focus on the goal : in this case we do not care HOW we obtain a node, what we care is that we are obtaining a random node from the list of all nodes in the topology, by also considering any readfrom policies.

This is because the actual implementation might change, but the overall method contract / purpose could remain the same.

return new PipelinedRedisFuture<>(failed);
}
String nodeId = nodeIdOpt.get();
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that a cursor is created on a READ node? Would this logic work in this case?

E.g. FT.AGGREGATE -> route to random node that is READ -> connection intent is WRITE here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it seems in this case intent doesn't really matter much. We are acquiring connection to a specific redis-server instance by nodeId. For better semantics i will change intent READ for ftCursorread and leave intent WRITE for ftCursordel

}

@Override
public CompletableFuture<StatefulRedisConnection<K, V>> getRandomConnectionAsync(ConnectionIntent connectionIntent) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test for this method

Comment on lines +873 to +874
@Override
public RedisFuture<AggregationReply<K, V>> ftCursorread(String index, Cursor cursor, int count) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test for this method (and reactive counterpart)

}

@Override
public RedisFuture<String> ftCursordel(String index, Cursor cursor) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test for this method (and reactive counterpart)

* @param <R> result type
* @return RedisFuture wrapping the routed execution
*/
<R> RedisFuture<R> routeKeyless(Supplier<RedisFuture<R>> superCall,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test for this method (and reactive counterpart)

* @param <R> result type
* @return RedisFuture wrapping the routed execution
*/
<R> RedisFuture<R> routeKeyless(Supplier<RedisFuture<R>> superCall,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test for this method (and reactive counterpart)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants