feature: deleteAll - default find/del keys in batches pipelined, optional drop index with dd and rebuild
bsbodden committed Jul 10, 2024
1 parent cfc8a00 commit 689d0f0
Showing 4 changed files with 70 additions and 5 deletions.
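
At a glance, deleteAll() now takes one of two paths depending on the new repository settings introduced in this commit. Below is a hedged sketch of the resulting Redis traffic; the repository name is a placeholder and the raw commands are inferred from the adapter code in the diff, not quoted from the library:

```java
// Default path (dropAndRecreateIndexOnDeleteAll = false), inferred from the adapter code:
//   FT.SEARCH <index> * NOCONTENT LIMIT 0 <deleteBatchSize>   -> one page of document ids
//   pipelined DEL for each id in the page, repeated until the search returns nothing
// Opt-in path (dropAndRecreateIndexOnDeleteAll = true):
//   FT.DROPINDEX <index> DD   -> drop the index together with its documents
//   FT.CREATE <index> ...     -> rebuild the index for the entity class
personRepository.deleteAll(); // personRepository is a placeholder Redis OM repository
```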
@@ -11,6 +11,7 @@
 import org.springframework.data.convert.CustomConversions;
 import org.springframework.data.mapping.PersistentPropertyAccessor;
 import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisKeyCommands;
 import org.springframework.data.redis.core.*;
 import org.springframework.data.redis.core.PartialUpdate.PropertyUpdate;
 import org.springframework.data.redis.core.PartialUpdate.UpdateCommand;
@@ -25,6 +26,7 @@
 import org.springframework.util.Assert;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;
+import redis.clients.jedis.commands.KeyCommands;
 import redis.clients.jedis.search.Query;
 import redis.clients.jedis.search.SearchResult;

@@ -210,8 +212,30 @@ public void deleteAllOf(String keyspace) {
     Class<?> type = indexer.getEntityClassForKeyspace(keyspace);
     String searchIndex = indexer.getIndexName(keyspace);
     SearchOperations<String> searchOps = modulesOperations.opsForSearch(searchIndex);
-    searchOps.dropIndexAndDocuments();
-    indexer.createIndexFor(type);
+    if (redisOMProperties.getRepository().isDropAndRecreateIndexOnDeleteAll()) {
+      searchOps.dropIndexAndDocuments();
+      indexer.createIndexFor(type);
+    } else {
+      boolean moreRecords = true;
+      while (moreRecords) {
+        Query query = new Query("*");
+        query.limit(0, redisOMProperties.getRepository().getDeleteBatchSize());
+        query.setNoContent();
+        SearchResult searchResult = searchOps.search(query);
+        if (searchResult.getTotalResults() > 0) {
+          List<byte[]> keys = searchResult.getDocuments().stream().map(k -> toBytes(k.getId())).toList();
+          redisOperations.executePipelined((RedisCallback<Object>) connection -> {
+            RedisKeyCommands keyCommands = connection.keyCommands();
+            for (byte[] key : keys) {
+              keyCommands.del(key);
+            }
+            return null;
+          });
+        } else {
+          moreRecords = false;
+        }
+      }
+    }
   }

   public <T> List<String> getAllIds(String keyspace, Class<T> type) {
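
The batched branch leans on Spring Data Redis pipelining: every DEL queued inside the executePipelined callback is flushed in a single round trip once the callback completes. A minimal, self-contained sketch of that mechanism, with the template and key list as placeholders rather than the adapter's own fields:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.data.redis.connection.RedisKeyCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;

public class PipelinedDeleteSketch {

  // Deletes the given keys in one pipelined round trip and returns the raw DEL replies.
  static List<Object> deleteInOnePipeline(StringRedisTemplate template, List<String> keys) {
    return template.executePipelined((RedisCallback<Object>) connection -> {
      RedisKeyCommands keyCommands = connection.keyCommands();
      for (String key : keys) {
        keyCommands.del(key.getBytes(StandardCharsets.UTF_8)); // queued, not yet flushed
      }
      return null; // executePipelined requires null; it collects the pipelined replies itself
    });
  }
}
```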
@@ -18,6 +18,7 @@
 import org.springframework.dao.OptimisticLockingFailureException;
 import org.springframework.data.annotation.Reference;
 import org.springframework.data.annotation.Version;
+import org.springframework.data.redis.connection.RedisKeyCommands;
 import org.springframework.data.redis.core.RedisCallback;
 import org.springframework.data.redis.core.RedisKeyValueAdapter;
 import org.springframework.data.redis.core.RedisOperations;
@@ -212,8 +213,30 @@ public void deleteAllOf(String keyspace) {
     Class<?> type = indexer.getEntityClassForKeyspace(keyspace);
     String searchIndex = indexer.getIndexName(keyspace);
     SearchOperations<String> searchOps = modulesOperations.opsForSearch(searchIndex);
-    searchOps.dropIndexAndDocuments();
-    indexer.createIndexFor(type);
+    if (redisOMProperties.getRepository().isDropAndRecreateIndexOnDeleteAll()) {
+      searchOps.dropIndexAndDocuments();
+      indexer.createIndexFor(type);
+    } else {
+      boolean moreRecords = true;
+      while (moreRecords) {
+        Query query = new Query("*");
+        query.limit(0, redisOMProperties.getRepository().getDeleteBatchSize());
+        query.setNoContent();
+        SearchResult searchResult = searchOps.search(query);
+        if (searchResult.getTotalResults() > 0) {
+          List<byte[]> keys = searchResult.getDocuments().stream().map(k -> toBytes(k.getId())).toList();
+          redisOperations.executePipelined((RedisCallback<Object>) connection -> {
+            RedisKeyCommands keyCommands = connection.keyCommands();
+            for (byte[] key : keys) {
+              keyCommands.del(key);
+            }
+            return null;
+          });
+        } else {
+          moreRecords = false;
+        }
+      }
+    }
   }

   /*
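
For comparison, the opt-in branch corresponds to RediSearch's FT.DROPINDEX with the DD (delete documents) flag followed by a fresh FT.CREATE. A rough Jedis-level sketch, where the index name, key prefix, and schema are invented for illustration rather than taken from the library's index builder:

```java
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.search.IndexDefinition;
import redis.clients.jedis.search.IndexOptions;
import redis.clients.jedis.search.Schema;

public class DropAndRebuildSketch {

  // FT.DROPINDEX <index> DD removes the index together with its documents; FT.CREATE rebuilds it.
  static void dropAndRebuild(UnifiedJedis jedis) {
    jedis.ftDropIndexDD("PeopleIdx"); // placeholder index name

    Schema schema = new Schema()
        .addTagField("id")
        .addTextField("name", 1.0);   // placeholder schema, not the generated one
    IndexDefinition definition = new IndexDefinition(IndexDefinition.Type.HASH)
        .setPrefixes("people:");      // placeholder key prefix
    jedis.ftCreate("PeopleIdx", IndexOptions.defaultOptions().setDefinition(definition), schema);
  }
}
```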
@@ -65,11 +65,29 @@ public Ollama getOllama() {

   public static class Repository {
     private final Query query = new Query();
+    private boolean dropAndRecreateIndexOnDeleteAll = false;
+    private int deleteBatchSize = 500;

     public Query getQuery() {
       return query;
     }

+    public boolean isDropAndRecreateIndexOnDeleteAll() {
+      return dropAndRecreateIndexOnDeleteAll;
+    }
+
+    public void setDropAndRecreateIndexOnDeleteAll(boolean dropAndRecreateIndexOnDeleteAll) {
+      this.dropAndRecreateIndexOnDeleteAll = dropAndRecreateIndexOnDeleteAll;
+    }
+
+    public int getDeleteBatchSize() {
+      return deleteBatchSize;
+    }
+
+    public void setDeleteBatchSize(int deleteBatchSize) {
+      this.deleteBatchSize = deleteBatchSize;
+    }
+
     public static class Query {
       private int limit = MAX_SEARCH_RESULTS;
       private double defaultDistance = DEFAULT_DISTANCE;
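
A hedged sketch of how these settings might be bound in application.properties, assuming Redis OM Spring's usual redis.om.spring configuration prefix and Spring Boot's relaxed binding of the setters above; verify the exact keys against RedisOMProperties:

```properties
# Assumed keys, derived from the new setters via relaxed binding.
# Opt back into FT.DROPINDEX ... DD plus index rebuild on deleteAll:
redis.om.spring.repository.drop-and-recreate-index-on-delete-all=true
# Or keep the default batched deletion and tune the page size (default 500):
redis.om.spring.repository.delete-batch-size=1000
```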
@@ -103,7 +103,7 @@ public Iterable<ID> getIds() {
     Optional<Field> maybeIdField = ObjectUtils.getIdFieldForEntityClass(metadata.getJavaType());
     String idField = maybeIdField.map(Field::getName).orElse("id");
     Query query = new Query("*");
-    query.limit(0, MAX_SEARCH_RESULTS);
+    query.limit(0, properties.getRepository().getQuery().getLimit());
     query.returnFields(idField);
     SearchResult searchResult = searchOps.search(query);

