Skip to content

Commit 2f0a0e4

Browse files
committed
Implements delete datasource API
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent 91c609b commit 2f0a0e4

14 files changed

+648
-6
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.ip2geo.action;
7+
8+
import org.opensearch.action.ActionType;
9+
import org.opensearch.action.support.master.AcknowledgedResponse;
10+
11+
/**
12+
* Ip2Geo datasource delete action
13+
*/
14+
public class DeleteDatasourceAction extends ActionType<AcknowledgedResponse> {
15+
/**
16+
* Delete datasource action instance
17+
*/
18+
public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction();
19+
/**
20+
* Delete datasource action name
21+
*/
22+
public static final String NAME = "cluster:admin/geospatial/datasource/delete";
23+
24+
private DeleteDatasourceAction() {
25+
super(NAME, AcknowledgedResponse::new);
26+
}
27+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.ip2geo.action;
7+
8+
import java.io.IOException;
9+
10+
import lombok.AllArgsConstructor;
11+
import lombok.Getter;
12+
import lombok.Setter;
13+
14+
import org.opensearch.action.ActionRequest;
15+
import org.opensearch.action.ActionRequestValidationException;
16+
import org.opensearch.common.io.stream.StreamInput;
17+
import org.opensearch.common.io.stream.StreamOutput;
18+
19+
/**
20+
* GeoIP datasource delete request
21+
*/
22+
@Getter
23+
@Setter
24+
@AllArgsConstructor
25+
public class DeleteDatasourceRequest extends ActionRequest {
26+
/**
27+
* @param name the datasource name
28+
* @return the datasource name
29+
*/
30+
private String name;
31+
32+
/**
33+
* Constructor
34+
*
35+
* @param in the stream input
36+
* @throws IOException IOException
37+
*/
38+
public DeleteDatasourceRequest(final StreamInput in) throws IOException {
39+
super(in);
40+
this.name = in.readString();
41+
}
42+
43+
@Override
44+
public ActionRequestValidationException validate() {
45+
ActionRequestValidationException errors = null;
46+
if (name == null || name.isBlank()) {
47+
errors = new ActionRequestValidationException();
48+
errors.addValidationError("Datasource name should not be empty");
49+
}
50+
return errors;
51+
}
52+
53+
@Override
54+
public void writeTo(final StreamOutput out) throws IOException {
55+
super.writeTo(out);
56+
out.writeString(name);
57+
}
58+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.ip2geo.action;
7+
8+
import java.io.IOException;
9+
10+
import lombok.extern.log4j.Log4j2;
11+
12+
import org.opensearch.OpenSearchException;
13+
import org.opensearch.ResourceNotFoundException;
14+
import org.opensearch.action.ActionListener;
15+
import org.opensearch.action.support.ActionFilters;
16+
import org.opensearch.action.support.HandledTransportAction;
17+
import org.opensearch.action.support.master.AcknowledgedResponse;
18+
import org.opensearch.common.inject.Inject;
19+
import org.opensearch.geospatial.annotation.VisibleForTesting;
20+
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
21+
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
22+
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
23+
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
24+
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
25+
import org.opensearch.ingest.IngestMetadata;
26+
import org.opensearch.ingest.IngestService;
27+
import org.opensearch.tasks.Task;
28+
import org.opensearch.transport.TransportService;
29+
30+
/**
31+
* Transport action to delete datasource
32+
*/
33+
@Log4j2
34+
public class DeleteDatasourceTransportAction extends HandledTransportAction<DeleteDatasourceRequest, AcknowledgedResponse> {
35+
private static final long LOCK_DURATION_IN_SECONDS = 300l;
36+
private final Ip2GeoLockService lockService;
37+
private final IngestService ingestService;
38+
private final DatasourceFacade datasourceFacade;
39+
40+
/**
41+
* Constructor
42+
* @param transportService the transport service
43+
* @param actionFilters the action filters
44+
* @param lockService the lock service
45+
* @param ingestService the ingest service
46+
* @param datasourceFacade the datasource facade
47+
*/
48+
@Inject
49+
public DeleteDatasourceTransportAction(
50+
final TransportService transportService,
51+
final ActionFilters actionFilters,
52+
final Ip2GeoLockService lockService,
53+
final IngestService ingestService,
54+
final DatasourceFacade datasourceFacade
55+
) {
56+
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
57+
this.lockService = lockService;
58+
this.ingestService = ingestService;
59+
this.datasourceFacade = datasourceFacade;
60+
}
61+
62+
/**
63+
* We delete datasource regardless of its state as long as we can acquire a lock
64+
*
65+
* @param task the task
66+
* @param request the request
67+
* @param listener the listener
68+
*/
69+
@Override
70+
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
71+
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
72+
if (lock == null) {
73+
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
74+
return;
75+
}
76+
try {
77+
deleteDatasource(request.getName());
78+
listener.onResponse(new AcknowledgedResponse(true));
79+
} catch (Exception e) {
80+
listener.onFailure(e);
81+
} finally {
82+
lockService.releaseLock(
83+
lock,
84+
ActionListener.wrap(
85+
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
86+
exception -> { log.error("Failed to release the lock", exception); }
87+
)
88+
);
89+
}
90+
}, exception -> { listener.onFailure(exception); }));
91+
}
92+
93+
@VisibleForTesting
94+
protected void deleteDatasource(final String datasourceName) throws IOException {
95+
Datasource datasource = datasourceFacade.getDatasource(datasourceName);
96+
if (datasource == null) {
97+
throw new ResourceNotFoundException("no such datasource exist");
98+
}
99+
100+
setDatasourceStateAsDeleting(datasource);
101+
datasourceFacade.deleteDatasource(datasource);
102+
}
103+
104+
private void setDatasourceStateAsDeleting(final Datasource datasource) throws IOException {
105+
if (isSafeToDelete(datasource) == false) {
106+
throw new OpenSearchException("datasource is being used by one of processors");
107+
}
108+
109+
DatasourceState previousState = datasource.getState();
110+
datasource.setState(DatasourceState.DELETING);
111+
datasourceFacade.updateDatasource(datasource);
112+
113+
// Check again as processor might just have been created.
114+
// If it fails to update the state back to the previous state, the new processor
115+
// will fail to convert an ip to a geo data.
116+
// In such case, user have to delete the processor and delete this datasource again.
117+
if (isSafeToDelete(datasource) == false) {
118+
datasource.setState(previousState);
119+
datasourceFacade.updateDatasource(datasource);
120+
throw new OpenSearchException("datasource is being used by one of processors");
121+
}
122+
}
123+
124+
private boolean isSafeToDelete(Datasource datasource) {
125+
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE);
126+
return ingestMetadata.getPipelines()
127+
.keySet()
128+
.stream()
129+
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream())
130+
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName()))
131+
.findAny()
132+
.isEmpty();
133+
}
134+
}

