Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import org.opensearch.OpenSearchWrapperException;
Expand Down Expand Up @@ -175,19 +176,31 @@ public String getMapping(String mappingPath) {
public void createSessionMemoryDataIndex(String indexName, MemoryConfiguration configuration, ActionListener<Boolean> listener) {
String indexMappings = getMapping(ML_MEMORY_SESSION_INDEX_MAPPING_PATH);
Map<String, Object> indexSettings = configuration.getMemoryIndexMapping(SESSION_INDEX);
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
if (configuration.isUseSystemIndex()) {
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
} else {
initIndexWithContext(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
}
}

public void createWorkingMemoryDataIndex(String indexName, MemoryConfiguration configuration, ActionListener<Boolean> listener) {
String indexMappings = getMapping(ML_WORKING_MEMORY_INDEX_MAPPING_PATH);
Map<String, Object> indexSettings = configuration.getMemoryIndexMapping(WORKING_MEMORY_INDEX);
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
if (configuration.isUseSystemIndex()) {
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
} else {
initIndexWithContext(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
}
}

public void createLongTermMemoryHistoryIndex(String indexName, MemoryConfiguration configuration, ActionListener<Boolean> listener) {
String indexMappings = getMapping(ML_LONG_MEMORY_HISTORY_INDEX_MAPPING_PATH);
Map<String, Object> indexSettings = configuration.getMemoryIndexMapping(LONG_TERM_MEMORY_HISTORY_INDEX);
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
if (configuration.isUseSystemIndex()) {
initIndexIfAbsent(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
} else {
initIndexWithContext(indexName, StringUtils.toJson(indexMappings), indexSettings, 1, listener);
}
}

/**
Expand Down Expand Up @@ -271,7 +284,11 @@ public void createLongTermMemoryIndex(
}

// Initialize index with mapping and settings
initIndexIfAbsent(indexName, indexMappings, indexSettings, 1, listener);
if (memoryConfig.isUseSystemIndex()) {
initIndexIfAbsent(indexName, indexMappings, indexSettings, 1, listener);
} else {
initIndexWithContext(indexName, indexMappings, indexSettings, 1, listener);
}
} catch (Exception e) {
log.error("Failed to create long-term memory index", e);
listener.onFailure(e);
Expand All @@ -287,6 +304,41 @@ public void initIndexIfAbsent(String indexName, String mapping, Integer version,
initIndexIfAbsent(indexName, mapping, null, version, listener);
}

public void initIndexWithContext(
String indexName,
String mapping,
Map<String, Object> indexSettings,
Integer version,
ActionListener<Boolean> listener
) {
log.info("Using initIndexWithContext method to create index: {}", indexName);
try {
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
log.info("create index:{}", indexName);
listener.onResponse(true);
} else {
listener.onResponse(false);
}
}, e -> {
if (e instanceof ResourceAlreadyExistsException
|| (e instanceof OpenSearchWrapperException && e.getCause() instanceof ResourceAlreadyExistsException)) {
log.info("Skip creating the Index:{} that is already created by another parallel request", indexName);
listener.onResponse(true);
} else {
log.error("Failed to create index {}", indexName, e);
listener.onFailure(e);
}
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping, XContentType.JSON);
request.settings(Objects.requireNonNullElse(indexSettings, DEFAULT_INDEX_SETTINGS));
client.admin().indices().create(request, actionListener);
} catch (Exception e) {
log.error("Failed to init index {}", indexName, e);
listener.onFailure(e);
}
}

public void initIndexIfAbsent(
String indexName,
String mapping,
Expand Down
Loading