Skip to content

Commit 827b940

Browse files
committed
FT.AGGREGATE was never working in the first place, now it is working, but for RESP3 only
1 parent 9b153b1 commit 827b940

18 files changed

+429
-165
lines changed

src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import io.lettuce.core.protocol.CommandType;
4949
import io.lettuce.core.protocol.ProtocolKeyword;
5050
import io.lettuce.core.protocol.RedisCommand;
51+
import io.lettuce.core.search.AggregationReply;
5152
import io.lettuce.core.search.SearchReply;
5253
import io.lettuce.core.search.arguments.AggregateArgs;
5354
import io.lettuce.core.search.arguments.CreateArgs;
@@ -1579,23 +1580,23 @@ public RedisFuture<SearchReply<K, V>> ftSearch(K index, V query) {
15791580
}
15801581

15811582
@Override
1582-
public RedisFuture<SearchReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args) {
1583+
public RedisFuture<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args) {
15831584
return dispatch(searchCommandBuilder.ftAggregate(index, query, args));
15841585
}
15851586

15861587
@Override
1587-
public RedisFuture<SearchReply<K, V>> ftAggregate(K index, V query) {
1588+
public RedisFuture<AggregationReply<K, V>> ftAggregate(K index, V query) {
15881589
return dispatch(searchCommandBuilder.ftAggregate(index, query, null));
15891590
}
15901591

15911592
@Override
1592-
public RedisFuture<SearchReply<K, V>> ftCursorread(K index, long cursorId, int count) {
1593+
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count) {
15931594
return dispatch(searchCommandBuilder.ftCursorread(index, cursorId, count));
15941595
}
15951596

15961597
@Override
1597-
public RedisFuture<SearchReply<K, V>> ftCursorread(K index, long cursorId) {
1598-
return dispatch(searchCommandBuilder.ftCursorread(index, cursorId));
1598+
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId) {
1599+
return dispatch(searchCommandBuilder.ftCursorread(index, cursorId, -1));
15991600
}
16001601

16011602
@Override

src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.lettuce.core.protocol.RedisCommand;
5050
import io.lettuce.core.protocol.TracedCommand;
5151
import io.lettuce.core.resource.ClientResources;
52+
import io.lettuce.core.search.AggregationReply;
5253
import io.lettuce.core.search.SearchReply;
5354
import io.lettuce.core.search.arguments.AggregateArgs;
5455
import io.lettuce.core.search.arguments.CreateArgs;
@@ -1649,23 +1650,23 @@ public Mono<SearchReply<K, V>> ftSearch(K index, V query) {
16491650
}
16501651

16511652
@Override
1652-
public Mono<SearchReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args) {
1653+
public Mono<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args) {
16531654
return createMono(() -> searchCommandBuilder.ftAggregate(index, query, args));
16541655
}
16551656

