Skip to content

Commit e1ada45

Browse files
authored
Removed parameter and settings (#332)
* Removed first_only parameter * Removed max_concurrency and batch_size settings The first_only parameter was added because the current geoip processor has it. However, the parameter has no benefit for the ip2geo processor, as we don't do a sequential search for array data but use multi search instead. The max_concurrency and batch_size settings are removed because they only reveal internal implementation details and could become a blocker to improving performance later. Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent 3d4d755 commit e1ada45

File tree

8 files changed

+33
-216
lines changed

8 files changed

+33
-216
lines changed

src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
*/
6969
@Log4j2
7070
public class GeoIpDataFacade {
71+
public static final int BUNDLE_SIZE = 128;
7172
private static final String IP_RANGE_FIELD_NAME = "_cidr";
7273
private static final String DATA_FIELD_NAME = "_data";
7374
private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
@@ -279,28 +280,19 @@ public void onFailure(final Exception e) {
279280
*
280281
* @param indexName the index name
281282
* @param ipIterator the iterator of ip addresses
282-
* @param maxBundleSize number of ip address to pass in multi search
283-
* @param maxConcurrentSearches the max concurrent search requests
284-
* @param firstOnly return only the first matching result if true
285283
* @param geoIpData collected geo data
286284
* @param actionListener the action listener
287285
*/
288286
public void getGeoIpData(
289287
final String indexName,
290288
final Iterator<String> ipIterator,
291-
final Integer maxBundleSize,
292-
final Integer maxConcurrentSearches,
293-
final boolean firstOnly,
294289
final Map<String, Map<String, Object>> geoIpData,
295290
final ActionListener<Map<String, Map<String, Object>>> actionListener
296291
) {
297292
MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch();
298-
if (maxConcurrentSearches != 0) {
299-
mRequestBuilder.setMaxConcurrentSearchRequests(maxConcurrentSearches);
300-
}
301293

302-
List<String> ipsToSearch = new ArrayList<>(maxBundleSize);
303-
while (ipIterator.hasNext() && ipsToSearch.size() < maxBundleSize) {
294+
List<String> ipsToSearch = new ArrayList<>(BUNDLE_SIZE);
295+
while (ipIterator.hasNext() && ipsToSearch.size() < BUNDLE_SIZE) {
304296
String ip = ipIterator.next();
305297
if (geoIpData.get(ip) == null) {
306298
mRequestBuilder.add(
@@ -340,13 +332,8 @@ public void onResponse(final MultiSearchResponse items) {
340332
).v2().get(DATA_FIELD_NAME);
341333

342334
geoIpData.put(ipsToSearch.get(i), data);
343-
344-
if (firstOnly) {
345-
actionListener.onResponse(geoIpData);
346-
return;
347-
}
348335
}
349-
getGeoIpData(indexName, ipIterator, maxBundleSize, maxConcurrentSearches, firstOnly, geoIpData, actionListener);
336+
getGeoIpData(indexName, ipIterator, geoIpData, actionListener);
350337
}
351338

352339
@Override
@@ -362,20 +349,18 @@ public void onFailure(final Exception e) {
362349
* @param indexName Name of the index to put the GeoIP data into
363350
* @param fields Field name matching with data in CSVRecord in order
364351
* @param iterator GeoIP data to insert
365-
* @param bulkSize Bulk size of data to process
366352
* @param renewLock Runnable to renew lock
367353
*/
368354
public void putGeoIpData(
369355
@NonNull final String indexName,
370356
@NonNull final String[] fields,
371357
@NonNull final Iterator<CSVRecord> iterator,
372-
final int bulkSize,
373358
@NonNull final Runnable renewLock
374359
) throws IOException {
375360
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
376361
final BulkRequest bulkRequest = new BulkRequest();
377362
Queue<DocWriteRequest> requests = new LinkedList<>();
378-
for (int i = 0; i < bulkSize; i++) {
363+
for (int i = 0; i < BUNDLE_SIZE; i++) {
379364
requests.add(Requests.indexRequest(indexName));
380365
}
381366
while (iterator.hasNext()) {
@@ -385,7 +370,7 @@ public void putGeoIpData(
385370
indexRequest.source(document);
386371
indexRequest.id(record.get(0));
387372
bulkRequest.add(indexRequest);
388-
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
373+
if (iterator.hasNext() == false || bulkRequest.requests().size() == BUNDLE_SIZE) {
389374
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
390375
if (response.hasFailures()) {
391376
throw new OpenSearchException(

src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -51,30 +51,6 @@ public class Ip2GeoSettings {
5151
Setting.Property.Dynamic
5252
);
5353

54-
/**
55-
* Bulk size for indexing GeoIP data
56-
*/
57-
public static final Setting<Integer> INDEXING_BULK_SIZE = Setting.intSetting(
58-
"plugins.geospatial.ip2geo.datasource.indexing_bulk_size",
59-
10000,
60-
1,
61-
Setting.Property.NodeScope,
62-
Setting.Property.Dynamic
63-
);
64-
65-
/**
66-
* Multi search bundle size for GeoIP data
67-
*
68-
* Multi search is used only when a field contains a list of ip addresses.
69-
*/
70-
public static final Setting<Integer> MAX_BUNDLE_SIZE = Setting.intSetting(
71-
"plugins.geospatial.ip2geo.processor.max_bundle_size",
72-
100,
73-
1,
74-
Setting.Property.NodeScope,
75-
Setting.Property.Dynamic
76-
);
77-
7854
/**
7955
* Multi search max concurrent searches
8056
*
@@ -96,14 +72,7 @@ public class Ip2GeoSettings {
9672
* @return a list of all settings for Ip2Geo feature
9773
*/
9874
public static final List<Setting<?>> settings() {
99-
return List.of(
100-
DATASOURCE_ENDPOINT,
101-
DATASOURCE_UPDATE_INTERVAL,
102-
TIMEOUT,
103-
INDEXING_BULK_SIZE,
104-
MAX_BUNDLE_SIZE,
105-
MAX_CONCURRENT_SEARCHES
106-
);
75+
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES);
10776
}
10877

10978
/**

src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
2626
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
2727
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
28-
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
2928
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
3029

3130
@Log4j2
@@ -83,13 +82,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
8382
datasource.getDatabase().getFields().toString()
8483
);
8584
}
86-
geoIpDataFacade.putGeoIpData(
87-
indexName,
88-
header,
89-
reader.iterator(),
90-
clusterSettings.get(Ip2GeoSettings.INDEXING_BULK_SIZE),
91-
renewLock
92-
);
85+
geoIpDataFacade.putGeoIpData(indexName, header, reader.iterator(), renewLock);
9386
}
9487

9588
Instant endTime = Instant.now();

src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
3030
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
3131
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
32-
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
3332
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
3433
import org.opensearch.ingest.AbstractProcessor;
3534
import org.opensearch.ingest.IngestDocument;
@@ -49,7 +48,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
4948
public static final String CONFIG_DATASOURCE = "datasource";
5049
public static final String CONFIG_PROPERTIES = "properties";
5150
public static final String CONFIG_IGNORE_MISSING = "ignore_missing";
52-
public static final String CONFIG_FIRST_ONLY = "first_only";
5351

5452
private final String field;
5553
private final String targetField;
@@ -60,7 +58,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
6058
private final String datasourceName;
6159
private final Set<String> properties;
6260
private final boolean ignoreMissing;
63-
private final boolean firstOnly;
6461
private final ClusterSettings clusterSettings;
6562
private final DatasourceFacade datasourceFacade;
6663
private final GeoIpDataFacade geoIpDataFacade;
@@ -79,7 +76,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
7976
* @param datasourceName the datasourceName
8077
* @param properties the properties
8178
* @param ignoreMissing true if documents with a missing value for the field should be ignored
82-
* @param firstOnly true if only first result should be returned in case of array
8379
* @param clusterSettings the cluster settings
8480
* @param datasourceFacade the datasource facade
8581
* @param geoIpDataFacade the geoip data facade
@@ -92,7 +88,6 @@ public Ip2GeoProcessor(
9288
final String datasourceName,
9389
final Set<String> properties,
9490
final boolean ignoreMissing,
95-
final boolean firstOnly,
9691
final ClusterSettings clusterSettings,
9792
final DatasourceFacade datasourceFacade,
9893
final GeoIpDataFacade geoIpDataFacade
@@ -103,7 +98,6 @@ public Ip2GeoProcessor(
10398
this.datasourceName = datasourceName;
10499
this.properties = properties;
105100
this.ignoreMissing = ignoreMissing;
106-
this.firstOnly = firstOnly;
107101
this.clusterSettings = clusterSettings;
108102
this.datasourceFacade = datasourceFacade;
109103
this.geoIpDataFacade = geoIpDataFacade;
@@ -252,9 +246,6 @@ public void onResponse(final Datasource datasource) {
252246
geoIpDataFacade.getGeoIpData(
253247
indexName,
254248
ipList.iterator(),
255-
clusterSettings.get(Ip2GeoSettings.MAX_BUNDLE_SIZE),
256-
clusterSettings.get(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES),
257-
firstOnly,
258249
data,
259250
listenerToAppendDataToDocument(data, ipList, ingestDocument, handler)
260251
);
@@ -277,33 +268,21 @@ protected ActionListener<Map<String, Map<String, Object>>> listenerToAppendDataT
277268
return new ActionListener<>() {
278269
@Override
279270
public void onResponse(final Map<String, Map<String, Object>> response) {
280-
if (firstOnly) {
281-
for (String ipAddr : ipList) {
282-
Map<String, Object> geoData = data.get(ipAddr);
283-
// GeoData for ipAddr won't be null
284-
if (geoData.isEmpty() == false) {
285-
ingestDocument.setFieldValue(targetField, filteredGeoData(geoData, ipAddr));
286-
handler.accept(ingestDocument, null);
287-
return;
288-
}
289-
}
290-
} else {
291-
boolean match = false;
292-
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
293-
for (String ipAddr : ipList) {
294-
Map<String, Object> geoData = data.get(ipAddr);
295-
// GeoData for ipAddr won't be null
296-
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
297-
if (geoData.isEmpty() == false) {
298-
match = true;
299-
}
300-
}
301-
if (match) {
302-
ingestDocument.setFieldValue(targetField, geoDataList);
303-
handler.accept(ingestDocument, null);
304-
return;
271+
boolean match = false;
272+
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
273+
for (String ipAddr : ipList) {
274+
Map<String, Object> geoData = data.get(ipAddr);
275+
// GeoData for ipAddr won't be null
276+
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
277+
if (geoData.isEmpty() == false) {
278+
match = true;
305279
}
306280
}
281+
if (match) {
282+
ingestDocument.setFieldValue(targetField, geoDataList);
283+
handler.accept(ingestDocument, null);
284+
return;
285+
}
307286
handler.accept(ingestDocument, null);
308287
}
309288

@@ -363,7 +342,6 @@ public Ip2GeoProcessor create(
363342
String datasourceName = readStringProperty(TYPE, processorTag, config, CONFIG_DATASOURCE);
364343
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES);
365344
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false);
366-
boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, CONFIG_FIRST_ONLY, true);
367345

368346
// Skip validation for the call by cluster applier service
369347
if (Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME) == false) {
@@ -378,7 +356,6 @@ public Ip2GeoProcessor create(
378356
datasourceName,
379357
propertyNames == null ? null : new HashSet<>(propertyNames),
380358
ignoreMissing,
381-
firstOnly,
382359
ingestService.getClusterService().getClusterSettings(),
383360
datasourceFacade,
384361
geoIpDataFacade

src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,6 @@ protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) {
258258
datasourceName,
259259
properties,
260260
true,
261-
true,
262261
clusterSettings,
263262
datasourceFacade,
264263
geoIpDataFacade

src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.mockito.Mockito.times;
1212
import static org.mockito.Mockito.verify;
1313
import static org.mockito.Mockito.when;
14+
import static org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade.BUNDLE_SIZE;
1415
import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX;
1516

1617
import java.io.File;
@@ -193,7 +194,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() {
193194
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
194195
if (actionRequest instanceof BulkRequest) {
195196
BulkRequest request = (BulkRequest) actionRequest;
196-
assertEquals(1, request.numberOfActions());
197+
assertEquals(2, request.numberOfActions());
197198
BulkResponse response = mock(BulkResponse.class);
198199
when(response.hasFailures()).thenReturn(false);
199200
return response;
@@ -224,7 +225,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() {
224225
try (CSVParser csvParser = CSVParser.parse(sampleIp2GeoFile(), StandardCharsets.UTF_8, CSVFormat.RFC4180)) {
225226
Iterator<CSVRecord> iterator = csvParser.iterator();
226227
String[] fields = iterator.next().values();
227-
verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, 1, renewLock);
228+
verifyingGeoIpDataFacade.putGeoIpData(index, fields, iterator, renewLock);
228229
verify(renewLock, times(2)).run();
229230
}
230231
}
@@ -261,53 +262,37 @@ public void testGetSingleGeoIpData() {
261262
assertEquals("seattle", captor.getValue().get("city"));
262263
}
263264

264-
public void testGetMultipleGeoIpDataNoSearchRequired() {
265+
public void testGetGeoIpData_whenAllDataIsGathered_thenNoMoreSearch() {
265266
String indexName = GeospatialTestHelper.randomLowerCaseString();
266267
String ip1 = randomIpAddress();
267268
String ip2 = randomIpAddress();
268269
Iterator<String> ipIterator = Arrays.asList(ip1, ip2).iterator();
269-
int maxBundleSize = 1;
270-
int maxConcurrentSearches = 1;
271-
boolean firstOnly = true;
272270
Map<String, Map<String, Object>> geoData = new HashMap<>();
273271
geoData.put(ip1, Map.of("city", "Seattle"));
274272
geoData.put(ip2, Map.of("city", "Hawaii"));
275273
ActionListener<Map<String, Map<String, Object>>> actionListener = mock(ActionListener.class);
276274

277275
// Run
278-
verifyingGeoIpDataFacade.getGeoIpData(
279-
indexName,
280-
ipIterator,
281-
maxBundleSize,
282-
maxConcurrentSearches,
283-
firstOnly,
284-
geoData,
285-
actionListener
286-
);
276+
verifyingGeoIpDataFacade.getGeoIpData(indexName, ipIterator, geoData, actionListener);
287277

288278
// Verify
289279
verify(actionListener).onResponse(geoData);
290280
}
291281

292-
public void testGetMultipleGeoIpData() {
282+
public void testGetGeoIpData_whenCalled_thenGetGeoIpData() {
293283
String indexName = GeospatialTestHelper.randomLowerCaseString();
294284
int dataSize = Randomness.get().nextInt(10) + 1;
295285
List<String> ips = new ArrayList<>();
296286
for (int i = 0; i < dataSize; i++) {
297287
ips.add(randomIpAddress());
298288
}
299-
int maxBundleSize = Randomness.get().nextInt(11) + 1;
300-
int maxConcurrentSearches = 1;
301-
boolean firstOnly = false;
302289
Map<String, Map<String, Object>> geoData = new HashMap<>();
303290
ActionListener<Map<String, Map<String, Object>>> actionListener = mock(ActionListener.class);
304291

305292
List<String> cities = new ArrayList<>();
306293
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
307294
assert actionRequest instanceof MultiSearchRequest;
308295
MultiSearchRequest request = (MultiSearchRequest) actionRequest;
309-
assertEquals(maxConcurrentSearches, request.maxConcurrentSearchRequests());
310-
assertTrue(request.requests().size() == maxBundleSize || request.requests().size() == dataSize % maxBundleSize);
311296
for (SearchRequest searchRequest : request.requests()) {
312297
assertEquals("_local", searchRequest.preference());
313298
assertEquals(1, searchRequest.source().size());
@@ -341,18 +326,10 @@ public void testGetMultipleGeoIpData() {
341326
});
342327

343328
// Run
344-
verifyingGeoIpDataFacade.getGeoIpData(
345-
indexName,
346-
ips.iterator(),
347-
maxBundleSize,
348-
maxConcurrentSearches,
349-
firstOnly,
350-
geoData,
351-
actionListener
352-
);
329+
verifyingGeoIpDataFacade.getGeoIpData(indexName, ips.iterator(), geoData, actionListener);
353330

354331
// Verify
355-
verify(verifyingClient, times((dataSize + maxBundleSize - 1) / maxBundleSize)).execute(
332+
verify(verifyingClient, times((dataSize + BUNDLE_SIZE - 1) / BUNDLE_SIZE)).execute(
356333
any(ActionType.class),
357334
any(ActionRequest.class),
358335
any(ActionListener.class)

0 commit comments

Comments
 (0)