Skip to content

Commit

Permalink
clean up files + added logs
Browse files Browse the repository at this point in the history
Signed-off-by: Joanne Wang <jowg@amazon.com>
  • Loading branch information
jowg-amazon committed Oct 3, 2023
1 parent 994739c commit a1037b0
Show file tree
Hide file tree
Showing 31 changed files with 2,840 additions and 222 deletions.
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ repositories {
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
}

compileJava {
options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
}
compileTestJava {
options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
}


sourceSets.main.java.srcDirs = ['src/main/generated','src/main/java']
configurations {
zipArchive
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatintel.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* Threat intel datasource delete action
*/
public class DeleteDatasourceAction extends ActionType<AcknowledgedResponse> {
/**
* Delete datasource action instance
*/
public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction();
/**
* Delete datasource action name
*/
public static final String NAME = "cluster:admin/security_analytics/datasource/delete";

private DeleteDatasourceAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatintel.action;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.securityanalytics.threatintel.common.ParameterValidator;

import java.io.IOException;

/**
* Threat intel datasource delete request
*/
@Getter
@Setter
@AllArgsConstructor
public class DeleteDatasourceRequest extends ActionRequest {
private static final ParameterValidator VALIDATOR = new ParameterValidator();
/**
* @param name the datasource name
* @return the datasource name
*/
private String name;

/**
* Constructor
*
* @param in the stream input
* @throws IOException IOException
*/
public DeleteDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.name = in.readString();
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = null;
if (VALIDATOR.validateDatasourceName(name).isEmpty() == false) {
errors = new ActionRequestValidationException();
errors.addValidationError("no such datasource exist");
}
return errors;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatintel.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;

import org.opensearch.ingest.IngestService;
import org.opensearch.securityanalytics.model.DetectorTrigger;
import org.opensearch.securityanalytics.threatintel.common.DatasourceState;
import org.opensearch.securityanalytics.threatintel.common.ThreatIntelLockService;
import org.opensearch.securityanalytics.threatintel.dao.DatasourceDao;
import org.opensearch.securityanalytics.threatintel.dao.ThreatIntelFeedDao;
import org.opensearch.securityanalytics.threatintel.jobscheduler.Datasource;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action to delete datasource
*/
public class DeleteDatasourceTransportAction extends HandledTransportAction<DeleteDatasourceRequest, AcknowledgedResponse> {
private static final Logger log = LogManager.getLogger(DetectorTrigger.class);

private static final long LOCK_DURATION_IN_SECONDS = 300l;
private final ThreatIntelLockService lockService;
private final IngestService ingestService;
private final DatasourceDao datasourceDao;
private final ThreatIntelFeedDao threatIntelFeedDao;
// private final Ip2GeoProcessorDao ip2GeoProcessorDao;
private final ThreadPool threadPool;

/**
* Constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param lockService the lock service
* @param ingestService the ingest service
* @param datasourceDao the datasource facade
*/
@Inject
public DeleteDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final ThreatIntelLockService lockService,
final IngestService ingestService,
final DatasourceDao datasourceDao,
final ThreatIntelFeedDao threatIntelFeedDao,
// final Ip2GeoProcessorDao ip2GeoProcessorDao,
final ThreadPool threadPool
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceDao = datasourceDao;
this.threatIntelFeedDao = threatIntelFeedDao;
// this.ip2GeoProcessorDao = ip2GeoProcessorDao;
this.threadPool = threadPool;
}

/**
* We delete datasource regardless of its state as long as we can acquire a lock
*
* @param task the task
* @param request the request
* @param listener the listener
*/
@Override
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(
new OpenSearchStatusException("Another processor is holding a lock on the resource. Try again later", RestStatus.BAD_REQUEST)
);
log.error("Another processor is holding lock, BAD_REQUEST exception", RestStatus.BAD_REQUEST);

return;
}
try {
// TODO: makes every sub-methods as async call to avoid using a thread in generic pool
threadPool.generic().submit(() -> {
try {
deleteDatasource(request.getName());
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
log.error("delete data source failed",e);
}
});
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
log.error("Internal server error", e);
}
}, exception -> { listener.onFailure(exception); }));
}

protected void deleteDatasource(final String datasourceName) throws IOException {
Datasource datasource = datasourceDao.getDatasource(datasourceName);
if (datasource == null) {
throw new ResourceNotFoundException("no such datasource exist");
}
DatasourceState previousState = datasource.getState();
// setDatasourceStateAsDeleting(datasource);

try {
threatIntelFeedDao.deleteThreatIntelDataIndex(datasource.getIndices());
} catch (Exception e) {
if (previousState.equals(datasource.getState()) == false) {
datasource.setState(previousState);
datasourceDao.updateDatasource(datasource);
}
throw e;
}
datasourceDao.deleteDatasource(datasource);
}

// private void setDatasourceStateAsDeleting(final Datasource datasource) {
// if (datasourceDao.getProcessors(datasource.getName()).isEmpty() == false) {
// throw new OpenSearchStatusException("datasource is being used by one of processors", RestStatus.BAD_REQUEST);
// }
//
// DatasourceState previousState = datasource.getState();
// datasource.setState(DatasourceState.DELETING);
// datasourceDao.updateDatasource(datasource);
//
// // Check again as processor might just have been created.
// // If it fails to update the state back to the previous state, the new processor
// // will fail to convert an ip to a geo data.
// // In such case, user have to delete the processor and delete this datasource again.
// if (datasourceDao.getProcessors(datasource.getName()).isEmpty() == false) {
// datasource.setState(previousState);
// datasourceDao.updateDatasource(datasource);
// throw new OpenSearchStatusException("datasource is being used by one of processors", RestStatus.BAD_REQUEST);
// }
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatintel.action;

import org.opensearch.action.ActionType;

/**
* Threat intel datasource get action
*/
public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
/**
* Get datasource action instance
*/
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
/**
* Get datasource action name
*/
public static final String NAME = "cluster:admin/security_analytics/datasource/get";

private GetDatasourceAction() {
super(NAME, GetDatasourceResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.securityanalytics.threatintel.action;

import lombok.Getter;
import lombok.Setter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* threat intel datasource get request
*/
@Getter
@Setter
public class GetDatasourceRequest extends ActionRequest {
/**
* @param names the datasource names
* @return the datasource names
*/
private String[] names;

/**
* Constructs a new get datasource request with a list of datasources.
*
* If the list of datasources is empty or it contains a single element "_all", all registered datasources
* are returned.
*
* @param names list of datasource names
*/
public GetDatasourceRequest(final String[] names) {
this.names = names;
}

/**
* Constructor with stream input
* @param in the stream input
* @throws IOException IOException
*/
public GetDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.names = in.readStringArray();
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = null;
if (names == null) {
errors = new ActionRequestValidationException();
errors.addValidationError("names should not be null");
}
return errors;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(names);
}
}
Loading

0 comments on commit a1037b0

Please sign in to comment.