16561657
@Override
1657-
public Mono<SearchReply<K, V>> ftAggregate(K index, V query) {
1658+
public Mono<AggregationReply<K, V>> ftAggregate(K index, V query) {
16581659
return createMono(() -> searchCommandBuilder.ftAggregate(index, query, null));
16591660
}
16601661

16611662
@Override
1662-
public Mono<SearchReply<K, V>> ftCursorread(K index, long cursorId, int count) {
1663+
public Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count) {
16631664
return createMono(() -> searchCommandBuilder.ftCursorread(index, cursorId, count));
16641665
}
16651666

16661667
@Override
1667-
public Mono<SearchReply<K, V>> ftCursorread(K index, long cursorId) {
1668-
return createMono(() -> searchCommandBuilder.ftCursorread(index, cursorId));
1668+
public Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId) {
1669+
return createMono(() -> searchCommandBuilder.ftCursorread(index, cursorId, -1));
16691670
}
16701671

16711672
@Override

src/main/java/io/lettuce/core/RediSearchCommandBuilder.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import io.lettuce.core.protocol.Command;
1414
import io.lettuce.core.protocol.CommandArgs;
1515
import io.lettuce.core.protocol.CommandKeyword;
16+
import io.lettuce.core.search.AggregateReplyParser;
17+
import io.lettuce.core.search.AggregationReply;
1618
import io.lettuce.core.search.SearchReply;
1719
import io.lettuce.core.search.SearchReplyParser;
1820
import io.lettuce.core.search.arguments.AggregateArgs;
@@ -95,7 +97,7 @@ public Command<K, V, SearchReply<K, V>> ftSearch(K index, V query, SearchArgs<K,
9597
* @param aggregateArgs the aggregate arguments
9698
* @return the result of the aggregate command
9799
*/
98-
public Command<K, V, SearchReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> aggregateArgs) {
100+
public Command<K, V, AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> aggregateArgs) {
99101
notNullKey(index);
100102
notNullKey(query);
101103

@@ -106,42 +108,42 @@ public Command<K, V, SearchReply<K, V>> ftAggregate(K index, V query, AggregateA
106108
aggregateArgs.build(args);
107109
}
108110

109-
return createCommand(FT_AGGREGATE, new EncodedComplexOutput<>(codec, new SearchReplyParser<>(codec, null)), args);
111+
return createCommand(FT_AGGREGATE, new EncodedComplexOutput<>(codec, new AggregateReplyParser<>(codec, null)), args);
110112
}
111113

112114
/**
113115
* Read next results from an existing cursor.
114116
*
115117
* @param index the index name
116118
* @param cursorId the cursor id
117-
* @param count the number of results to read
118119
* @return the result of the cursor read command
119120
*/
120-
public Command<K, V, SearchReply<K, V>> ftCursorread(K index, long cursorId, int count) {
121+
public Command<K, V, AggregationReply<K, V>> ftCursorread(K index, long cursorId) {
121122
notNullKey(index);
122123

123124
CommandArgs<K, V> args = new CommandArgs<>(codec).add(CommandKeyword.READ).addKey(index);
124125
args.add(cursorId);
125-
args.add(CommandKeyword.COUNT);
126-
args.add(count);
127126

128-
return createCommand(FT_CURSOR, new EncodedComplexOutput<>(codec, new SearchReplyParser<>(codec, null)), args);
127+
return createCommand(FT_CURSOR, new EncodedComplexOutput<>(codec, new AggregateReplyParser<>(codec, null)), args);
129128
}
130129

131130
/**
132131
* Read next results from an existing cursor.
133132
*
134133
* @param index the index name
135134
* @param cursorId the cursor id
135+
* @param count the number of results to read
136136
* @return the result of the cursor read command
137137
*/
138-
public Command<K, V, SearchReply<K, V>> ftCursorread(K index, long cursorId) {
138+
public Command<K, V, AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count) {
139139
notNullKey(index);
140140

141141
CommandArgs<K, V> args = new CommandArgs<>(codec).add(CommandKeyword.READ).addKey(index);
142142
args.add(cursorId);
143+
args.add(CommandKeyword.COUNT);
144+
args.add(count);
143145

144-
return createCommand(FT_CURSOR, new EncodedComplexOutput<>(codec, new SearchReplyParser<>(codec, null)), args);
146+
return createCommand(FT_CURSOR, new EncodedComplexOutput<>(codec, new AggregateReplyParser<>(codec, null)), args);
145147
}
146148

147149
/**

src/main/java/io/lettuce/core/api/async/RediSearchAsyncCommands.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010
import io.lettuce.core.RedisFuture;
1111
import io.lettuce.core.annotations.Experimental;
12+
import io.lettuce.core.search.AggregationReply;
1213
import io.lettuce.core.search.SearchReply;
1314
import io.lettuce.core.search.arguments.AggregateArgs;
1415
import io.lettuce.core.search.arguments.CreateArgs;
@@ -275,7 +276,7 @@ public interface RediSearchAsyncCommands<K, V> {
275276
* @see #ftAggregate(Object, Object, AggregateArgs)
276277
*/
277278
@Experimental
278-
RedisFuture<SearchReply<K, V>> ftAggregate(K index, V query);
279+
RedisFuture<AggregationReply<K, V>> ftAggregate(K index, V query);
279280

280281
/**
281282
* Run a search query on an index and perform advanced aggregate transformations with a processing pipeline.
@@ -331,7 +332,7 @@ public interface RediSearchAsyncCommands<K, V> {
331332
* @see #ftCursorread(Object, long)
332333
*/
333334
@Experimental
334-
RedisFuture<SearchReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335+
RedisFuture<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335336

336337
/**
337338
* Read next results from an existing cursor.
@@ -365,7 +366,7 @@ public interface RediSearchAsyncCommands<K, V> {
365366
* @see #ftAggregate(Object, Object, AggregateArgs)
366367
*/
367368
@Experimental
368-
RedisFuture<SearchReply<K, V>> ftCursorread(K index, long cursorId, int count);
369+
RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count);
369370

370371
/**
371372
* Read next results from an existing cursor using the default batch size.
@@ -398,7 +399,7 @@ public interface RediSearchAsyncCommands<K, V> {
398399
* @see #ftAggregate(Object, Object, AggregateArgs)
399400
*/
400401
@Experimental
401-
RedisFuture<SearchReply<K, V>> ftCursorread(K index, long cursorId);
402+
RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId);
402403