src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
1616
*/
1717
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
1818
/**
19-
* Name of a get datasource action
19+
* Get datasource action name
2020
*/
2121
public static final String NAME = "cluster:admin/geospatial/datasource/get";
2222

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.ip2geo.action;
7+
8+
import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER;
9+
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix;
10+
import static org.opensearch.rest.RestRequest.Method.DELETE;
11+
12+
import java.util.List;
13+
import java.util.Locale;
14+
15+
import org.opensearch.client.node.NodeClient;
16+
import org.opensearch.rest.BaseRestHandler;
17+
import org.opensearch.rest.RestRequest;
18+
import org.opensearch.rest.action.RestToXContentListener;
19+
20+
/**
21+
* Rest handler for Ip2Geo datasource delete request
22+
*/
23+
public class RestDeleteDatasourceHandler extends BaseRestHandler {
24+
private static final String ACTION_NAME = "ip2geo_datasource_delete";
25+
private static final String PARAMS_NAME = "name";
26+
27+
@Override
28+
public String getName() {
29+
return ACTION_NAME;
30+
}
31+
32+
@Override
33+
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
34+
final String name = request.param(PARAMS_NAME);
35+
final DeleteDatasourceRequest deleteDatasourceRequest = new DeleteDatasourceRequest(name);
36+
37+
return channel -> client.executeLocally(
38+
DeleteDatasourceAction.INSTANCE,
39+
deleteDatasourceRequest,
40+
new RestToXContentListener<>(channel)
41+
);
42+
}
43+
44+
@Override
45+
public List<Route> routes() {
46+
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), String.format(Locale.ROOT, "ip2geo/datasource/{%s}", PARAMS_NAME));
47+
return List.of(new Route(DELETE, path));
48+
}
49+
}

