Skip to content

Commit 885ead8

Browse files
committed
Bug fix and refactoring of code
1. Bugfix: Ingest metadata can be null if there is no processor created 2. Refactoring: Moved private method to another class for better testing support 3. Refactoring: Set some private static final variable as public so that unit test can use it 4. Refactoring: Changed string value to static variable Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent 01720bf commit 885ead8

File tree

6 files changed

+164
-74
lines changed

6 files changed

+164
-74
lines changed

src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
2121
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
2222
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
23+
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
2324
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
24-
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
25-
import org.opensearch.ingest.IngestMetadata;
2625
import org.opensearch.ingest.IngestService;
2726
import org.opensearch.tasks.Task;
2827
import org.opensearch.transport.TransportService;
@@ -36,6 +35,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction<Dele
3635
private final Ip2GeoLockService lockService;
3736
private final IngestService ingestService;
3837
private final DatasourceFacade datasourceFacade;
38+
private final Ip2GeoProcessorFacade ip2GeoProcessorFacade;
3939

4040
/**
4141
* Constructor
@@ -51,12 +51,14 @@ public DeleteDatasourceTransportAction(
5151
final ActionFilters actionFilters,
5252
final Ip2GeoLockService lockService,
5353
final IngestService ingestService,
54-
final DatasourceFacade datasourceFacade
54+
final DatasourceFacade datasourceFacade,
55+
final Ip2GeoProcessorFacade ip2GeoProcessorFacade
5556
) {
5657
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
5758
this.lockService = lockService;
5859
this.ingestService = ingestService;
5960
this.datasourceFacade = datasourceFacade;
61+
this.ip2GeoProcessorFacade = ip2GeoProcessorFacade;
6062
}
6163

6264
/**
@@ -101,8 +103,8 @@ protected void deleteDatasource(final String datasourceName) throws IOException
101103
datasourceFacade.deleteDatasource(datasource);
102104
}
103105

104-
private void setDatasourceStateAsDeleting(final Datasource datasource) throws IOException {
105-
if (isSafeToDelete(datasource) == false) {
106+
private void setDatasourceStateAsDeleting(final Datasource datasource) {
107+
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
106108
throw new OpenSearchException("datasource is being used by one of processors");
107109
}
108110

@@ -114,21 +116,10 @@ private void setDatasourceStateAsDeleting(final Datasource datasource) throws IO
114116
// If it fails to update the state back to the previous state, the new processor
115117
// will fail to convert an ip to a geo data.
116118
// In such case, user have to delete the processor and delete this datasource again.
117-
if (isSafeToDelete(datasource) == false) {
119+
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
118120
datasource.setState(previousState);
119121
datasourceFacade.updateDatasource(datasource);
120122
throw new OpenSearchException("datasource is being used by one of processors");
121123
}
122124
}
123-
124-
private boolean isSafeToDelete(Datasource datasource) {
125-
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE);
126-
return ingestMetadata.getPipelines()
127-
.keySet()
128-
.stream()
129-
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream())
130-
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName()))
131-
.findAny()
132-
.isEmpty();
133-
}
134125
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.ip2geo.common;
7+
8+
import java.util.Collections;
9+
import java.util.List;
10+
import java.util.stream.Collectors;
11+
12+
import org.opensearch.common.inject.Inject;
13+
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
14+
import org.opensearch.ingest.IngestMetadata;
15+
import org.opensearch.ingest.IngestService;
16+
17+
public class Ip2GeoProcessorFacade {
18+
private final IngestService ingestService;
19+
20+
@Inject
21+
public Ip2GeoProcessorFacade(final IngestService ingestService) {
22+
this.ingestService = ingestService;
23+
}
24+
25+
public List<Ip2GeoProcessor> getProcessors(final String datasourceName) {
26+
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE);
27+
if (ingestMetadata == null) {
28+
return Collections.emptyList();
29+
}
30+
return ingestMetadata.getPipelines()
31+
.keySet()
32+
.stream()
33+
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream())
34+
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasourceName))
35+
.collect(Collectors.toList());
36+
}
37+
}

src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
4747
public static final String CONFIG_FIELD = "field";
4848
public static final String CONFIG_TARGET_FIELD = "target_field";
4949
public static final String CONFIG_DATASOURCE = "datasource";
50-
public static final String CONFIG_PROPERTIES = "target_field";
50+
public static final String CONFIG_PROPERTIES = "properties";
5151
public static final String CONFIG_IGNORE_MISSING = "ignore_missing";
5252
public static final String CONFIG_FIRST_ONLY = "first_only";
5353

src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Collections;
1717
import java.util.HashSet;
1818
import java.util.Locale;
19+
import java.util.Set;
1920
import java.util.concurrent.atomic.AtomicReference;
2021
import java.util.function.BiFunction;
2122
import java.util.stream.Collectors;
@@ -46,9 +47,11 @@
4647
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
4748
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
4849
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
50+
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
4951
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
5052
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
5153
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
54+
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
5255
import org.opensearch.ingest.IngestMetadata;
5356
import org.opensearch.ingest.IngestService;
5457
import org.opensearch.jobscheduler.spi.LockModel;
@@ -86,6 +89,8 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase {
8689
protected TransportService transportService;
8790
@Mock
8891
protected Ip2GeoLockService ip2GeoLockService;
92+
@Mock
93+
protected Ip2GeoProcessorFacade ip2GeoProcessorFacade;
8994
protected IngestMetadata ingestMetadata;
9095
protected NoOpNodeClient client;
9196
protected VerifyingClient verifyingClient;
@@ -211,6 +216,28 @@ protected LockModel randomLockModel() {
211216
return lockModel;
212217
}
213218

219+
protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) {
220+
String tag = GeospatialTestHelper.randomLowerCaseString();
221+
String description = GeospatialTestHelper.randomLowerCaseString();
222+
String field = GeospatialTestHelper.randomLowerCaseString();
223+
String targetField = GeospatialTestHelper.randomLowerCaseString();
224+
Set<String> properties = Set.of(GeospatialTestHelper.randomLowerCaseString());
225+
Ip2GeoProcessor ip2GeoProcessor = new Ip2GeoProcessor(
226+
tag,
227+
description,
228+
field,
229+
targetField,
230+
datasourceName,
231+
properties,
232+
true,
233+
true,
234+
clusterSettings,
235+
datasourceFacade,
236+
geoIpDataFacade
237+
);
238+
return ip2GeoProcessor;
239+
}
240+
214241
/**
215242
* Temporary class of VerifyingClient until this PR(https://github.com/opensearch-project/OpenSearch/pull/7167)
216243
* is merged in OpenSearch core

src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java

Lines changed: 13 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,9 @@
1515
import static org.mockito.Mockito.when;
1616

1717
import java.io.IOException;
18-
import java.nio.ByteBuffer;
19-
import java.nio.charset.StandardCharsets;
2018
import java.time.Instant;
2119
import java.util.Arrays;
2220
import java.util.Collections;
23-
import java.util.HashMap;
24-
import java.util.Map;
25-
import java.util.Set;
2621

2722
import lombok.SneakyThrows;
2823

@@ -32,15 +27,10 @@
3227
import org.opensearch.ResourceNotFoundException;
3328
import org.opensearch.action.ActionListener;
3429
import org.opensearch.action.support.master.AcknowledgedResponse;
35-
import org.opensearch.common.bytes.BytesReference;
36-
import org.opensearch.common.xcontent.XContentType;
3730
import org.opensearch.geospatial.GeospatialTestHelper;
3831
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
3932
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
4033
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
41-
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
42-
import org.opensearch.ingest.IngestMetadata;
43-
import org.opensearch.ingest.PipelineConfiguration;
4434
import org.opensearch.jobscheduler.spi.LockModel;
4535
import org.opensearch.tasks.Task;
4636

@@ -49,7 +39,14 @@ public class DeleteDatasourceTransportActionTests extends Ip2GeoTestCase {
4939

5040
@Before
5141
public void init() {
52-
action = new DeleteDatasourceTransportAction(transportService, actionFilters, ip2GeoLockService, ingestService, datasourceFacade);
42+
action = new DeleteDatasourceTransportAction(
43+
transportService,
44+
actionFilters,
45+
ip2GeoLockService,
46+
ingestService,
47+
datasourceFacade,
48+
ip2GeoProcessorFacade
49+
);
5350
}
5451

5552
@SneakyThrows
@@ -113,6 +110,7 @@ public void testDeleteDatasource_whenNull_thenThrowException() {
113110
public void testDeleteDatasource_whenSafeToDelete_thenDelete() {
114111
Datasource datasource = randomDatasource();
115112
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);
113+
when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn(Collections.emptyList());
116114

117115
// Run
118116
action.deleteDatasource(datasource.getName());
@@ -128,14 +126,8 @@ public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowExcepti
128126
Datasource datasource = randomDatasource();
129127
datasource.setState(DatasourceState.AVAILABLE);
130128
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);
131-
132-
String pipelineId = GeospatialTestHelper.randomLowerCaseString();
133-
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
134-
pipelines.put(pipelineId, createPipelineConfiguration());
135-
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
136-
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
137-
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(
138-
Arrays.asList(createIp2GeoProcessor(datasource.getName()))
129+
when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn(
130+
Arrays.asList(randomIp2GeoProcessor(datasource.getName()))
139131
);
140132

141133
// Run
@@ -152,15 +144,9 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE
152144
Datasource datasource = randomDatasource();
153145
datasource.setState(DatasourceState.AVAILABLE);
154146
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);
155-
156-
String pipelineId = GeospatialTestHelper.randomLowerCaseString();
157-
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
158-
pipelines.put(pipelineId, createPipelineConfiguration());
159-
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
160-
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
161-
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(
147+
when(ip2GeoProcessorFacade.getProcessors(datasource.getName())).thenReturn(
162148
Collections.emptyList(),
163-
Arrays.asList(createIp2GeoProcessor(datasource.getName()))
149+
Arrays.asList(randomIp2GeoProcessor(datasource.getName()))
164150
);
165151

166152
// Run
@@ -170,33 +156,4 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE
170156
verify(datasourceFacade, times(2)).updateDatasource(datasource);
171157
verify(datasourceFacade, never()).deleteDatasource(datasource);
172158
}
173-
174-
private PipelineConfiguration createPipelineConfiguration() {
175-
String id = GeospatialTestHelper.randomLowerCaseString();
176-
ByteBuffer byteBuffer = ByteBuffer.wrap(GeospatialTestHelper.randomLowerCaseString().getBytes(StandardCharsets.US_ASCII));
177-
BytesReference config = BytesReference.fromByteBuffer(byteBuffer);
178-
return new PipelineConfiguration(id, config, XContentType.JSON);
179-
}
180-
181-
private Ip2GeoProcessor createIp2GeoProcessor(String datasourceName) {
182-
String tag = GeospatialTestHelper.randomLowerCaseString();
183-
String description = GeospatialTestHelper.randomLowerCaseString();
184-
String field = GeospatialTestHelper.randomLowerCaseString();
185-
String targetField = GeospatialTestHelper.randomLowerCaseString();
186-
Set<String> properties = Set.of(GeospatialTestHelper.randomLowerCaseString());
187-
Ip2GeoProcessor ip2GeoProcessor = new Ip2GeoProcessor(
188-
tag,
189-
description,
190-
field,
191-
targetField,
192-
datasourceName,
193-
properties,
194-
true,
195-
true,
196-
clusterSettings,
197-
datasourceFacade,
198-
geoIpDataFacade
199-
);
200-
return ip2GeoProcessor;
201-
}
202159
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.ip2geo.common;
7+
8+
import static org.mockito.Mockito.when;
9+
10+
import java.nio.ByteBuffer;
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.Arrays;
13+
import java.util.HashMap;
14+
import java.util.List;
15+
import java.util.Map;
16+
17+
import org.junit.Before;
18+
import org.opensearch.common.bytes.BytesReference;
19+
import org.opensearch.common.xcontent.XContentType;
20+
import org.opensearch.geospatial.GeospatialTestHelper;
21+
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
22+
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
23+
import org.opensearch.ingest.IngestMetadata;
24+
import org.opensearch.ingest.PipelineConfiguration;
25+
26+
public class Ip2GeoProcessorFacadeTests extends Ip2GeoTestCase {
27+
private Ip2GeoProcessorFacade ip2GeoProcessorFacade;
28+
29+
@Before
30+
public void init() {
31+
ip2GeoProcessorFacade = new Ip2GeoProcessorFacade(ingestService);
32+
}
33+
34+
public void testGetProcessors_whenNullMetadata_thenReturnEmpty() {
35+
String datasourceName = GeospatialTestHelper.randomLowerCaseString();
36+
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(null);
37+
38+
List<Ip2GeoProcessor> ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceName);
39+
assertTrue(ip2GeoProcessorList.isEmpty());
40+
}
41+
42+
public void testGetProcessors_whenNoProcessorForGivenDatasource_thenReturnEmpty() {
43+
String datasourceBeingUsed = GeospatialTestHelper.randomLowerCaseString();
44+
String datasourceNotBeingUsed = GeospatialTestHelper.randomLowerCaseString();
45+
String pipelineId = GeospatialTestHelper.randomLowerCaseString();
46+
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
47+
pipelines.put(pipelineId, createPipelineConfiguration());
48+
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
49+
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
50+
Ip2GeoProcessor ip2GeoProcessor = randomIp2GeoProcessor(datasourceBeingUsed);
51+
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(Arrays.asList(ip2GeoProcessor));
52+
53+
List<Ip2GeoProcessor> ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceNotBeingUsed);
54+
assertTrue(ip2GeoProcessorList.isEmpty());
55+
}
56+
57+
public void testGetProcessors_whenProcessorsForGivenDatasource_thenReturnProcessors() {
58+
String datasourceName = GeospatialTestHelper.randomLowerCaseString();
59+
String pipelineId = GeospatialTestHelper.randomLowerCaseString();
60+
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
61+
pipelines.put(pipelineId, createPipelineConfiguration());
62+
IngestMetadata ingestMetadata = new IngestMetadata(pipelines);
63+
when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata);
64+
Ip2GeoProcessor ip2GeoProcessor = randomIp2GeoProcessor(datasourceName);
65+
when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn(Arrays.asList(ip2GeoProcessor));
66+
67+
List<Ip2GeoProcessor> ip2GeoProcessorList = ip2GeoProcessorFacade.getProcessors(datasourceName);
68+
assertEquals(1, ip2GeoProcessorList.size());
69+
assertEquals(ip2GeoProcessor.getDatasourceName(), ip2GeoProcessorList.get(0).getDatasourceName());
70+
}
71+
72+
private PipelineConfiguration createPipelineConfiguration() {
73+
String id = GeospatialTestHelper.randomLowerCaseString();
74+
ByteBuffer byteBuffer = ByteBuffer.wrap(GeospatialTestHelper.randomLowerCaseString().getBytes(StandardCharsets.US_ASCII));
75+
BytesReference config = BytesReference.fromByteBuffer(byteBuffer);
76+
return new PipelineConfiguration(id, config, XContentType.JSON);
77+
}
78+
}

0 commit comments

Comments
 (0)