Skip to content

Commit 3a37a45

Browse files
committed
Added a cache to store datasource metadata (opensearch-project#338)
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent b38f82a commit 3a37a45

File tree

10 files changed

+495
-158
lines changed

10 files changed

+495
-158
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.ip2geo.cache;
7+
8+
import java.io.IOException;
9+
import java.time.Instant;
10+
import java.util.Map;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
13+
import lombok.Getter;
14+
import lombok.extern.log4j.Log4j2;
15+
16+
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
17+
import org.opensearch.common.xcontent.XContentType;
18+
import org.opensearch.core.xcontent.NamedXContentRegistry;
19+
import org.opensearch.core.xcontent.XContentParser;
20+
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
21+
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
22+
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
23+
import org.opensearch.index.engine.Engine;
24+
import org.opensearch.index.shard.IndexingOperationListener;
25+
import org.opensearch.index.shard.ShardId;
26+
27+
@Log4j2
28+
public class Ip2GeoCache implements IndexingOperationListener {
29+
private final DatasourceFacade datasourceFacade;
30+
private Map<String, DatasourceMetadata> data;
31+
32+
public Ip2GeoCache(final DatasourceFacade datasourceFacade) {
33+
this.datasourceFacade = datasourceFacade;
34+
}
35+
36+
public String getIndexName(final String datasourceName) {
37+
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
38+
}
39+
40+
public boolean isExpired(final String datasourceName) {
41+
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
42+
}
43+
44+
public boolean has(final String datasourceName) {
45+
return getData().containsKey(datasourceName);
46+
}
47+
48+
public DatasourceState getState(final String datasourceName) {
49+
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
50+
}
51+
52+
private Map<String, DatasourceMetadata> getData() {
53+
if (data != null) {
54+
return data;
55+
}
56+
synchronized (this) {
57+
if (data != null) {
58+
return data;
59+
}
60+
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>();
61+
datasourceFacade.getAllDatasources()
62+
.stream()
63+
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
64+
data = tempData;
65+
return data;
66+
}
67+
}
68+
69+
private void put(final Datasource datasource) {
70+
DatasourceMetadata metadata = new DatasourceMetadata(datasource);
71+
getData().put(datasource.getName(), metadata);
72+
}
73+
74+
private void remove(final String datasourceName) {
75+
getData().remove(datasourceName);
76+
}
77+
78+
@Override
79+
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
80+
if (Engine.Result.Type.FAILURE.equals(result.getResultType())) {
81+
return;
82+
}
83+
84+
try {
85+
XContentParser parser = XContentType.JSON.xContent()
86+
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, index.source().utf8ToString());
87+
parser.nextToken();
88+
Datasource datasource = Datasource.PARSER.parse(parser, null);
89+
put(datasource);
90+
} catch (IOException e) {
91+
log.error("IOException occurred updating datasource metadata for datasource {} ", index.id(), e);
92+
}
93+
}
94+
95+
@Override
96+
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
97+
if (result.getResultType().equals(Engine.Result.Type.FAILURE)) {
98+
return;
99+
}
100+
remove(delete.id());
101+
}
102+
103+
@Getter
104+
private static class DatasourceMetadata {
105+
private static DatasourceMetadata EMPTY_METADATA = new DatasourceMetadata();
106+
private String indexName;
107+
private Instant expirationDate;
108+
private DatasourceState state;
109+
110+
private DatasourceMetadata() {
111+
expirationDate = Instant.MIN;
112+
}
113+
114+
public DatasourceMetadata(final Datasource datasource) {
115+
this.indexName = datasource.currentIndexName();
116+
this.expirationDate = datasource.expirationDay();
117+
this.state = datasource.getState();
118+
}
119+
}
120+
}

src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,23 @@ public void getAllDatasources(final ActionListener<List<Datasource>> actionListe
297297
);
298298
}
299299

