Skip to content

DATAREDIS-711 - Emit Lua array responses as List. #282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.0.DATAREDIS-711-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ default Mono<Boolean> scriptExists(String scriptSha) {
* Evaluate given {@code script}.
*
* @param script must not be {@literal null}.
* @param returnType must not be {@literal null}.
* @param returnType must not be {@literal null}. Using {@link ReturnType#MULTI} emits a {@link List} as-is instead of
* emitting the individual elements from the array response.
* @param numKeys
* @param keysAndArgs must not be {@literal null}.
* @return never {@literal null}.
Expand All @@ -94,7 +95,8 @@ default Mono<Boolean> scriptExists(String scriptSha) {
* Evaluate given {@code scriptSha}.
*
* @param scriptSha must not be {@literal null}.
* @param returnType must not be {@literal null}.
* @param returnType must not be {@literal null}. Using {@link ReturnType#MULTI} emits a {@link List} as-is instead of
* emitting the individual elements from the array response.
* @param numKeys
* @param keysAndArgs must not be {@literal null}.
* @return never {@literal null}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class LettuceReactiveClusterHyperLogLogCommands extends LettuceReactiveHyperLogL
@Override
public Flux<BooleanResponse<PfMergeCommand>> pfMerge(Publisher<PfMergeCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null for PFMERGE");
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty for PFMERGE!");
Expand All @@ -76,7 +76,7 @@ public Flux<BooleanResponse<PfMergeCommand>> pfMerge(Publisher<PfMergeCommand> c
@Override
public Flux<NumericResponse<PfCountCommand, Long>> pfCount(Publisher<PfCountCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notEmpty(command.getKeys(), "Keys must be null or empty for PFCOUNT!");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Mono<ByteBuffer> randomKey(RedisClusterNode node) {
@Override
public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "key must not be null.");
Assert.notNull(command.getNewName(), "NewName must not be null!");
Expand All @@ -102,7 +102,7 @@ public Flux<BooleanResponse<RenameCommand>> rename(Publisher<RenameCommand> comm
@Override
public Flux<BooleanResponse<RenameCommand>> renameNX(Publisher<RenameCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null.");
Assert.notNull(command.getNewName(), "NewName must not be null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class LettuceReactiveClusterListCommands extends LettuceReactiveListCommands imp
@Override
public Flux<PopResponse> bPop(Publisher<BPopCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKeys(), "Keys must not be null!");
Assert.notNull(command.getDirection(), "Direction must not be null!");
Expand All @@ -68,7 +68,7 @@ public Flux<PopResponse> bPop(Publisher<BPopCommand> commands) {
@Override
public Flux<ByteBufferResponse<RPopLPushCommand>> rPopLPush(Publisher<RPopLPushCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getDestination(), "Destination key must not be null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public Flux<RedisClientInfo> getClientList() {
public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {

return connection.execute(node, RedisServerReactiveCommands::clientList)
.flatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
.concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
}

private <T> Collection<Publisher<Tuple2<RedisClusterNode, T>>> executeOnAllNodes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class LettuceReactiveClusterSetCommands extends LettuceReactiveSetCommands imple
@Override
public Flux<CommandResponse<SUnionCommand, Flux<ByteBuffer>>> sUnion(Publisher<SUnionCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKeys(), "Keys must not be null!");

Expand All @@ -74,7 +74,7 @@ public Flux<CommandResponse<SUnionCommand, Flux<ByteBuffer>>> sUnion(Publisher<S
@Override
public Flux<NumericResponse<SUnionStoreCommand, Long>> sUnionStore(Publisher<SUnionStoreCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKeys(), "Source keys must not be null!");
Assert.notNull(command.getKey(), "Destination key must not be null!");
Expand All @@ -99,7 +99,7 @@ public Flux<NumericResponse<SUnionStoreCommand, Long>> sUnionStore(Publisher<SUn
@Override
public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<SInterCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKeys(), "Keys must not be null!");

Expand All @@ -124,7 +124,7 @@ public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<S
return source;
});

return Mono.just(new CommandResponse<>(command, result.flatMap(v -> Flux.fromStream(v.stream()))));
return Mono.just(new CommandResponse<>(command, result.concatMap(v -> Flux.fromStream(v.stream()))));
}));
}

Expand All @@ -134,7 +134,7 @@ public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<S
@Override
public Flux<NumericResponse<SInterStoreCommand, Long>> sInterStore(Publisher<SInterStoreCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKeys(), "Source keys must not be null!");
Assert.notNull(command.getKey(), "Destination key must not be null!");
Expand All @@ -159,7 +159,7 @@ public Flux<NumericResponse<SInterStoreCommand, Long>> sInterStore(Publisher<SIn
@Override
public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDiffCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKeys(), "Keys must not be null!");

Expand All @@ -185,7 +185,7 @@ public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDi
return source;
});

return Mono.just(new CommandResponse<>(command, result.flatMap(v -> Flux.fromStream(v.stream()))));
return Mono.just(new CommandResponse<>(command, result.concatMap(v -> Flux.fromStream(v.stream()))));

}));
}
Expand All @@ -196,7 +196,7 @@ public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDi
@Override
public Flux<NumericResponse<SDiffStoreCommand, Long>> sDiffStore(Publisher<SDiffStoreCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKeys(), "Source keys must not be null!");
Assert.notNull(command.getKey(), "Destination key must not be null!");
Expand All @@ -221,7 +221,7 @@ public Flux<NumericResponse<SDiffStoreCommand, Long>> sDiffStore(Publisher<SDiff
@Override
public Flux<BooleanResponse<SMoveCommand>> sMove(Publisher<SMoveCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Source key must not be null!");
Assert.notNull(command.getDestination(), "Destination key must not be null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class LettuceReactiveClusterStringCommands extends LettuceReactiveStringCommands
@Override
public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(Publisher<BitOpCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
keys.add(command.getDestinationKey());
Expand All @@ -71,7 +71,7 @@ public Flux<ReactiveRedisConnection.NumericResponse<BitOpCommand, Long>> bitOp(P
@Override
public Flux<ReactiveRedisConnection.BooleanResponse<MSetCommand>> mSetNX(Publisher<MSetCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeyValuePairs().keySet())) {
return super.mSetNX(Mono.just(command));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class LettuceReactiveClusterZSetCommands extends LettuceReactiveZSetCommands imp
@Override
public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUnionStoreCommand> commands) {

return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");

Expand All @@ -65,7 +65,7 @@ public Flux<NumericResponse<ZUnionStoreCommand, Long>> zUnionStore(Publisher<ZUn
*/
@Override
public Flux<NumericResponse<ZInterStoreCommand, Long>> zInterStore(Publisher<ZInterStoreCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).flatMap(command -> {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null or empty.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class LettuceReactiveGeoCommands implements ReactiveGeoCommands {
@Override
public Flux<NumericResponse<GeoAddCommand, Long>> geoAdd(Publisher<GeoAddCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getGeoLocations(), "Locations must not be null!");
Expand All @@ -95,7 +95,7 @@ public Flux<NumericResponse<GeoAddCommand, Long>> geoAdd(Publisher<GeoAddCommand
@Override
public Flux<CommandResponse<GeoDistCommand, Distance>> geoDist(Publisher<GeoDistCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getFrom(), "From member must not be null!");
Expand All @@ -121,7 +121,7 @@ public Flux<CommandResponse<GeoDistCommand, Distance>> geoDist(Publisher<GeoDist
@Override
public Flux<MultiValueResponse<GeoHashCommand, String>> geoHash(Publisher<GeoHashCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getMembers(), "Members must not be null!");
Expand All @@ -139,7 +139,7 @@ public Flux<MultiValueResponse<GeoHashCommand, String>> geoHash(Publisher<GeoHas
@Override
public Flux<MultiValueResponse<GeoPosCommand, Point>> geoPos(Publisher<GeoPosCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getMembers(), "Members must not be null!");
Expand All @@ -160,7 +160,7 @@ public Flux<MultiValueResponse<GeoPosCommand, Point>> geoPos(Publisher<GeoPosCom
public Flux<CommandResponse<GeoRadiusCommand, Flux<GeoResult<GeoLocation<ByteBuffer>>>>> geoRadius(
Publisher<GeoRadiusCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getPoint(), "Point must not be null!");
Expand All @@ -187,7 +187,7 @@ public Flux<CommandResponse<GeoRadiusCommand, Flux<GeoResult<GeoLocation<ByteBuf
public Flux<CommandResponse<GeoRadiusByMemberCommand, Flux<GeoResult<GeoLocation<ByteBuffer>>>>> geoRadiusByMember(
Publisher<GeoRadiusByMemberCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getMember(), "Member must not be null!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class LettuceReactiveHashCommands implements ReactiveHashCommands {
@Override
public Flux<BooleanResponse<HSetCommand>> hSet(Publisher<HSetCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getFieldValueMap(), "FieldValueMap must not be null!");
Expand Down Expand Up @@ -95,7 +95,7 @@ public Flux<BooleanResponse<HSetCommand>> hSet(Publisher<HSetCommand> commands)
@Override
public Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getFields(), "Fields must not be null!");
Expand All @@ -122,7 +122,7 @@ public Flux<MultiValueResponse<HGetCommand, ByteBuffer>> hMGet(Publisher<HGetCom
@Override
public Flux<BooleanResponse<HExistsCommand>> hExists(Publisher<HExistsCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getName(), "Name must not be null!");
Expand All @@ -138,7 +138,7 @@ public Flux<BooleanResponse<HExistsCommand>> hExists(Publisher<HExistsCommand> c
@Override
public Flux<NumericResponse<HDelCommand, Long>> hDel(Publisher<HDelCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");
Assert.notNull(command.getFields(), "Fields must not be null!");
Expand All @@ -155,7 +155,7 @@ public Flux<NumericResponse<HDelCommand, Long>> hDel(Publisher<HDelCommand> comm
@Override
public Flux<NumericResponse<KeyCommand, Long>> hLen(Publisher<KeyCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Command.getKey() must not be null!");

Expand All @@ -170,7 +170,7 @@ public Flux<NumericResponse<KeyCommand, Long>> hLen(Publisher<KeyCommand> comman
@Override
public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hKeys(Publisher<KeyCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");

Expand All @@ -187,7 +187,7 @@ public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hKeys(Publisher<KeyCo
@Override
public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hVals(Publisher<KeyCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");

Expand All @@ -205,7 +205,7 @@ public Flux<CommandResponse<KeyCommand, Flux<ByteBuffer>>> hVals(Publisher<KeyCo
public Flux<CommandResponse<KeyCommand, Flux<Map.Entry<ByteBuffer, ByteBuffer>>>> hGetAll(
Publisher<KeyCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Key must not be null!");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class LettuceReactiveHyperLogLogCommands implements ReactiveHyperLogLogCommands
@Override
public Flux<NumericResponse<PfAddCommand, Long>> pfAdd(Publisher<PfAddCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "key must not be null!");

Expand All @@ -70,7 +70,7 @@ public Flux<NumericResponse<PfAddCommand, Long>> pfAdd(Publisher<PfAddCommand> c
@Override
public Flux<NumericResponse<PfCountCommand, Long>> pfCount(Publisher<PfCountCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notEmpty(command.getKeys(), "Keys must not be empty for PFCOUNT.");

Expand All @@ -86,7 +86,7 @@ public Flux<NumericResponse<PfCountCommand, Long>> pfCount(Publisher<PfCountComm
@Override
public Flux<BooleanResponse<PfMergeCommand>> pfMerge(Publisher<PfMergeCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

Assert.notNull(command.getKey(), "Destination key must not be null for PFMERGE.");
Assert.notEmpty(command.getSourceKeys(), "Source keys must not be null for PFMERGE.");
Expand Down
Loading