403404
/**
404405
* Delete a cursor and free its associated resources.

src/main/java/io/lettuce/core/api/reactive/RediSearchReactiveCommands.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
import java.util.List;
1010

1111
import io.lettuce.core.annotations.Experimental;
12+
import io.lettuce.core.search.AggregationReply;
1213
import io.lettuce.core.search.SearchReply;
1314
import io.lettuce.core.search.arguments.AggregateArgs;
1415
import io.lettuce.core.search.arguments.CreateArgs;
1516
import io.lettuce.core.search.arguments.FieldArgs;
1617
import io.lettuce.core.search.arguments.SearchArgs;
18+
import reactor.core.publisher.Flux;
1719
import reactor.core.publisher.Mono;
1820

1921
/**
@@ -276,7 +278,7 @@ public interface RediSearchReactiveCommands<K, V> {
276278
* @see #ftAggregate(Object, Object, AggregateArgs)
277279
*/
278280
@Experimental
279-
Mono<SearchReply<K, V>> ftAggregate(K index, V query);
281+
Mono<AggregationReply<K, V>> ftAggregate(K index, V query);
280282

281283
/**
282284
* Run a search query on an index and perform advanced aggregate transformations with a processing pipeline.
@@ -332,7 +334,7 @@ public interface RediSearchReactiveCommands<K, V> {
332334
* @see #ftCursorread(Object, long)
333335
*/
334336
@Experimental
335-
Mono<SearchReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
337+
Mono<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
336338

337339
/**
338340
* Read next results from an existing cursor.
@@ -366,7 +368,7 @@ public interface RediSearchReactiveCommands<K, V> {
366368
* @see #ftAggregate(Object, Object, AggregateArgs)
367369
*/
368370
@Experimental
369-
Mono<SearchReply<K, V>> ftCursorread(K index, long cursorId, int count);
371+
Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count);
370372

371373
/**
372374
* Read next results from an existing cursor using the default batch size.
@@ -399,7 +401,7 @@ public interface RediSearchReactiveCommands<K, V> {
399401
* @see #ftAggregate(Object, Object, AggregateArgs)
400402
*/
401403
@Experimental
402-
Mono<SearchReply<K, V>> ftCursorread(K index, long cursorId);
404+
Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId);
403405

404406
/**
405407
* Delete a cursor and free its associated resources.

src/main/java/io/lettuce/core/api/sync/RediSearchCommands.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010

1111
import io.lettuce.core.annotations.Experimental;
12+
import io.lettuce.core.search.AggregationReply;
1213
import io.lettuce.core.search.SearchReply;
1314
import io.lettuce.core.search.arguments.AggregateArgs;
1415
import io.lettuce.core.search.arguments.CreateArgs;
@@ -275,7 +276,7 @@ public interface RediSearchCommands<K, V> {
275276
* @see #ftAggregate(Object, Object, AggregateArgs)
276277
*/
277278
@Experimental
278-
SearchReply<K, V> ftAggregate(K index, V query);
279+
AggregationReply<K, V> ftAggregate(K index, V query);
279280

280281
/**
281282
* Run a search query on an index and perform advanced aggregate transformations with a processing pipeline.
@@ -331,7 +332,7 @@ public interface RediSearchCommands<K, V> {
331332
* @see #ftCursorread(Object, long)
332333
*/
333334
@Experimental
334-
SearchReply<K, V> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335+
AggregationReply<K, V> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335336

336337
/**
337338
* Read next results from an existing cursor.
@@ -365,7 +366,7 @@ public interface RediSearchCommands<K, V> {
365366
* @see #ftAggregate(Object, Object, AggregateArgs)
366367
*/
367368
@Experimental
368-
SearchReply<K, V> ftCursorread(K index, long cursorId, int count);
369+
AggregationReply<K, V> ftCursorread(K index, long cursorId, int count);
369370

370371
/**
371372
* Read next results from an existing cursor using the default batch size.
@@ -398,7 +399,7 @@ public interface RediSearchCommands<K, V> {
398399
* @see #ftAggregate(Object, Object, AggregateArgs)
399400
*/
400401
@Experimental
401-
SearchReply<K, V> ftCursorread(K index, long cursorId);
402+
AggregationReply<K, V> ftCursorread(K index, long cursorId);
402403

