Skip to content

Commit 91c609b

Browse files
authored
Run system index handling code with stashed thread context (#297)
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent e235a95 commit 91c609b

File tree

3 files changed

+142
-72
lines changed

3 files changed

+142
-72
lines changed

src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.core.xcontent.XContentParser;
5050
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
5151
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
52+
import org.opensearch.geospatial.shared.StashedThreadContext;
5253
import org.opensearch.index.IndexNotFoundException;
5354
import org.opensearch.index.query.QueryBuilders;
5455
import org.opensearch.search.SearchHit;
@@ -89,7 +90,7 @@ public void createIndexIfNotExists(final StepListener<Void> stepListener) {
8990
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
9091
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
9192
.settings(indexSettings);
92-
client.admin().indices().create(createIndexRequest, new ActionListener<>() {
93+
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
9394
@Override
9495
public void onResponse(final CreateIndexResponse createIndexResponse) {
9596
stepListener.onResponse(null);
@@ -104,7 +105,7 @@ public void onFailure(final Exception e) {
104105
}
105106
stepListener.onFailure(e);
106107
}
107-
});
108+
}));
108109
}
109110

110111
private String getIndexMapping() {
@@ -123,34 +124,44 @@ private String getIndexMapping() {
123124
* Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
124125
* @param datasource the datasource
125126
* @return index response
126-
* @throws IOException exception
127127
*/
128-
public IndexResponse updateDatasource(final Datasource datasource) throws IOException {
128+
public IndexResponse updateDatasource(final Datasource datasource) {
129129
datasource.setLastUpdateTime(Instant.now());
130-
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
131-
.setId(datasource.getName())
132-
.setOpType(DocWriteRequest.OpType.INDEX)
133-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
134-
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
135-
.execute()
136-
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
130+
return StashedThreadContext.run(client, () -> {
131+
try {
132+
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
133+
.setId(datasource.getName())
134+
.setOpType(DocWriteRequest.OpType.INDEX)
135+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
136+
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
137+
.execute()
138+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
139+
} catch (IOException e) {
140+
throw new RuntimeException(e);
141+
}
142+
});
137143
}
138144

139145
/**
140146
* Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
141147
*
142148
* @param datasource the datasource
143149
* @param listener the listener
144-
* @throws IOException exception
145150
*/
146-
public void putDatasource(final Datasource datasource, final ActionListener listener) throws IOException {
151+
public void putDatasource(final Datasource datasource, final ActionListener listener) {
147152
datasource.setLastUpdateTime(Instant.now());
148-
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
149-
.setId(datasource.getName())
150-
.setOpType(DocWriteRequest.OpType.CREATE)
151-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
152-
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
153-
.execute(listener);
153+
StashedThreadContext.run(client, () -> {
154+
try {
155+
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
156+
.setId(datasource.getName())
157+
.setOpType(DocWriteRequest.OpType.CREATE)
158+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
159+
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
160+
.execute(listener);
161+
} catch (IOException e) {
162+
new RuntimeException(e);
163+
}
164+
});
154165
}
155166

156167
/**
@@ -163,7 +174,7 @@ public Datasource getDatasource(final String name) throws IOException {
163174
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
164175
GetResponse response;
165176
try {
166-
response = client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
177+
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)));
167178
if (response.isExists() == false) {
168179
log.error("Datasource[{}] does not exist in an index[{}]", name, DatasourceExtension.JOB_INDEX_NAME);
169180
return null;
@@ -188,7 +199,7 @@ public Datasource getDatasource(final String name) throws IOException {
188199
*/
189200
public void getDatasource(final String name, final ActionListener<Datasource> actionListener) {
190201
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
191-
client.get(request, new ActionListener<GetResponse>() {
202+
StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() {
192203
@Override
193204
public void onResponse(final GetResponse response) {
194205
if (response.isExists() == false) {
@@ -212,7 +223,7 @@ public void onResponse(final GetResponse response) {
212223
public void onFailure(final Exception e) {
213224
actionListener.onFailure(e);
214225
}
215-
});
226+
}));
216227
}
217228

218229
/**
@@ -221,20 +232,26 @@ public void onFailure(final Exception e) {
221232
* @param actionListener the action listener
222233
*/
223234
public void getDatasources(final String[] names, final ActionListener<List<Datasource>> actionListener) {
224-
client.prepareMultiGet()
225-
.add(DatasourceExtension.JOB_INDEX_NAME, names)
226-
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener));
235+
StashedThreadContext.run(
236+
client,
237+
() -> client.prepareMultiGet()
238+
.add(DatasourceExtension.JOB_INDEX_NAME, names)
239+
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener))
240+
);
227241
}
228242

229243
/**
230244
* Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
231245
* @param actionListener the action listener
232246
*/
233247
public void getAllDatasources(final ActionListener<List<Datasource>> actionListener) {
234-
client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
235-
.setQuery(QueryBuilders.matchAllQuery())
236-
.setSize(MAX_SIZE)
237-
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener));
248+
StashedThreadContext.run(
249+
client,
250+
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
251+
.setQuery(QueryBuilders.matchAllQuery())
252+
.setSize(MAX_SIZE)
253+
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener))
254+
);
238255
}
239256

