Skip to content

Commit

Permalink
Ensure that Redis pipelines are closed after usage (langchain4j#449)
Browse files Browse the repository at this point in the history
Hi !

Not closing Redis pipeline calls prevent Jedis pooled connections to be
returned in the pool, as `Pipeline` object are created for each
`addAllInternal` calls.

This change ensure that the `Pipeline` object is closed, as soon as it
is no longer required.

Hope this helps,
Guillaume
  • Loading branch information
gdarmont authored Jan 8, 2024
1 parent a70acf5 commit 8168f74
Showing 1 changed file with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,28 @@ private void addAllInternal(List<String> ids, List<Embedding> embeddings, List<T
ensureTrue(ids.size() == embeddings.size(), "ids size is not equal to embeddings size");
ensureTrue(embedded == null || embeddings.size() == embedded.size(), "embeddings size is not equal to embedded size");

Pipeline pipeline = client.pipelined();

int size = ids.size();
for (int i = 0; i < size; i++) {
String id = ids.get(i);
Embedding embedding = embeddings.get(i);
TextSegment textSegment = embedded == null ? null : embedded.get(i);
Map<String, Object> fields = new HashMap<>();
fields.put(schema.getVectorFieldName(), embedding.vector());
if (textSegment != null) {
// do not check metadata key is included in RedisSchema#metadataFieldsName
fields.put(schema.getScalarFieldName(), textSegment.text());
fields.putAll(textSegment.metadata().asMap());
List<Object> responses;
try (Pipeline pipeline = client.pipelined()) {

int size = ids.size();
for (int i = 0; i < size; i++) {
String id = ids.get(i);
Embedding embedding = embeddings.get(i);
TextSegment textSegment = embedded == null ? null : embedded.get(i);
Map<String, Object> fields = new HashMap<>();
fields.put(schema.getVectorFieldName(), embedding.vector());
if (textSegment != null) {
// do not check metadata key is included in RedisSchema#metadataFieldsName
fields.put(schema.getScalarFieldName(), textSegment.text());
fields.putAll(textSegment.metadata().asMap());
}
String key = schema.getPrefix() + id;
pipeline.jsonSetWithEscape(key, Path2.of("$"), fields);
}
String key = schema.getPrefix() + id;
pipeline.jsonSetWithEscape(key, Path2.of("$"), fields);

responses = pipeline.syncAndReturnAll();
}
List<Object> responses = pipeline.syncAndReturnAll();

Optional<Object> errResponse = responses.stream().filter(response -> !"OK".equals(response)).findAny();
if (errResponse.isPresent()) {
if (log.isErrorEnabled()) {
Expand Down

0 comments on commit 8168f74

Please sign in to comment.