403404
/**
404405
* Delete a cursor and free its associated resources.

src/main/java/io/lettuce/core/cluster/api/async/RediSearchAsyncCommands.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010

1111
import io.lettuce.core.annotations.Experimental;
12+
import io.lettuce.core.search.AggregationReply;
1213
import io.lettuce.core.search.SearchReply;
1314
import io.lettuce.core.search.arguments.AggregateArgs;
1415
import io.lettuce.core.search.arguments.CreateArgs;
@@ -275,7 +276,7 @@ public interface RediSearchAsyncCommands<K, V> {
275276
* @see #ftAggregate(Object, Object, AggregateArgs)
276277
*/
277278
@Experimental
278-
AsyncExecutions<SearchReply<K, V>> ftAggregate(K index, V query);
279+
AsyncExecutions<AggregationReply<K, V>> ftAggregate(K index, V query);
279280

280281
/**
281282
* Run a search query on an index and perform advanced aggregate transformations with a processing pipeline.
@@ -331,7 +332,7 @@ public interface RediSearchAsyncCommands<K, V> {
331332
* @see #ftCursorread(Object, long)
332333
*/
333334
@Experimental
334-
AsyncExecutions<SearchReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335+
AsyncExecutions<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335336

336337
/**
337338
* Read next results from an existing cursor.
@@ -365,7 +366,7 @@ public interface RediSearchAsyncCommands<K, V> {
365366
* @see #ftAggregate(Object, Object, AggregateArgs)
366367
*/
367368
@Experimental
368-
AsyncExecutions<SearchReply<K, V>> ftCursorread(K index, long cursorId, int count);
369+
AsyncExecutions<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count);
369370

370371
/**
371372
* Read next results from an existing cursor using the default batch size.
@@ -398,7 +399,7 @@ public interface RediSearchAsyncCommands<K, V> {
398399
* @see #ftAggregate(Object, Object, AggregateArgs)
399400
*/
400401
@Experimental
401-
AsyncExecutions<SearchReply<K, V>> ftCursorread(K index, long cursorId);
402+
AsyncExecutions<AggregationReply<K, V>> ftCursorread(K index, long cursorId);
402403

403404
/**
404405
* Delete a cursor and free its associated resources.

src/main/java/io/lettuce/core/cluster/api/sync/RediSearchCommands.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010

1111
import io.lettuce.core.annotations.Experimental;
12+
import io.lettuce.core.search.AggregationReply;
1213
import io.lettuce.core.search.SearchReply;
1314
import io.lettuce.core.search.arguments.AggregateArgs;
1415
import io.lettuce.core.search.arguments.CreateArgs;
@@ -275,7 +276,7 @@ public interface RediSearchCommands<K, V> {
275276
* @see #ftAggregate(Object, Object, AggregateArgs)
276277
*/
277278
@Experimental
278-
Executions<SearchReply<K, V>> ftAggregate(K index, V query);
279+
Executions<AggregationReply<K, V>> ftAggregate(K index, V query);
279280

280281
/**
281282
* Run a search query on an index and perform advanced aggregate transformations with a processing pipeline.
@@ -331,7 +332,7 @@ public interface RediSearchCommands<K, V> {
331332
* @see #ftCursorread(Object, long)
332333
*/
333334
@Experimental
334-
Executions<SearchReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335+
Executions<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args);
335336

336337
/**
337338
* Read next results from an existing cursor.
@@ -365,7 +366,7 @@ public interface RediSearchCommands<K, V> {
365366
* @see #ftAggregate(Object, Object, AggregateArgs)
366367
*/
367368
@Experimental
368-
Executions<SearchReply<K, V>> ftCursorread(K index, long cursorId, int count);
369+
Executions<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count);
369370

370371
/**
371372
* Read next results from an existing cursor using the default batch size.
@@ -398,7 +399,7 @@ public interface RediSearchCommands<K, V> {
398399
* @see #ftAggregate(Object, Object, AggregateArgs)
399400
*/
400401
@Experimental
401-
Executions<SearchReply<K, V>> ftCursorread(K index, long cursorId);
402+
Executions<AggregationReply<K, V>> ftCursorread(K index, long cursorId);
402403

403404
/**
404405
* Delete a cursor and free its associated resources.

0 commit comments

Comments
 (0)