240257
private <T> ActionListener<T> createGetDataSourceQueryActionLister(

src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.common.unit.TimeValue;
5353
import org.opensearch.common.xcontent.XContentHelper;
5454
import org.opensearch.common.xcontent.XContentType;
55+
import org.opensearch.geospatial.shared.StashedThreadContext;
5556
import org.opensearch.index.query.QueryBuilders;
5657

5758
/**
@@ -92,7 +93,10 @@ public void createIndexIfNotExists(final String indexName) {
9293
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
9394
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
9495
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping());
95-
client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
96+
StashedThreadContext.run(
97+
client,
98+
() -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
99+
);
96100
}
97101

98102
/**
@@ -207,31 +211,34 @@ public String createDocument(final String[] fields, final String[] values) {
207211
* @param actionListener action listener
208212
*/
209213
public void getGeoIpData(final String indexName, final String ip, final ActionListener<Map<String, Object>> actionListener) {
210-
client.prepareSearch(indexName)
211-
.setSize(1)
212-
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
213-
.setPreference("_local")
214-
.setRequestCache(true)
215-
.execute(new ActionListener<>() {
216-
@Override
217-
public void onResponse(final SearchResponse searchResponse) {
218-
if (searchResponse.getHits().getHits().length == 0) {
219-
actionListener.onResponse(Collections.emptyMap());
220-
} else {
221-
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
222-
searchResponse.getHits().getAt(0).getSourceRef(),
223-
false,
224-
XContentType.JSON
225-
).v2().get(DATA_FIELD_NAME);
226-
actionListener.onResponse(geoIpData);
214+
StashedThreadContext.run(
215+
client,
216+
() -> client.prepareSearch(indexName)
217+
.setSize(1)
218+
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
219+
.setPreference("_local")
220+
.setRequestCache(true)
221+
.execute(new ActionListener<>() {
222+
@Override
223+
public void onResponse(final SearchResponse searchResponse) {
224+
if (searchResponse.getHits().getHits().length == 0) {
225+
actionListener.onResponse(Collections.emptyMap());
226+
} else {
227+
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
228+
searchResponse.getHits().getAt(0).getSourceRef(),
229+
false,
230+
XContentType.JSON
231+
).v2().get(DATA_FIELD_NAME);
232+
actionListener.onResponse(geoIpData);
233+
}
227234
}
228-
}
229235

230-
@Override
231-
public void onFailure(final Exception e) {
232-
actionListener.onFailure(e);
233-
}
234-
});
236+
@Override
237+
public void onFailure(final Exception e) {
238+
actionListener.onFailure(e);
239+
}
240+
})
241+
);
235242
}
236243

237244
/**
@@ -281,7 +288,7 @@ public void getGeoIpData(
281288
return;
282289
}
283290

284-
mRequestBuilder.execute(new ActionListener<>() {
291+
StashedThreadContext.run(client, () -> mRequestBuilder.execute(new ActionListener<>() {
285292
@Override
286293
public void onResponse(final MultiSearchResponse items) {
287294
for (int i = 0; i < ipsToSearch.size(); i++) {
@@ -315,7 +322,7 @@ public void onResponse(final MultiSearchResponse items) {
315322
public void onFailure(final Exception e) {
316323
actionListener.onFailure(e);
317324
}
318-
});
325+
}));
319326
}
320327

321328
/**
@@ -328,32 +335,34 @@ public void onFailure(final Exception e) {
328335
*/
329336
public void putGeoIpData(final String indexName, final String[] fields, final Iterator<CSVRecord> iterator, final int bulkSize) {
330337
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
331-
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
338+
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
332339
while (iterator.hasNext()) {
333340
CSVRecord record = iterator.next();
334341
String document = createDocument(fields, record.values());
335342
IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON);
336343
bulkRequest.add(request);
337344
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
338-
BulkResponse response = client.bulk(bulkRequest).actionGet(timeout);
345+
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
339346
if (response.hasFailures()) {
340347
throw new OpenSearchException(
341348
"error occurred while ingesting GeoIP data in {} with an error {}",
342349
indexName,
343350
response.buildFailureMessage()
344351
);
345352
}
346-
bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
353+
bulkRequest.requests().clear();
347354
}
348355
}
349-
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
350-
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
351-
client.admin()
352-
.indices()
353-
.prepareUpdateSettings(indexName)
354-
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
355-
.execute()
356-
.actionGet(timeout);
356+
StashedThreadContext.run(client, () -> {
357+
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
358+
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
359+
client.admin()
360+
.indices()
361+
.prepareUpdateSettings(indexName)
362+
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
363+
.execute()
364+
.actionGet(timeout);
365+
});
357366
}
358367

359368
public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
@@ -364,11 +373,14 @@ public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
364373
IP2GEO_DATA_INDEX_NAME_PREFIX
365374
);
366375
}
367-
return client.admin()
368-
.indices()
369-
.prepareDelete(index)
370-
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
371-
.execute()
372-
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
376+
return StashedThreadContext.run(
377+
client,
378+
() -> client.admin()
379+
.indices()
380+
.prepareDelete(index)
381+
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
382+
.execute()
383+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
384+
);
373385
}
374386
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.shared;
7+
8+
import java.util.function.Supplier;
9+
10+
import org.opensearch.client.Client;
11+
import org.opensearch.common.util.concurrent.ThreadContext;
12+
13+
/**
14+
* Helper class to run code with stashed thread context
15+
*
16+
* Code need to be run with stashed thread context if it interacts with system index
17+
* when security plugin is enabled.
18+
*/
19+
public class StashedThreadContext {
20+
/**
21+
* Set the thread context to default, this is needed to allow actions on model system index
22+
* when security plugin is enabled
23+
* @param function runnable that needs to be executed after thread context has been stashed, accepts and returns nothing
24+
*/
25+
public static void run(final Client client, final Runnable function) {
26+
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
27+
function.run();
28+
}
29+
}
30+
31+
/**
32+
* Set the thread context to default, this is needed to allow actions on model system index
33+
* when security plugin is enabled
34+
* @param function supplier function that needs to be executed after thread context has been stashed, return object
35+
*/
36+
public static <T> T run(final Client client, final Supplier<T> function) {
37+
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
38+
return function.get();
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)