Skip to content

Commit

Permalink
fix: correctly implement Pagination for Query by Example (QBE) with r…
Browse files Browse the repository at this point in the history
…epositories (resolves #redisgh-449)
  • Loading branch information
bsbodden committed Jun 11, 2024
1 parent d405cc3 commit 2befc95
Show file tree
Hide file tree
Showing 15 changed files with 1,045 additions and 124 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.om.spring.repository.support;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand All @@ -17,7 +18,8 @@
import com.redis.om.spring.repository.RedisDocumentRepository;
import com.redis.om.spring.search.stream.EntityStream;
import com.redis.om.spring.search.stream.EntityStreamImpl;
import com.redis.om.spring.search.stream.FluentQueryByExample;
import com.redis.om.spring.search.stream.RedisFluentQueryByExample;
import com.redis.om.spring.search.stream.SearchStream;
import com.redis.om.spring.serialization.gson.GsonListOfType;
import com.redis.om.spring.util.ObjectUtils;
import com.redis.om.spring.vectorize.FeatureExtractor;
Expand All @@ -26,6 +28,7 @@
import org.springframework.beans.PropertyAccessor;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.annotation.Reference;
import org.springframework.data.annotation.Version;
Expand Down Expand Up @@ -89,7 +92,9 @@ public SimpleRedisDocumentRepository( //
KeyValueOperations operations, //
@Qualifier("redisModulesOperations") RedisModulesOperations<?> rmo, //
RediSearchIndexer indexer, //
RedisMappingContext mappingContext, GsonBuilder gsonBuilder, FeatureExtractor featureExtractor, //
RedisMappingContext mappingContext, //
GsonBuilder gsonBuilder, //
FeatureExtractor featureExtractor, //
RedisOMProperties properties) {
super(metadata, operations);
this.modulesOperations = (RedisModulesOperations<String>) rmo;
Expand Down Expand Up @@ -373,7 +378,13 @@ private Number getEntityVersion(String key, String versionProperty) {

@Override
public <S extends T> Optional<S> findOne(Example<S> example) {
return entityStream.of(example.getProbeType()).filter(example).findFirst();
Iterable<S> result = findAll(example);
var size = Iterables.size(result);
if (size > 1) {
throw new IncorrectResultSizeDataAccessException("Query returned non unique result", 1);
}

return StreamSupport.stream(result.spliterator(), false).findFirst();
}

@Override
Expand All @@ -388,7 +399,13 @@ public <S extends T> Iterable<S> findAll(Example<S> example, Sort sort) {

@Override
public <S extends T> Page<S> findAll(Example<S> example, Pageable pageable) {
return pageFromSlice(entityStream.of(example.getProbeType()).filter(example).getSlice(pageable));
SearchStream<S> stream = entityStream.of(example.getProbeType());
var offset = pageable.getOffset() * pageable.getPageSize();
var limit = pageable.getPageSize();
Slice<S> slice = stream.filter(example).loadAll().limit(limit, Math.toIntExact(offset))
.toList(pageable, stream.getEntityClass());

return pageFromSlice(slice);
}

/* (non-Javadoc)
Expand Down Expand Up @@ -467,7 +484,8 @@ public <S extends T, R> R findBy(Example<S> example, Function<FetchableFluentQue
Assert.notNull(queryFunction, "Query function must not be null");

return queryFunction.apply(
new FluentQueryByExample<>(example, example.getProbeType(), entityStream, getSearchOps()));
new RedisFluentQueryByExample<>(example, example.getProbeType(), entityStream, getSearchOps(),
mappingConverter.getMappingContext()));
}

private SearchOperations<String> getSearchOps() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.om.spring.repository.support;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.redis.om.spring.RedisEnhancedKeyValueAdapter;
import com.redis.om.spring.RedisOMProperties;
Expand All @@ -14,10 +15,12 @@
import com.redis.om.spring.repository.RedisEnhancedRepository;
import com.redis.om.spring.search.stream.EntityStream;
import com.redis.om.spring.search.stream.EntityStreamImpl;
import com.redis.om.spring.search.stream.FluentQueryByExample;
import com.redis.om.spring.search.stream.RedisFluentQueryByExample;
import com.redis.om.spring.search.stream.SearchStream;
import com.redis.om.spring.util.ObjectUtils;
import com.redis.om.spring.vectorize.FeatureExtractor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.domain.*;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.keyvalue.core.IterableConverter;
Expand Down Expand Up @@ -299,7 +302,13 @@ private boolean expires(RedisData data) {

@Override
public <S extends T> Optional<S> findOne(Example<S> example) {
return entityStream.of(example.getProbeType()).filter(example).findFirst();
Iterable<S> result = findAll(example);
var size = Iterables.size(result);
if (size > 1) {
throw new IncorrectResultSizeDataAccessException("Query returned non unique result", 1);
}

return StreamSupport.stream(result.spliterator(), false).findFirst();
}

@Override
Expand All @@ -314,7 +323,13 @@ public <S extends T> Iterable<S> findAll(Example<S> example, Sort sort) {

@Override
public <S extends T> Page<S> findAll(Example<S> example, Pageable pageable) {
return pageFromSlice(entityStream.of(example.getProbeType()).filter(example).getSlice(pageable));
SearchStream<S> stream = entityStream.of(example.getProbeType());
var offset = pageable.getOffset() * pageable.getPageSize();
var limit = pageable.getPageSize();
Slice<S> slice = stream.filter(example).loadAll().limit(limit, Math.toIntExact(offset))
.toList(pageable, stream.getEntityClass());

return pageFromSlice(slice);
}

@Override
Expand All @@ -337,7 +352,8 @@ public <S extends T, R> R findBy(Example<S> example, Function<FetchableFluentQue
Assert.notNull(queryFunction, "Query function must not be null");

return queryFunction.apply(
new FluentQueryByExample<>(example, example.getProbeType(), entityStream, getSearchOps()));
new RedisFluentQueryByExample<>(example, example.getProbeType(), entityStream, getSearchOps(),
mappingConverter.getMappingContext()));
}

private SearchOperations<String> getSearchOps() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.redis.om.spring.annotations.ReducerFunction;
import com.redis.om.spring.metamodel.MetamodelField;
import com.redis.om.spring.search.stream.aggregations.filters.AggregationFilter;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort.Order;
import redis.clients.jedis.search.aggr.AggregationResult;
Expand Down Expand Up @@ -53,7 +53,7 @@ public interface AggregationStream<T> {
// Cursor API
AggregationStream<T> cursor(int i, Duration duration);

<R extends T> Slice<R> toList(PageRequest pageRequest, Class<?>... contentTypes);
<R extends T> Slice<R> toList(Pageable pageRequest, Class<?>... contentTypes);

<R extends T> Slice<R> toList(PageRequest pageRequest, Duration duration, Class<?>... contentTypes);
<R extends T> Slice<R> toList(Pageable pageRequest, Duration duration, Class<?>... contentTypes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.redis.om.spring.search.stream.aggregations.filters.AggregationFilter;
import com.redis.om.spring.tuple.Tuples;
import com.redis.om.spring.util.ObjectUtils;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.domain.Sort.Order;
Expand Down Expand Up @@ -208,7 +208,7 @@ public AggregationStream<T> limit(int limit) {
}

@Override
public AggregationStream<T> limit(int offset, int limit) {
public AggregationStream<T> limit(int limit, int offset) {
applyCurrentGroupBy();
aggregation.limit(offset, limit);
limitSet = true;
Expand Down Expand Up @@ -387,15 +387,15 @@ public AggregationStream<T> cursor(int count, Duration timeout) {

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public <R extends T> Slice<R> toList(PageRequest pageRequest, Class<?>... contentTypes) {
public <R extends T> Slice<R> toList(Pageable pageRequest, Class<?>... contentTypes) {
applyCurrentGroupBy();
aggregation.cursor(pageRequest.getPageSize(), 300000);
return new AggregationPage(this, pageRequest, entityClass, gson, mappingConverter, isDocument);
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public <R extends T> Slice<R> toList(PageRequest pageRequest, Duration timeout, Class<?>... contentTypes) {
public <R extends T> Slice<R> toList(Pageable pageRequest, Duration timeout, Class<?>... contentTypes) {
applyCurrentGroupBy();
aggregation.cursor(pageRequest.getPageSize(), timeout.toMillis());
return new AggregationPage(this, pageRequest, entityClass, gson, mappingConverter, isDocument);
Expand Down

This file was deleted.

Loading

0 comments on commit 2befc95

Please sign in to comment.