300+
/**
301+
* Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
302+
*/
303+
public List<Datasource> getAllDatasources() {
304+
SearchResponse response = StashedThreadContext.run(
305+
client,
306+
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
307+
.setQuery(QueryBuilders.matchAllQuery())
308+
.setSize(MAX_SIZE)
309+
.execute()
310+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
311+
);
312+
313+
List<BytesReference> bytesReferences = toBytesReferences(response);
314+
return bytesReferences.stream().map(bytesRef -> toDatasource(bytesRef)).collect(Collectors.toList());
315+
}
316+
300317
private <T> ActionListener<T> createGetDataSourceQueryActionLister(
301318
final Class<T> response,
302319
final ActionListener<List<Datasource>> actionListener

src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java

Lines changed: 44 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717
import java.util.function.BiConsumer;
1818
import java.util.stream.Collectors;
1919

20+
import lombok.AllArgsConstructor;
2021
import lombok.Getter;
2122
import lombok.extern.log4j.Log4j2;
2223

2324
import org.opensearch.action.ActionListener;
2425
import org.opensearch.common.settings.ClusterSettings;
2526
import org.opensearch.geospatial.annotation.VisibleForTesting;
27+
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
2628
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
2729
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
2830
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
29-
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
30-
import org.opensearch.index.IndexNotFoundException;
3131
import org.opensearch.ingest.AbstractProcessor;
3232
import org.opensearch.ingest.IngestDocument;
3333
import org.opensearch.ingest.IngestService;
@@ -57,6 +57,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
5757
private final ClusterSettings clusterSettings;
5858
private final DatasourceFacade datasourceFacade;
5959
private final GeoIpDataFacade geoIpDataFacade;
60+
private final Ip2GeoCache ip2GeoCache;
6061

6162
/**
6263
* Ip2Geo processor type
@@ -75,6 +76,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
7576
* @param clusterSettings the cluster settings
7677
* @param datasourceFacade the datasource facade
7778
* @param geoIpDataFacade the geoip data facade
79+
* @param ip2GeoCache the cache
7880
*/
7981
public Ip2GeoProcessor(
8082
final String tag,
@@ -86,7 +88,8 @@ public Ip2GeoProcessor(
8688
final boolean ignoreMissing,
8789
final ClusterSettings clusterSettings,
8890
final DatasourceFacade datasourceFacade,
89-
final GeoIpDataFacade geoIpDataFacade
91+
final GeoIpDataFacade geoIpDataFacade,
92+
final Ip2GeoCache ip2GeoCache
9093
) {
9194
super(tag, description);
9295
this.field = field;
@@ -97,6 +100,7 @@ public Ip2GeoProcessor(
97100
this.clusterSettings = clusterSettings;
98101
this.datasourceFacade = datasourceFacade;
99102
this.geoIpDataFacade = geoIpDataFacade;
103+
this.ip2GeoCache = ip2GeoCache;
100104
}
101105

102106
/**
@@ -149,42 +153,18 @@ protected void executeInternal(
149153
final BiConsumer<IngestDocument, Exception> handler,
150154
final String ip
151155
) {
152-
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
153-
@Override
154-
public void onResponse(final Datasource datasource) {
155-
if (datasource == null) {
156-
handler.accept(null, new IllegalStateException("datasource is not available"));
157-
return;
158-
}
159-
160-
if (DatasourceState.AVAILABLE.equals(datasource.getState()) == false) {
161-
handler.accept(null, new IllegalStateException("datasource is not in an available state"));
162-
return;
163-
}
164-
165-
String indexName = datasource.currentIndexName();
166-
if (indexName == null) {
167-
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
168-
handler.accept(ingestDocument, null);
169-
return;
170-
}
171-
172-
try {
173-
geoIpDataFacade.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler));
174-
} catch (Exception e) {
175-
handler.accept(null, e);
176-
}
177-
}
156+
validateDatasourceIsInAvailableState(datasourceName);
157+
String indexName = ip2GeoCache.getIndexName(datasourceName);
158+
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
159+
handleExpiredData(ingestDocument, handler);
160+
return;
161+
}
178162

179-
@Override
180-
public void onFailure(final Exception e) {
181-
if (e instanceof IndexNotFoundException) {
182-
handler.accept(null, new IllegalStateException("datasource is not available"));
183-
return;
184-
}
185-
handler.accept(null, e);
186-
}
187-
});
163+
try {
164+
geoIpDataFacade.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler));
165+
} catch (Exception e) {
166+
handler.accept(null, e);
167+
}
188168
}
189169

190170
@VisibleForTesting
@@ -228,6 +208,21 @@ private List<Map<String, Object>> filteredGeoData(final List<Map<String, Object>
228208
return geoData.stream().map(this::filteredGeoData).collect(Collectors.toList());
229209
}
230210

211+
private void validateDatasourceIsInAvailableState(final String datasourceName) {
212+
if (ip2GeoCache.has(datasourceName) == false) {
213+
throw new IllegalStateException("datasource does not exist");
214+
}
215+
216+
if (DatasourceState.AVAILABLE.equals(ip2GeoCache.getState(datasourceName)) == false) {
217+
throw new IllegalStateException("datasource is not in an available state");
218+
}
219+
}
220+
221+
private void handleExpiredData(final IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
222+
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
223+
handler.accept(ingestDocument, null);
224+
}
225+
231226
/**
232227
* Handle multiple ips
233228
*
@@ -246,37 +241,15 @@ protected void executeInternal(
246241
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
247242
}
248243
}
249-
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
250-
@Override
251-
public void onResponse(final Datasource datasource) {
252-
if (datasource == null || DatasourceState.AVAILABLE.equals(datasource.getState()) == false) {
253-
handler.accept(null, new IllegalStateException("datasource is not available"));
254-
return;
255-
}
256244

257-
String indexName = datasource.currentIndexName();
258-
if (indexName == null) {
259-
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
260-
handler.accept(ingestDocument, null);
261-
return;
262-
}
263-
264-
try {
265-
geoIpDataFacade.getGeoIpData(indexName, (List<String>) ips, getMultiGeoIpDataListener(ingestDocument, handler));
266-
} catch (Exception e) {
267-
handler.accept(null, e);
268-
}
269-
}
245+
validateDatasourceIsInAvailableState(datasourceName);
246+
String indexName = ip2GeoCache.getIndexName(datasourceName);
247+
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
248+
handleExpiredData(ingestDocument, handler);
249+
return;
250+
}
270251

271-
@Override
272-
public void onFailure(final Exception e) {
273-
if (e instanceof IndexNotFoundException) {
274-
handler.accept(null, new IllegalStateException("datasource is not available"));
275-
return;
276-
}
277-
handler.accept(null, e);
278-
}
279-
});
252+
geoIpDataFacade.getGeoIpData(indexName, (List<String>) ips, getMultiGeoIpDataListener(ingestDocument, handler));
280253
}
281254

282255
@VisibleForTesting
@@ -312,23 +285,12 @@ public String getType() {
312285
/**
313286
* Ip2Geo processor factory
314287
*/
288+
@AllArgsConstructor
315289
public static final class Factory implements Processor.Factory {
316290
private final IngestService ingestService;
317291
private final DatasourceFacade datasourceFacade;
318292
private final GeoIpDataFacade geoIpDataFacade;
319-
320-
/**
321-
* Default constructor
322-
*
323-
* @param ingestService the ingest service
324-
* @param datasourceFacade the datasource facade
325-
* @param geoIpDataFacade the geoip data facade
326-
*/
327-
public Factory(final IngestService ingestService, final DatasourceFacade datasourceFacade, final GeoIpDataFacade geoIpDataFacade) {
328-
this.ingestService = ingestService;
329-
this.datasourceFacade = datasourceFacade;
330-
this.geoIpDataFacade = geoIpDataFacade;
331-
}
293+
private final Ip2GeoCache ip2GeoCache;
332294

333295
/**
334296
* Within this method, blocking requests cannot be made because this method is executed in a transport thread.
@@ -357,7 +319,8 @@ public Ip2GeoProcessor create(
357319
ignoreMissing,
358320
ingestService.getClusterService().getClusterSettings(),
359321
datasourceFacade,
360-
geoIpDataFacade
322+
geoIpDataFacade,
323+
ip2GeoCache
361324
);
362325
}
363326
}

0 commit comments

Comments
 (0)