-
Notifications
You must be signed in to change notification settings - Fork 44
Implements delete datasource API #291
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.geospatial.ip2geo.action; | ||
|
||
import org.opensearch.action.ActionType; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
|
||
/** | ||
* Ip2Geo datasource delete action | ||
*/ | ||
public class DeleteDatasourceAction extends ActionType<AcknowledgedResponse> { | ||
/** | ||
* Delete datasource action instance | ||
*/ | ||
public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction(); | ||
/** | ||
* Delete datasource action name | ||
*/ | ||
public static final String NAME = "cluster:admin/geospatial/datasource/delete"; | ||
|
||
private DeleteDatasourceAction() { | ||
super(NAME, AcknowledgedResponse::new); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.geospatial.ip2geo.action; | ||
|
||
import java.io.IOException; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
|
||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.action.ActionRequestValidationException; | ||
import org.opensearch.common.io.stream.StreamInput; | ||
import org.opensearch.common.io.stream.StreamOutput; | ||
|
||
/** | ||
* GeoIP datasource delete request | ||
*/ | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
public class DeleteDatasourceRequest extends ActionRequest { | ||
/** | ||
* @param name the datasource name | ||
* @return the datasource name | ||
*/ | ||
private String name; | ||
|
||
/** | ||
* Constructor | ||
* | ||
* @param in the stream input | ||
* @throws IOException IOException | ||
*/ | ||
public DeleteDatasourceRequest(final StreamInput in) throws IOException { | ||
super(in); | ||
this.name = in.readString(); | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException errors = null; | ||
if (name == null || name.isBlank()) { | ||
errors = new ActionRequestValidationException(); | ||
errors.addValidationError("Datasource name should not be empty"); | ||
} | ||
return errors; | ||
} | ||
|
||
@Override | ||
public void writeTo(final StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeString(name); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.geospatial.ip2geo.action; | ||
|
||
import java.io.IOException; | ||
|
||
import lombok.extern.log4j.Log4j2; | ||
|
||
import org.opensearch.OpenSearchException; | ||
import org.opensearch.ResourceNotFoundException; | ||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.support.ActionFilters; | ||
import org.opensearch.action.support.HandledTransportAction; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.geospatial.annotation.VisibleForTesting; | ||
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; | ||
import org.opensearch.geospatial.ip2geo.common.DatasourceState; | ||
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; | ||
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; | ||
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; | ||
import org.opensearch.ingest.IngestMetadata; | ||
import org.opensearch.ingest.IngestService; | ||
import org.opensearch.tasks.Task; | ||
import org.opensearch.transport.TransportService; | ||
|
||
/** | ||
* Transport action to delete datasource | ||
*/ | ||
@Log4j2 | ||
public class DeleteDatasourceTransportAction extends HandledTransportAction<DeleteDatasourceRequest, AcknowledgedResponse> { | ||
private static final long LOCK_DURATION_IN_SECONDS = 300l; | ||
private final Ip2GeoLockService lockService; | ||
private final IngestService ingestService; | ||
private final DatasourceFacade datasourceFacade; | ||
|
||
/** | ||
* Constructor | ||
* @param transportService the transport service | ||
* @param actionFilters the action filters | ||
* @param lockService the lock service | ||
* @param ingestService the ingest service | ||
* @param datasourceFacade the datasource facade | ||
*/ | ||
@Inject | ||
public DeleteDatasourceTransportAction( | ||
final TransportService transportService, | ||
final ActionFilters actionFilters, | ||
final Ip2GeoLockService lockService, | ||
final IngestService ingestService, | ||
final DatasourceFacade datasourceFacade | ||
) { | ||
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new); | ||
this.lockService = lockService; | ||
this.ingestService = ingestService; | ||
this.datasourceFacade = datasourceFacade; | ||
} | ||
|
||
/** | ||
* We delete datasource regardless of its state as long as we can acquire a lock | ||
* | ||
* @param task the task | ||
* @param request the request | ||
* @param listener the listener | ||
*/ | ||
@Override | ||
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) { | ||
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { | ||
if (lock == null) { | ||
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); | ||
return; | ||
} | ||
try { | ||
deleteDatasource(request.getName()); | ||
listener.onResponse(new AcknowledgedResponse(true)); | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} finally { | ||
lockService.releaseLock( | ||
lock, | ||
ActionListener.wrap( | ||
released -> { log.info("Released lock for datasource[{}]", request.getName()); }, | ||
exception -> { log.error("Failed to release the lock", exception); } | ||
) | ||
); | ||
} | ||
}, exception -> { listener.onFailure(exception); })); | ||
} | ||
|
||
@VisibleForTesting | ||
protected void deleteDatasource(final String datasourceName) throws IOException { | ||
Datasource datasource = datasourceFacade.getDatasource(datasourceName); | ||
if (datasource == null) { | ||
throw new ResourceNotFoundException("no such datasource exist"); | ||
} | ||
|
||
setDatasourceStateAsDeleting(datasource); | ||
datasourceFacade.deleteDatasource(datasource); | ||
} | ||
|
||
private void setDatasourceStateAsDeleting(final Datasource datasource) throws IOException { | ||
if (isSafeToDelete(datasource) == false) { | ||
throw new OpenSearchException("datasource is being used by one of processors"); | ||
} | ||
|
||
DatasourceState previousState = datasource.getState(); | ||
datasource.setState(DatasourceState.DELETING); | ||
datasourceFacade.updateDatasource(datasource); | ||
|
||
// Check again as processor might just have been created. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there not a write lock that can be grabbed to delete the datasource? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Haven't implemented lock between create ip2geo processor and delete datasource. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmmm yes Im not sure what the lock would be. I would think there would be something with the JobScheduler LockService. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lock with job scheduler utilize opensearch index which cannot be used in processor creation time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like utilizing cluster state might be the best way to avoid this. In k-NN, we do something similar with the ModelGraveyard: https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/indices/ModelGraveyard.java. Basically, we add model IDs that are being deleted to a "ModelGraveyard" that is stored in the cluster state. ref: opensearch-project/k-NN#424. @naveentatikonda might be a good person to talk to more about this - he implemented it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. The thing is, we didn't want to add any custom metadata during design review meeting as it can cause a headache in the future due to many restriction on maintaining backward compatibility. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, might be something to consider in the future if this becomes a problem. |
||
// If it fails to update the state back to the previous state, the new processor | ||
// will fail to convert an ip to a geo data. | ||
// In such case, user have to delete the processor and delete this datasource again. | ||
if (isSafeToDelete(datasource) == false) { | ||
datasource.setState(previousState); | ||
datasourceFacade.updateDatasource(datasource); | ||
throw new OpenSearchException("datasource is being used by one of processors"); | ||
} | ||
} | ||
|
||
private boolean isSafeToDelete(Datasource datasource) { | ||
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE); | ||
return ingestMetadata.getPipelines() | ||
.keySet() | ||
.stream() | ||
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream()) | ||
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName())) | ||
.findAny() | ||
.isEmpty(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.geospatial.ip2geo.action; | ||
|
||
import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; | ||
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; | ||
import static org.opensearch.rest.RestRequest.Method.DELETE; | ||
|
||
import java.util.List; | ||
import java.util.Locale; | ||
|
||
import org.opensearch.client.node.NodeClient; | ||
import org.opensearch.rest.BaseRestHandler; | ||
import org.opensearch.rest.RestRequest; | ||
import org.opensearch.rest.action.RestToXContentListener; | ||
|
||
/** | ||
* Rest handler for Ip2Geo datasource delete request | ||
*/ | ||
public class RestDeleteDatasourceHandler extends BaseRestHandler { | ||
private static final String ACTION_NAME = "ip2geo_datasource_delete"; | ||
private static final String PARAMS_NAME = "name"; | ||
|
||
@Override | ||
public String getName() { | ||
return ACTION_NAME; | ||
} | ||
|
||
@Override | ||
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { | ||
final String name = request.param(PARAMS_NAME); | ||
final DeleteDatasourceRequest deleteDatasourceRequest = new DeleteDatasourceRequest(name); | ||
|
||
return channel -> client.executeLocally( | ||
DeleteDatasourceAction.INSTANCE, | ||
deleteDatasourceRequest, | ||
new RestToXContentListener<>(channel) | ||
); | ||
} | ||
|
||
@Override | ||
public List<Route> routes() { | ||
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), String.format(Locale.ROOT, "ip2geo/datasource/{%s}", PARAMS_NAME)); | ||
return List.of(new Route(DELETE, path)); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.