src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,24 @@
1818
import java.util.Objects;
1919
import java.util.stream.Collectors;
2020

21-
import javax.swing.*;
22-
2321
import lombok.extern.log4j.Log4j2;
2422

2523
import org.opensearch.OpenSearchException;
2624
import org.opensearch.ResourceAlreadyExistsException;
25+
import org.opensearch.ResourceNotFoundException;
2726
import org.opensearch.action.ActionListener;
2827
import org.opensearch.action.DocWriteRequest;
2928
import org.opensearch.action.StepListener;
3029
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
3130
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
31+
import org.opensearch.action.delete.DeleteResponse;
3232
import org.opensearch.action.get.GetRequest;
3333
import org.opensearch.action.get.GetResponse;
3434
import org.opensearch.action.get.MultiGetItemResponse;
3535
import org.opensearch.action.get.MultiGetResponse;
3636
import org.opensearch.action.index.IndexResponse;
3737
import org.opensearch.action.search.SearchResponse;
38+
import org.opensearch.action.support.IndicesOptions;
3839
import org.opensearch.action.support.WriteRequest;
3940
import org.opensearch.client.Client;
4041
import org.opensearch.cluster.service.ClusterService;
@@ -52,6 +53,7 @@
5253
import org.opensearch.geospatial.shared.StashedThreadContext;
5354
import org.opensearch.index.IndexNotFoundException;
5455
import org.opensearch.index.query.QueryBuilders;
56+
import org.opensearch.rest.RestStatus;
5557
import org.opensearch.search.SearchHit;
5658

5759
/**
@@ -164,6 +166,38 @@ public void putDatasource(final Datasource datasource, final ActionListener list
164166
});
165167
}
166168

169+
/**
170+
* Delete datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
171+
*
172+
* @param datasource the datasource
173+
*
174+
*/
175+
public void deleteDatasource(final Datasource datasource) {
176+
if (client.admin()
177+
.indices()
178+
.prepareDelete(datasource.getIndices().toArray(new String[0]))
179+
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
180+
.execute()
181+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
182+
.isAcknowledged() == false) {
183+
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", datasource.getIndices()));
184+
}
185+
DeleteResponse response = client.prepareDelete()
186+
.setIndex(DatasourceExtension.JOB_INDEX_NAME)
187+
.setId(datasource.getName())
188+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
189+
.execute()
190+
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
191+
192+
if (response.status().equals(RestStatus.OK)) {
193+
log.info("deleted datasource[{}] successfully", datasource.getName());
194+
} else if (response.status().equals(RestStatus.NOT_FOUND)) {
195+
throw new ResourceNotFoundException("datasource[{}] does not exist", datasource.getName());
196+
} else {
197+
throw new OpenSearchException("failed to delete datasource[{}] with status[{}]", datasource.getName(), response.status());
198+
}
199+
}
200+
167201
/**
168202
* Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME}
169203
* @param name the name of a datasource

src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.function.BiConsumer;
2121
import java.util.stream.Collectors;
2222

23+
import lombok.Getter;
2324
import lombok.extern.log4j.Log4j2;
2425

2526
import org.opensearch.action.ActionListener;
@@ -44,6 +45,10 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
4445
private static final String PROPERTY_IP = "ip";
4546
private final String field;
4647
private final String targetField;
48+
/**
49+
* @return The datasource name
50+
*/
51+
@Getter
4752
private final String datasourceName;
4853
private final Set<String> properties;
4954
private final boolean ignoreMissing;

0 commit comments

Comments
 (0)