3131import java .io .UncheckedIOException ;
3232import java .util .HashMap ;
3333import java .util .Map ;
34+ import java .util .Objects ;
3435import java .util .concurrent .atomic .AtomicBoolean ;
3536
3637import org .opensearch .OpenSearchWrapperException ;
@@ -175,19 +176,31 @@ public String getMapping(String mappingPath) {
175176 public void createSessionMemoryDataIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
176177 String indexMappings = getMapping (ML_MEMORY_SESSION_INDEX_MAPPING_PATH );
177178 Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (SESSION_INDEX );
178- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
179+ if (configuration .isUseSystemIndex ()) {
180+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
181+ } else {
182+ initIndexWithContext (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
183+ }
179184 }
180185
181186 public void createWorkingMemoryDataIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
182187 String indexMappings = getMapping (ML_WORKING_MEMORY_INDEX_MAPPING_PATH );
183188 Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (WORKING_MEMORY_INDEX );
184- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
189+ if (configuration .isUseSystemIndex ()) {
190+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
191+ } else {
192+ initIndexWithContext (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
193+ }
185194 }
186195
187196 public void createLongTermMemoryHistoryIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
188197 String indexMappings = getMapping (ML_LONG_MEMORY_HISTORY_INDEX_MAPPING_PATH );
189198 Map <String , Object > indexSettings = configuration .getMemoryIndexMapping (LONG_TERM_MEMORY_HISTORY_INDEX );
190- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
199+ if (configuration .isUseSystemIndex ()) {
200+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
201+ } else {
202+ initIndexWithContext (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
203+ }
191204 }
192205
193206 /**
@@ -271,7 +284,11 @@ public void createLongTermMemoryIndex(
271284 }
272285
273286 // Initialize index with mapping and settings
274- initIndexIfAbsent (indexName , indexMappings , indexSettings , 1 , listener );
287+ if (memoryConfig .isUseSystemIndex ()) {
288+ initIndexIfAbsent (indexName , indexMappings , indexSettings , 1 , listener );
289+ } else {
290+ initIndexWithContext (indexName , indexMappings , indexSettings , 1 , listener );
291+ }
275292 } catch (Exception e ) {
276293 log .error ("Failed to create long-term memory index" , e );
277294 listener .onFailure (e );
@@ -287,6 +304,41 @@ public void initIndexIfAbsent(String indexName, String mapping, Integer version,
287304 initIndexIfAbsent (indexName , mapping , null , version , listener );
288305 }
289306
307+ public void initIndexWithContext (
308+ String indexName ,
309+ String mapping ,
310+ Map <String , Object > indexSettings ,
311+ Integer version ,
312+ ActionListener <Boolean > listener
313+ ) {
314+ log .info ("Using initIndexWithContext method to create index: {}" , indexName );
315+ try {
316+ ActionListener <CreateIndexResponse > actionListener = ActionListener .wrap (r -> {
317+ if (r .isAcknowledged ()) {
318+ log .info ("create index:{}" , indexName );
319+ listener .onResponse (true );
320+ } else {
321+ listener .onResponse (false );
322+ }
323+ }, e -> {
324+ if (e instanceof ResourceAlreadyExistsException
325+ || (e instanceof OpenSearchWrapperException && e .getCause () instanceof ResourceAlreadyExistsException )) {
326+ log .info ("Skip creating the Index:{} that is already created by another parallel request" , indexName );
327+ listener .onResponse (true );
328+ } else {
329+ log .error ("Failed to create index {}" , indexName , e );
330+ listener .onFailure (e );
331+ }
332+ });
333+ CreateIndexRequest request = new CreateIndexRequest (indexName ).mapping (mapping , XContentType .JSON );
334+ request .settings (Objects .requireNonNullElse (indexSettings , DEFAULT_INDEX_SETTINGS ));
335+ client .admin ().indices ().create (request , actionListener );
336+ } catch (Exception e ) {
337+ log .error ("Failed to init index {}" , indexName , e );
338+ listener .onFailure (e );
339+ }
340+ }
341+
290342 public void initIndexIfAbsent (
291343 String indexName ,
292344 String mapping ,
0 commit comments