3131import java .io .UncheckedIOException ;
3232import java .util .HashMap ;
3333import java .util .Map ;
34+ import java .util .Objects ;
3435import java .util .concurrent .atomic .AtomicBoolean ;
3536
3637import org .opensearch .OpenSearchWrapperException ;
@@ -178,7 +179,11 @@ public void createSessionMemoryDataIndex(String indexName, MemoryConfiguration c
178179 || configuration .getMemoryIndexMapping (SESSION_INDEX ).isEmpty ()
179180 ? ALL_NODES_REPLICA_INDEX_SETTINGS
180181 : configuration .getMemoryIndexMapping (SESSION_INDEX );
181- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
182+ if (configuration .isUseSystemIndex ()) {
183+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
184+ } else {
185+ initIndexWithContext (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
186+ }
182187 }
183188
184189 public void createWorkingMemoryDataIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
@@ -187,7 +192,11 @@ public void createWorkingMemoryDataIndex(String indexName, MemoryConfiguration c
187192 || configuration .getMemoryIndexMapping (WORKING_MEMORY_INDEX ).isEmpty ()
188193 ? ALL_NODES_REPLICA_INDEX_SETTINGS
189194 : configuration .getMemoryIndexMapping (WORKING_MEMORY_INDEX );
190- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
195+ if (configuration .isUseSystemIndex ()) {
196+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
197+ } else {
198+ initIndexWithContext (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
199+ }
191200 }
192201
193202 public void createLongTermMemoryHistoryIndex (String indexName , MemoryConfiguration configuration , ActionListener <Boolean > listener ) {
@@ -196,7 +205,11 @@ public void createLongTermMemoryHistoryIndex(String indexName, MemoryConfigurati
196205 || configuration .getMemoryIndexMapping (LONG_TERM_MEMORY_HISTORY_INDEX ).isEmpty ()
197206 ? ALL_NODES_REPLICA_INDEX_SETTINGS
198207 : configuration .getMemoryIndexMapping (LONG_TERM_MEMORY_HISTORY_INDEX );
199- initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
208+ if (configuration .isUseSystemIndex ()) {
209+ initIndexIfAbsent (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
210+ } else {
211+ initIndexWithContext (indexName , StringUtils .toJson (indexMappings ), indexSettings , 1 , listener );
212+ }
200213 }
201214
202215 /**
@@ -282,7 +295,11 @@ public void createLongTermMemoryIndex(
282295 }
283296
284297 // Initialize index with mapping and settings
285- initIndexIfAbsent (indexName , indexMappings , indexSettings , 1 , listener );
298+ if (memoryConfig .isUseSystemIndex ()) {
299+ initIndexIfAbsent (indexName , indexMappings , indexSettings , 1 , listener );
300+ } else {
301+ initIndexWithContext (indexName , indexMappings , indexSettings , 1 , listener );
302+ }
286303 } catch (Exception e ) {
287304 log .error ("Failed to create long-term memory index" , e );
288305 listener .onFailure (e );
@@ -298,6 +315,41 @@ public void initIndexIfAbsent(String indexName, String mapping, Integer version,
298315 initIndexIfAbsent (indexName , mapping , null , version , listener );
299316 }
300317
318+ public void initIndexWithContext (
319+ String indexName ,
320+ String mapping ,
321+ Map <String , Object > indexSettings ,
322+ Integer version ,
323+ ActionListener <Boolean > listener
324+ ) {
325+ log .info ("Using initIndexWithContext method to create index: {}" , indexName );
326+ try {
327+ ActionListener <CreateIndexResponse > actionListener = ActionListener .wrap (r -> {
328+ if (r .isAcknowledged ()) {
329+ log .info ("create index:{}" , indexName );
330+ listener .onResponse (true );
331+ } else {
332+ listener .onResponse (false );
333+ }
334+ }, e -> {
335+ if (e instanceof ResourceAlreadyExistsException
336+ || (e instanceof OpenSearchWrapperException && e .getCause () instanceof ResourceAlreadyExistsException )) {
337+ log .info ("Skip creating the Index:{} that is already created by another parallel request" , indexName );
338+ listener .onResponse (true );
339+ } else {
340+ log .error ("Failed to create index {}" , indexName , e );
341+ listener .onFailure (e );
342+ }
343+ });
344+ CreateIndexRequest request = new CreateIndexRequest (indexName ).mapping (mapping , XContentType .JSON );
345+ request .settings (Objects .requireNonNullElse (indexSettings , DEFAULT_INDEX_SETTINGS ));
346+ client .admin ().indices ().create (request , actionListener );
347+ } catch (Exception e ) {
348+ log .error ("Failed to init index {}" , indexName , e );
349+ listener .onFailure (e );
350+ }
351+ }
352+
301353 public void initIndexIfAbsent (
302354 String indexName ,
303355 String mapping ,
0 commit comments