Skip to content

Commit ef9b97b

Browse files
dhrubo-osb4sjoo
authored andcommitted
not stashing context if index is not system index (opensearch-project#4407)
Signed-off-by: Dhrubo Saha <dhrubo@amazon.com>
1 parent 9f52dc0 commit ef9b97b

File tree

1 file changed

+56
-4
lines changed

1 file changed

+56
-4
lines changed

ml-algorithms/src/main/java/org/opensearch/ml/engine/indices/MLIndicesHandler.java

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.io.UncheckedIOException;
3232
import java.util.HashMap;
3333
import java.util.Map;
34+
import java.util.Objects;
3435
import java.util.concurrent.atomic.AtomicBoolean;
3536

3637
import org.opensearch.OpenSearchWrapperException;
@@ -178,7 +179,11 @@ public void createSessionMemoryDataIndex(String indexName, MemoryConfiguration c
178179
|| configuration.getMemoryIndexMapping(SESSION_INDEX).isEmpty()
179180
? ALL_NODES_REPLICA_INDEX_SETTINGS
180181
: configuration.getMemoryIndexMapping(SESSION_INDEX);
181-
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
182+
if (configuration.isUseSystemIndex()) {
183+
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
184+
} else {
185+
initIndexWithContext(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
186+
}
182187
}
183188

184189
public void createWorkingMemoryDataIndex(String indexName, MemoryConfiguration configuration, ActionListener<Boolean> listener) {
@@ -187,7 +192,11 @@ public void createWorkingMemoryDataIndex(String indexName, MemoryConfiguration c
187192
|| configuration.getMemoryIndexMapping(WORKING_MEMORY_INDEX).isEmpty()
188193
? ALL_NODES_REPLICA_INDEX_SETTINGS
189194
: configuration.getMemoryIndexMapping(WORKING_MEMORY_INDEX);
190-
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
195+
if (configuration.isUseSystemIndex()) {
196+
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
197+
} else {
198+
initIndexWithContext(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
199+
}
191200
}
192201

193202
public void createLongTermMemoryHistoryIndex(String indexName, MemoryConfiguration configuration, ActionListener<Boolean> listener) {
@@ -196,7 +205,11 @@ public void createLongTermMemoryHistoryIndex(String indexName, MemoryConfigurati
196205
|| configuration.getMemoryIndexMapping(LONG_TERM_MEMORY_HISTORY_INDEX).isEmpty()
197206
? ALL_NODES_REPLICA_INDEX_SETTINGS
198207
: configuration.getMemoryIndexMapping(LONG_TERM_MEMORY_HISTORY_INDEX);
199-
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
208+
if (configuration.isUseSystemIndex()) {
209+
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
210+
} else {
211+
initIndexWithContext(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
212+
}
200213
}
201214

202215
/**
@@ -282,7 +295,11 @@ public void createLongTermMemoryIndex(
282295
}
283296

284297
// Initialize index with mapping and settings
285-
initIndexIfAbsent(indexName, indexMappings, indexSettings, 1, listener);
298+
if (memoryConfig.isUseSystemIndex()) {
299+
initIndexIfAbsent(indexName, indexMappings, indexSettings, 1, listener);
300+
} else {
301+
initIndexWithContext(indexName, indexMappings, indexSettings, 1, listener);
302+
}
286303
} catch (Exception e) {
287304
log.error("Failed to create long-term memory index", e);
288305
listener.onFailure(e);
@@ -298,6 +315,41 @@ public void initIndexIfAbsent(String indexName, String mapping, Integer version,
298315
initIndexIfAbsent(indexName, mapping, null, version, listener);
299316
}
300317

318+
public void initIndexWithContext(
319+
String indexName,
320+
String mapping,
321+
Map<String, Object> indexSettings,
322+
Integer version,
323+
ActionListener<Boolean> listener
324+
) {
325+
log.info("Using initIndexWithContext method to create index: {}", indexName);
326+
try {
327+
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
328+
if (r.isAcknowledged()) {
329+
log.info("create index:{}", indexName);
330+
listener.onResponse(true);
331+
} else {
332+
listener.onResponse(false);
333+
}
334+
}, e -> {
335+
if (e instanceof ResourceAlreadyExistsException
336+
|| (e instanceof OpenSearchWrapperException && e.getCause() instanceof ResourceAlreadyExistsException)) {
337+
log.info("Skip creating the Index:{} that is already created by another parallel request", indexName);
338+
listener.onResponse(true);
339+
} else {
340+
log.error("Failed to create index {}", indexName, e);
341+
listener.onFailure(e);
342+
}
343+
});
344+
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping, XContentType.JSON);
345+
request.settings(Objects.requireNonNullElse(indexSettings, DEFAULT_INDEX_SETTINGS));
346+
client.admin().indices().create(request, actionListener);
347+
} catch (Exception e) {
348+
log.error("Failed to init index {}", indexName, e);
349+
listener.onFailure(e);
350+
}
351+
}
352+
301353
public void initIndexIfAbsent(
302354
String indexName,
303355
String mapping,

0 commit comments

Comments
 (0)