From c19377384d4265a45ed613b652675ea5a363a20c Mon Sep 17 00:00:00 2001
From: Joanne Wang <109310487+jowg-amazon@users.noreply.github.com>
Date: Wed, 4 Oct 2023 19:03:06 -0700
Subject: [PATCH] Preliminary framework for jobscheduler and datasource (#626)

Signed-off-by: Joanne Wang
---
 build.gradle | 2 +
 .../SecurityAnalyticsPlugin.java | 2 +-
 .../monitors/opensearch_security.policy | 3 +
 .../ThreatIntelFeedDataService.java | 248 +++-
 .../threatIntel/common/Constants.java | 9 +
 .../action/DeleteDatasourceAction.java | 27 +
 .../action/DeleteDatasourceRequest.java | 62 ++
 .../DeleteDatasourceTransportAction.java | 152 ++++
 .../action/GetDatasourceAction.java | 26 +
 .../action/GetDatasourceRequest.java | 66 ++
 .../action/GetDatasourceResponse.java | 81 ++
 .../action/GetDatasourceTransportAction.java | 79 ++
 .../action/PutDatasourceAction.java | 27 +
 .../action/PutDatasourceRequest.java | 267 ++++++
 .../action/PutDatasourceTransportAction.java | 182 ++++
 .../action/RestDeleteDatasourceHandler.java | 48 +
 .../action/RestGetDatasourceHandler.java | 44 +
 .../action/RestPutDatasourceHandler.java | 71 ++
 .../action/RestUpdateDatasourceHandler.java | 50 ++
 .../action/UpdateDatasourceAction.java | 27 +
 .../action/UpdateDatasourceRequest.java | 190 ++++
 .../UpdateDatasourceTransportAction.java | 179 ++++
 .../common/DatasourceManifest.java | 168 ++++
 .../threatintel/common/DatasourceState.java | 37 +
 .../common/ParameterValidator.java | 58 ++
 .../common/StashedThreadContext.java | 42 +
 .../common/ThreatIntelExecutor.java | 45 +
 .../common/ThreatIntelLockService.java | 167 ++++
 .../common/ThreatIntelSettings.java | 103 +++
 .../threatintel/dao/DatasourceDao.java | 380 ++++++++
 .../threatintel/jobscheduler/Datasource.java | 819 ++++++++++++++++++
 .../jobscheduler/DatasourceExtension.java | 47 +
 .../jobscheduler/DatasourceRunner.java | 159 ++++
 .../jobscheduler/DatasourceTask.java | 21 +
 .../jobscheduler/DatasourceUpdateService.java | 296 +++++++
 ...rch.jobscheduler.spi.JobSchedulerExtension | 1 +
 .../securityanalytics/TestHelpers.java | 2 +-
 .../findings/FindingServiceTests.java | 6 +
 38 files changed, 4187 insertions(+), 6 deletions(-)
 create mode 100644 src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceRequest.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceRequest.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceResponse.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceTransportAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceRequest.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceTransportAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/RestDeleteDatasourceHandler.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/RestGetDatasourceHandler.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/RestPutDatasourceHandler.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/RestUpdateDatasourceHandler.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceRequest.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceTransportAction.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceManifest.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceState.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/common/ParameterValidator.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/common/StashedThreadContext.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelExecutor.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelLockService.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelSettings.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/dao/DatasourceDao.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceExtension.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceTask.java
 create mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java
 create mode 100644 src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension
diff --git a/build.gradle b/build.gradle
index 2e16c6b70..2a958f0b6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -158,6 +158,8 @@ dependencies {
     api "org.opensearch:common-utils:${common_utils_version}@jar"
     api "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
     implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
+    compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
+    implementation "org.apache.commons:commons-csv:1.10.0"

     // Needed for integ tests
     zipArchive group: 'org.opensearch.plugin', name:'alerting', version: "${opensearch_build}"
diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java
index ccf2f44ab..33808b445 100644
--- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java
+++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java
@@ -137,7 +137,7 @@ public Collection createComponents(Client client,
         mapperService = new MapperService(client, clusterService, indexNameExpressionResolver, indexTemplateManager, logTypeService);
         ruleIndices = new
RuleIndices(logTypeService, client, clusterService, threadPool);
         correlationRuleIndices = new CorrelationRuleIndices(client, clusterService);
-        ThreatIntelFeedDataService threatIntelFeedDataService = new ThreatIntelFeedDataService(clusterService.state(), client, indexNameExpressionResolver, xContentRegistry);
+        ThreatIntelFeedDataService threatIntelFeedDataService = new ThreatIntelFeedDataService(clusterService.state(), clusterService, client, indexNameExpressionResolver, xContentRegistry);
         DetectorThreatIntelService detectorThreatIntelService = new DetectorThreatIntelService(threatIntelFeedDataService);
         this.client = client;
diff --git a/src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy b/src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy
new file mode 100644
index 000000000..c5af78398
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy
@@ -0,0 +1,3 @@
+grant {
+  permission java.net.SocketPermission "reputation.alienvault.com:443", "connect,resolve";
+};
\ No newline at end of file
diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
index 91d156003..351572470 100644
--- a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
+++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
@@ -1,39 +1,106 @@
 package org.opensearch.securityanalytics.threatIntel;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.OpenSearchException;
+import org.opensearch.SpecialPermission;
+import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.admin.indices.create.CreateIndexRequest;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.search.SearchRequest;
 import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.support.IndicesOptions;
+import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.client.Client;
+import org.opensearch.client.Requests;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.SuppressForbidden;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.xcontent.LoggingDeprecationHandler;
+import org.opensearch.common.xcontent.XContentFactory;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.rest.RestStatus;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.core.xcontent.ToXContent;
+import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.core.xcontent.XContentParser;
 import org.opensearch.index.query.QueryBuilders;
 import org.opensearch.search.builder.SearchSourceBuilder;
 import org.opensearch.securityanalytics.findings.FindingsService;
 import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
+import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest;
+import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
+import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelSettings;
+import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao;
 import org.opensearch.securityanalytics.util.IndexUtils;
+import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
+import org.opensearch.securityanalytics.threatIntel.common.Constants;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource.THREAT_INTEL_DATA_INDEX_NAME_PREFIX;

 /**
  * Service to handle CRUD operations on Threat Intel Feed Data
  */
 public class ThreatIntelFeedDataService {
     private static final Logger log = LogManager.getLogger(FindingsService.class);

+    private static final String SCHEMA_VERSION = "schema_version";
+    private static final String IOC_TYPE = "ioc_type";
+    private static final String IOC_VALUE = "ioc_value";
+    private static final String FEED_ID = "feed_id";
+    private static final String TIMESTAMP = "timestamp";
+    private static final String TYPE = "type";
+    private static final String DATA_FIELD_NAME = "_data";
+
     private final ClusterState state;
     private final Client client;
     private final IndexNameExpressionResolver indexNameExpressionResolver;

+    private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
+        "index.number_of_shards",
+        1,
+        "index.number_of_replicas",
+        0,
+        "index.refresh_interval",
+        -1,
+        "index.hidden",
+        true
+    );
+    private static final Map<String, Object> INDEX_SETTING_TO_FREEZE = Map.of(
+        "index.auto_expand_replicas",
+        "0-all",
+        "index.blocks.write",
+        true
+    );
+    private final ClusterService clusterService;
+    private final ClusterSettings clusterSettings;
+
     public ThreatIntelFeedDataService(
         ClusterState state,
+        ClusterService clusterService,
         Client client,
         IndexNameExpressionResolver indexNameExpressionResolver,
         NamedXContentRegistry xContentRegistry) {
@@ -41,6 +108,8 @@ public ThreatIntelFeedDataService(
         this.client = client;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.xContentRegistry = xContentRegistry;
+        this.clusterService = clusterService;
+        this.clusterSettings = clusterService.getClusterSettings();
     }

     private final NamedXContentRegistry xContentRegistry;
@@ -52,7 +121,7 @@ public void getThreatIntelFeedData(
         String tifdIndex = IndexUtils.getNewIndexByCreationDate(
             this.state,
             this.indexNameExpressionResolver,
-            ".opendsearch-sap-threatintel*"
+            ".opensearch-sap-threatintel*" //name?
         );
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
         sourceBuilder.query(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("ioc_type", iocType)));
@@ -87,4 +156,175 @@ private List<ThreatIntelFeedData> getTifdList(SearchResponse searchResponse) {
         }
         return list;
     }
+
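    // Editor's example, not part of the original patch: a typical read path, assuming the
    // signature getThreatIntelFeedData(String iocType, ActionListener<List<ThreatIntelFeedData>> listener)
    // suggested by the ioc_type term query and getTifdList() above.
    //
    //   threatIntelFeedDataService.getThreatIntelFeedData("ip", ActionListener.wrap(
    //       tifdList -> log.info("found {} threat intel entries", tifdList.size()),
    //       e -> log.error("threat intel feed lookup failed", e)
    //   ));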
+    /**
+     * Create an index for a threat intel feed
+     *
+     * Index settings start with a single shard, zero replicas, no refresh interval, and hidden.
+     * Once the threat intel feed is indexed, do refresh and force merge.
+     * Then, change the index settings to expand replicas to all nodes and block writes.
+     *
+     * @param indexName index name
+     */
+    public void createIndexIfNotExists(final String indexName) {
+        if (clusterService.state().metadata().hasIndex(indexName) == true) {
+            return;
+        }
+        final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(INDEX_SETTING_TO_CREATE)
+            .mapping(getIndexMapping());
+        StashedThreadContext.run(
+            client,
+            () -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT))
+        );
+    }
+
+    private void freezeIndex(final String indexName) {
+        TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT);
+        StashedThreadContext.run(client, () -> {
+            client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
+            client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
+            client.admin()
+                .indices()
+                .prepareUpdateSettings(indexName)
+                .setSettings(INDEX_SETTING_TO_FREEZE)
+                .execute()
+                .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT));
+        });
+    }
+
+    private String getIndexMapping() {
+        try {
+            try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/threat_intel_feed_mapping.json")) { // TODO: check Datasource dao and this mapping
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
+                    return reader.lines().map(String::trim).collect(Collectors.joining());
+                }
+            }
+        } catch (IOException e) {
+            log.error("Runtime exception when getting the threat intel index mapping", e);
+            throw new SecurityAnalyticsException("Runtime exception when getting the threat intel index mapping", RestStatus.INTERNAL_SERVER_ERROR, e);
+        }
+    }
+
+    /**
+     * Create a CSVParser for a threat intel feed
+     *
+     * @param manifest Datasource manifest
+     * @return CSVParser for threat intel feed
+     */
+    @SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file")
+    public CSVParser getDatabaseReader(final DatasourceManifest manifest) {
+        SpecialPermission.check();
+        return AccessController.doPrivileged((PrivilegedAction<CSVParser>) () -> {
+            try {
+                URL url = new URL(manifest.getUrl());
+                return internalGetDatabaseReader(manifest, url.openConnection());
+            } catch (IOException e) {
+                log.error("Exception: failed to read threat intel feed data from {}", manifest.getUrl(), e);
+                throw new OpenSearchException("failed to read threat intel feed data from {}", manifest.getUrl(), e);
+            }
+        });
+    }
+
+    @SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file") // TODO: update this function because no zip file...
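    // Editor's note, not part of the original patch: internalGetDatabaseReader() below scans
    // the zip archive served at manifest.getUrl() and returns a CSVParser over the entry whose
    // name matches manifest.getDbName(). Assuming a manifest such as (field names illustrative)
    //   { "url": "https://example.com/feed.zip", "db_name": "feed.csv" }
    // the returned parser can be consumed record by record:
    //   CSVParser parser = getDatabaseReader(manifest);
    //   for (CSVRecord record : parser) { String iocValue = record.get(0); /* ... */ }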
+ protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest, final URLConnection connection) throws IOException { + connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); + ZipInputStream zipIn = new ZipInputStream(connection.getInputStream()); + ZipEntry zipEntry = zipIn.getNextEntry(); + while (zipEntry != null) { + if (zipEntry.getName().equalsIgnoreCase(manifest.getDbName()) == false) { + zipEntry = zipIn.getNextEntry(); + continue; + } + return new CSVParser(new BufferedReader(new InputStreamReader(zipIn)), CSVFormat.RFC4180); + } + throw new IllegalArgumentException( + String.format(Locale.ROOT, "database file [%s] does not exist in the zip file [%s]", manifest.getDbName(), manifest.getUrl()) + ); + } + + /** + * Puts threat intel feed from CSVRecord iterator into a given index in bulk + * + * @param indexName Index name to puts the TIF data + * @param fields Field name matching with data in CSVRecord in order + * @param iterator TIF data to insert + * @param renewLock Runnable to renew lock + */ + public void saveThreatIntelFeedData( + final String indexName, + final String[] fields, + final Iterator iterator, + final Runnable renewLock +// final ThreatIntelFeedData threatIntelFeedData + ) throws IOException { + if (indexName == null || fields == null || iterator == null || renewLock == null){ + throw new IllegalArgumentException("Fields cannot be null"); + } + + TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT); + Integer batchSize = clusterSettings.get(ThreatIntelSettings.BATCH_SIZE); + final BulkRequest bulkRequest = new BulkRequest(); + Queue requests = new LinkedList<>(); + for (int i = 0; i < batchSize; i++) { + requests.add(Requests.indexRequest(indexName)); + } + while (iterator.hasNext()) { + CSVRecord record = iterator.next(); +// XContentBuilder tifData = threatIntelFeedData.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS); + IndexRequest indexRequest = (IndexRequest) requests.poll(); +// indexRequest.source(tifData); + indexRequest.id(record.get(0)); + bulkRequest.add(indexRequest); + if (iterator.hasNext() == false || bulkRequest.requests().size() == batchSize) { + BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout)); + if (response.hasFailures()) { + throw new OpenSearchException( + "error occurred while ingesting threat intel feed data in {} with an error {}", + indexName, + response.buildFailureMessage() + ); + } + requests.addAll(bulkRequest.requests()); + bulkRequest.requests().clear(); + } + renewLock.run(); + } + freezeIndex(indexName); + } + + public void deleteThreatIntelDataIndex(final String index) { + deleteThreatIntelDataIndex(Arrays.asList(index)); + } + + public void deleteThreatIntelDataIndex(final List indices) { + if (indices == null || indices.isEmpty()) { + return; + } + + Optional invalidIndex = indices.stream() + .filter(index -> index.startsWith(THREAT_INTEL_DATA_INDEX_NAME_PREFIX) == false) + .findAny(); + if (invalidIndex.isPresent()) { + throw new OpenSearchException( + "the index[{}] is not threat intel data index which should start with {}", + invalidIndex.get(), + THREAT_INTEL_DATA_INDEX_NAME_PREFIX + ); + } + + AcknowledgedResponse response = StashedThreadContext.run( + client, + () -> client.admin() + .indices() + .prepareDelete(indices.toArray(new String[0])) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .execute() + 
.actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)) + ); + + if (response.isAcknowledged() == false) { + throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices)); + } + } + } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java new file mode 100644 index 000000000..af31e7897 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java @@ -0,0 +1,9 @@ +package org.opensearch.securityanalytics.threatIntel.common; + +import org.opensearch.Version; + +import java.util.Locale; +public class Constants { + public static final String USER_AGENT_KEY = "User-Agent"; + public static final String USER_AGENT_VALUE = String.format(Locale.ROOT, "OpenSearch/%s vanilla", Version.CURRENT.toString()); +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceAction.java new file mode 100644 index 000000000..35effc4b7 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceAction.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * Threat intel datasource delete action + */ +public class DeleteDatasourceAction extends ActionType { + /** + * Delete datasource action instance + */ + public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction(); + /** + * Delete datasource action name + */ + public static final String NAME = "cluster:admin/security_analytics/datasource/delete"; + + private DeleteDatasourceAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceRequest.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceRequest.java new file mode 100644 index 000000000..654b93985 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceRequest.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.securityanalytics.threatIntel.common.ParameterValidator; + +import java.io.IOException; + +/** + * Threat intel datasource delete request + */ + +public class DeleteDatasourceRequest extends ActionRequest { + private static final ParameterValidator VALIDATOR = new ParameterValidator(); + /** + * @param name the datasource name + * @return the datasource name + */ + private String name; + + /** + * Constructor + * + * @param in the stream input + * @throws IOException IOException + */ + public DeleteDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + } + + public DeleteDatasourceRequest(final String name) { + this.name = name; + } + + @Override + public 
ActionRequestValidationException validate() { + ActionRequestValidationException errors = null; + if (VALIDATOR.validateDatasourceName(name).isEmpty() == false) { + errors = new ActionRequestValidationException(); + errors.addValidationError("no such datasource exist"); + } + return errors; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } + + public String getName() { + return name; + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java new file mode 100644 index 000000000..5ff65a945 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java @@ -0,0 +1,152 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; + +import org.opensearch.ingest.IngestService; +import org.opensearch.securityanalytics.model.DetectorTrigger; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; +import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelLockService; +import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; +import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport action to delete datasource + */ +public class DeleteDatasourceTransportAction extends HandledTransportAction { + private static final Logger log = LogManager.getLogger(DetectorTrigger.class); + + private static final long LOCK_DURATION_IN_SECONDS = 300l; + private final ThreatIntelLockService lockService; + private final IngestService ingestService; + private final DatasourceDao datasourceDao; + private final ThreatIntelFeedDataService threatIntelFeedDataService; +// private final Ip2GeoProcessorDao ip2GeoProcessorDao; + private final ThreadPool threadPool; + + /** + * Constructor + * @param transportService the transport service + * @param actionFilters the action filters + * @param lockService the lock service + * @param ingestService the ingest service + * @param datasourceDao the datasource facade + */ + @Inject + public DeleteDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final ThreatIntelLockService lockService, + final IngestService ingestService, + final DatasourceDao datasourceDao, + final ThreatIntelFeedDataService threatIntelFeedDataService, +// final Ip2GeoProcessorDao ip2GeoProcessorDao, + final ThreadPool threadPool + ) { + super(DeleteDatasourceAction.NAME, transportService, actionFilters, 
DeleteDatasourceRequest::new); + this.lockService = lockService; + this.ingestService = ingestService; + this.datasourceDao = datasourceDao; + this.threatIntelFeedDataService = threatIntelFeedDataService; +// this.ip2GeoProcessorDao = ip2GeoProcessorDao; + this.threadPool = threadPool; + } + + /** + * We delete datasource regardless of its state as long as we can acquire a lock + * + * @param task the task + * @param request the request + * @param listener the listener + */ + @Override + protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener listener) { + lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { + if (lock == null) { + listener.onFailure( + new OpenSearchStatusException("Another processor is holding a lock on the resource. Try again later", RestStatus.BAD_REQUEST) + ); + log.error("Another processor is holding lock, BAD_REQUEST exception", RestStatus.BAD_REQUEST); + + return; + } + try { + // TODO: makes every sub-methods as async call to avoid using a thread in generic pool + threadPool.generic().submit(() -> { + try { + deleteDatasource(request.getName()); + lockService.releaseLock(lock); + listener.onResponse(new AcknowledgedResponse(true)); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + log.error("delete data source failed",e); + } + }); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + log.error("Internal server error", e); + } + }, exception -> { listener.onFailure(exception); })); + } + + protected void deleteDatasource(final String datasourceName) throws IOException { + Datasource datasource = datasourceDao.getDatasource(datasourceName); + if (datasource == null) { + throw new ResourceNotFoundException("no such datasource exist"); + } + DatasourceState previousState = datasource.getState(); +// setDatasourceStateAsDeleting(datasource); + + try { + threatIntelFeedDataService.deleteThreatIntelDataIndex(datasource.getIndices()); + } catch (Exception e) { + if (previousState.equals(datasource.getState()) == false) { + datasource.setState(previousState); + datasourceDao.updateDatasource(datasource); + } + throw e; + } + datasourceDao.deleteDatasource(datasource); + } + +// private void setDatasourceStateAsDeleting(final Datasource datasource) { +// if (datasourceDao.getProcessors(datasource.getName()).isEmpty() == false) { +// throw new OpenSearchStatusException("datasource is being used by one of processors", RestStatus.BAD_REQUEST); +// } +// +// DatasourceState previousState = datasource.getState(); +// datasource.setState(DatasourceState.DELETING); +// datasourceDao.updateDatasource(datasource); +// +// // Check again as processor might just have been created. +// // If it fails to update the state back to the previous state, the new processor +// // will fail to convert an ip to a geo data. +// // In such case, user have to delete the processor and delete this datasource again. 
+// if (datasourceDao.getProcessors(datasource.getName()).isEmpty() == false) { +// datasource.setState(previousState); +// datasourceDao.updateDatasource(datasource); +// throw new OpenSearchStatusException("datasource is being used by one of processors", RestStatus.BAD_REQUEST); +// } +// } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceAction.java new file mode 100644 index 000000000..6befdde04 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceAction.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.action.ActionType; + +/** + * Threat intel datasource get action + */ +public class GetDatasourceAction extends ActionType { + /** + * Get datasource action instance + */ + public static final GetDatasourceAction INSTANCE = new GetDatasourceAction(); + /** + * Get datasource action name + */ + public static final String NAME = "cluster:admin/security_analytics/datasource/get"; + + private GetDatasourceAction() { + super(NAME, GetDatasourceResponse::new); + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceRequest.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceRequest.java new file mode 100644 index 000000000..16f36b08e --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceRequest.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * threat intel datasource get request + */ +public class GetDatasourceRequest extends ActionRequest { + /** + * @param names the datasource names + * @return the datasource names + */ + private String[] names; + + /** + * Constructs a new get datasource request with a list of datasources. + * + * If the list of datasources is empty or it contains a single element "_all", all registered datasources + * are returned. 
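 * For example (editor's illustration, not part of the original patch):
 * <pre>
 * new GetDatasourceRequest(new String[] {});           // all datasources
 * new GetDatasourceRequest(new String[] {"_all"});     // all datasources
 * new GetDatasourceRequest(new String[] {"feed-a"});   // only "feed-a"
 * </pre>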
+ * + * @param names list of datasource names + */ + public GetDatasourceRequest(final String[] names) { + this.names = names; + } + + /** + * Constructor with stream input + * @param in the stream input + * @throws IOException IOException + */ + public GetDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.names = in.readStringArray(); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException errors = null; + if (names == null) { + errors = new ActionRequestValidationException(); + errors.addValidationError("names should not be null"); + } + return errors; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(names); + } + + public String[] getNames() { + return this.names; + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceResponse.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceResponse.java new file mode 100644 index 000000000..d404ad728 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceResponse.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.core.ParseField; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +/** + * threat intel datasource get request + */ +public class GetDatasourceResponse extends ActionResponse implements ToXContentObject { + private static final ParseField FIELD_NAME_DATASOURCES = new ParseField("datasources"); + private static final ParseField FIELD_NAME_NAME = new ParseField("name"); + private static final ParseField FIELD_NAME_STATE = new ParseField("state"); + private static final ParseField FIELD_NAME_ENDPOINT = new ParseField("endpoint"); + private static final ParseField FIELD_NAME_UPDATE_INTERVAL = new ParseField("update_interval_in_days"); + private static final ParseField FIELD_NAME_NEXT_UPDATE_AT = new ParseField("next_update_at_in_epoch_millis"); + private static final ParseField FIELD_NAME_NEXT_UPDATE_AT_READABLE = new ParseField("next_update_at"); + private static final ParseField FIELD_NAME_DATABASE = new ParseField("database"); + private static final ParseField FIELD_NAME_UPDATE_STATS = new ParseField("update_stats"); + private List datasources; + + /** + * Default constructor + * + * @param datasources List of datasources + */ + public GetDatasourceResponse(final List datasources) { + this.datasources = datasources; + } + + /** + * Constructor with StreamInput + * + * @param in the stream input + */ + public GetDatasourceResponse(final StreamInput in) throws IOException { + datasources = in.readList(Datasource::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeList(datasources); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + 
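        // Editor's note, not part of the original patch: the serialized response takes roughly the shape
        // {
        //   "datasources": [
        //     { "name": "...", "state": "...", "endpoint": "...", "update_interval_in_days": ...,
        //       "next_update_at_in_epoch_millis": ..., "next_update_at": "...",
        //       "database": { ... }, "update_stats": { ... } }
        //   ]
        // }
        // with one object per Datasource, as built below.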
builder.startArray(FIELD_NAME_DATASOURCES.getPreferredName()); + for (Datasource datasource : datasources) { + builder.startObject(); + builder.field(FIELD_NAME_NAME.getPreferredName(), datasource.getName()); + builder.field(FIELD_NAME_STATE.getPreferredName(), datasource.getState()); + builder.field(FIELD_NAME_ENDPOINT.getPreferredName(), datasource.getEndpoint()); + builder.field(FIELD_NAME_UPDATE_INTERVAL.getPreferredName(), datasource.getSchedule()); //TODO + builder.timeField( + FIELD_NAME_NEXT_UPDATE_AT.getPreferredName(), + FIELD_NAME_NEXT_UPDATE_AT_READABLE.getPreferredName(), + datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli() + ); + builder.field(FIELD_NAME_DATABASE.getPreferredName(), datasource.getDatabase()); + builder.field(FIELD_NAME_UPDATE_STATS.getPreferredName(), datasource.getUpdateStats()); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceTransportAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceTransportAction.java new file mode 100644 index 000000000..cb1419517 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/GetDatasourceTransportAction.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.List; + +/** + * Transport action to get datasource + */ +public class GetDatasourceTransportAction extends HandledTransportAction { + private final DatasourceDao datasourceDao; + + /** + * Default constructor + * @param transportService the transport service + * @param actionFilters the action filters + * @param datasourceDao the datasource facade + */ + @Inject + public GetDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final DatasourceDao datasourceDao + ) { + super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new); + this.datasourceDao = datasourceDao; + } + + @Override + protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener listener) { + if (shouldGetAllDatasource(request)) { + // We don't expect too many data sources. Therefore, querying all data sources without pagination should be fine. 
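            // Editor's note, not part of the original patch: this branch serves both
            //   GET /_plugins/_security_analytics/threatintel/datasource        (names == [])
            //   GET /_plugins/_security_analytics/threatintel/datasource/_all   (names == ["_all"])
            // as decided by shouldGetAllDatasource() below.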
+ datasourceDao.getAllDatasources(newActionListener(listener)); + } else { + datasourceDao.getDatasources(request.getNames(), newActionListener(listener)); + } + } + + private boolean shouldGetAllDatasource(final GetDatasourceRequest request) { + if (request.getNames() == null) { + throw new OpenSearchException("names in a request should not be null"); + } + + return request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0])); + } + + protected ActionListener> newActionListener(final ActionListener listener) { + return new ActionListener<>() { + @Override + public void onResponse(final List datasources) { + listener.onResponse(new GetDatasourceResponse(datasources)); + } + + @Override + public void onFailure(final Exception e) { + if (e instanceof IndexNotFoundException) { + listener.onResponse(new GetDatasourceResponse(Collections.emptyList())); + return; + } + listener.onFailure(e); + } + }; + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceAction.java new file mode 100644 index 000000000..6a6acb9ed --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceAction.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * Threat intel datasource creation action + */ +public class PutDatasourceAction extends ActionType { + /** + * Put datasource action instance + */ + public static final PutDatasourceAction INSTANCE = new PutDatasourceAction(); + /** + * Put datasource action name + */ + public static final String NAME = "cluster:admin/security_analytics/datasource/put"; + + private PutDatasourceAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceRequest.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceRequest.java new file mode 100644 index 000000000..dac67ed43 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceRequest.java @@ -0,0 +1,267 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; +import java.util.Locale; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.securityanalytics.model.DetectorTrigger; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest; +import org.opensearch.securityanalytics.threatIntel.common.ParameterValidator; + +/** + * Threat intel datasource creation request + */ +public class PutDatasourceRequest extends ActionRequest { + 
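    // Editor's example, not part of the original patch: a request body this class is parsed
    // from via PARSER below; values are illustrative only.
    //
    //   PUT /_plugins/_security_analytics/threatintel/datasource/my-feed
    //   {
    //     "feed_format": "csv",
    //     "endpoint": "https://example.com/manifest.json",
    //     "feed_name": "my_feed",
    //     "description": "sample threat intel feed",
    //     "organization": "example-org",
    //     "update_interval_in_days": 1
    //   }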
private static final Logger log = LogManager.getLogger(DetectorTrigger.class); + + public static final ParseField FEED_FORMAT_FIELD = new ParseField("feed_format"); + public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); + public static final ParseField FEED_NAME_FIELD = new ParseField("feed_name"); + public static final ParseField DESCRIPTION_FIELD = new ParseField("description"); + public static final ParseField ORGANIZATION_FIELD = new ParseField("organization"); + public static final ParseField CONTAINED_IOCS_FIELD = new ParseField("contained_iocs_field"); + public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); + private static final ParameterValidator VALIDATOR = new ParameterValidator(); + + /** + * @param name the datasource name + * @return the datasource name + */ + private String name; + + private String feedFormat; + + /** + * @param endpoint url to a manifest file for a datasource + * @return url to a manifest file for a datasource + */ + private String endpoint; + + private String feedName; + + private String description; + + private String organization; + + private List contained_iocs_field; + + public void setFeedFormat(String feedFormat) { + this.feedFormat = feedFormat; + } + + public void setThisEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public void setFeedName(String feedName) { + this.feedName = feedName; + } + + public void setDescription(String description) { + this.description = description; + } + + public void setOrganization(String organization) { + this.organization = organization; + } + + public void setContained_iocs_field(List contained_iocs_field) { + this.contained_iocs_field = contained_iocs_field; + } + + public List getContained_iocs_field() { + return contained_iocs_field; + } + + public String getFeedFormat() { + return feedFormat; + } + + public String getFeedName() { + return feedName; + } + + @Override + public String getDescription() { + return description; + } + + public String getOrganization() { + return organization; + } + /** + * @param updateInterval update interval of a datasource + * @return update interval of a datasource + */ + private TimeValue updateInterval; + + /** + * Parser of a datasource + */ + public static final ObjectParser PARSER; + static { + PARSER = new ObjectParser<>("put_datasource"); + PARSER.declareString((request, val) -> request.setFeedFormat(val), FEED_FORMAT_FIELD); + PARSER.declareString((request, val) -> request.setThisEndpoint(val), ENDPOINT_FIELD); + PARSER.declareString((request, val) -> request.setFeedName(val), FEED_NAME_FIELD); + PARSER.declareString((request, val) -> request.setDescription(val), DESCRIPTION_FIELD); + PARSER.declareString((request, val) -> request.setOrganization(val), ORGANIZATION_FIELD); +// PARSER.declareStringArray((request, val[]) -> request.setContained_iocs_field(val), CONTAINED_IOCS_FIELD); + PARSER.declareLong((request, val) -> request.setUpdateInterval(TimeValue.timeValueDays(val)), UPDATE_INTERVAL_IN_DAYS_FIELD); + } + + /** + * Default constructor + * @param name name of a datasource + */ + public PutDatasourceRequest(final String name) { + this.name = name; + } + + /** + * Constructor with stream input + * @param in the stream input + * @throws IOException IOException + */ + public PutDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.feedFormat = in.readString(); + this.endpoint = in.readString(); + this.feedName = 
in.readString(); + this.description = in.readString(); + this.organization = in.readString(); + this.contained_iocs_field = in.readStringList(); + this.updateInterval = in.readTimeValue(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeString(feedFormat); + out.writeString(endpoint); + out.writeString(feedName); + out.writeString(description); + out.writeString(organization); + out.writeStringCollection(contained_iocs_field); + out.writeTimeValue(updateInterval); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException errors = new ActionRequestValidationException(); + List errorMsgs = VALIDATOR.validateDatasourceName(name); + if (errorMsgs.isEmpty() == false) { + errorMsgs.stream().forEach(msg -> errors.addValidationError(msg)); + } + validateEndpoint(errors); + validateUpdateInterval(errors); + return errors.validationErrors().isEmpty() ? null : errors; + } + + /** + * Conduct following validation on endpoint + * 1. endpoint format complies with RFC-2396 + * 2. validate manifest file from the endpoint + * + * @param errors the errors to add error messages + */ + private void validateEndpoint(final ActionRequestValidationException errors) { + try { + URL url = new URL(endpoint); + url.toURI(); // Validate URL complies with RFC-2396 + validateManifestFile(url, errors); + } catch (MalformedURLException | URISyntaxException e) { + log.info("Invalid URL[{}] is provided", endpoint, e); + errors.addValidationError("Invalid URL format is provided"); + } + } + + /** + * Conduct following validation on url + * 1. can read manifest file from the endpoint + * 2. the url in the manifest file complies with RFC-2396 + * 3. 
updateInterval is less than validForInDays value in the manifest file + * + * @param url the url to validate + * @param errors the errors to add error messages + */ + private void validateManifestFile(final URL url, final ActionRequestValidationException errors) { + DatasourceManifest manifest; + try { + manifest = DatasourceManifest.Builder.build(url); + } catch (Exception e) { + log.info("Error occurred while reading a file from {}", url, e); + errors.addValidationError(String.format(Locale.ROOT, "Error occurred while reading a file from %s: %s", url, e.getMessage())); + return; + } + + try { + new URL(manifest.getUrl()).toURI(); // Validate URL complies with RFC-2396 + } catch (MalformedURLException | URISyntaxException e) { + log.info("Invalid URL[{}] is provided for url field in the manifest file", manifest.getUrl(), e); + errors.addValidationError("Invalid URL format is provided for url field in the manifest file"); + return; + } + +// if (manifest.getValidForInDays() != null && updateInterval.days() >= manifest.getValidForInDays()) { +// errors.addValidationError( +// String.format( +// Locale.ROOT, +// "updateInterval %d should be smaller than %d", +// updateInterval.days(), +// manifest.getValidForInDays() +// ) +// ); +// } + } + + /** + * Validate updateInterval is equal or larger than 1 + * + * @param errors the errors to add error messages + */ + private void validateUpdateInterval(final ActionRequestValidationException errors) { + if (updateInterval.compareTo(TimeValue.timeValueDays(1)) < 0) { + errors.addValidationError("Update interval should be equal to or larger than 1 day"); + } + } + + public String getName() { + return name; + } + + public String getEndpoint() { + return this.endpoint; + } + + public void setEndpoint(String newEndpoint) { + this.endpoint = newEndpoint; + } + + public TimeValue getUpdateInterval() { + return this.updateInterval; + } + + public void setUpdateInterval(TimeValue timeValue) { + this.updateInterval = timeValue; + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceTransportAction.java new file mode 100644 index 000000000..f1f87c4c5 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/PutDatasourceTransportAction.java @@ -0,0 +1,182 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import static org.opensearch.securityanalytics.threatIntel.common.ThreatIntelLockService.LOCK_DURATION_IN_SECONDS; + +import java.time.Instant; +import java.util.ConcurrentModificationException; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.StepListener; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; + +import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.securityanalytics.model.DetectorTrigger; 
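// Editor's note, not part of the original patch: validateManifestFile() above builds a
// DatasourceManifest from the endpoint, so the endpoint is expected to serve JSON roughly like
//   { "url": "https://example.com/threat-intel-feed.zip", "db_name": "feed.csv" }
// (field names assumed from the getUrl()/getDbName() accessors used elsewhere; values illustrative).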
+import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; +import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelLockService; +import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.DatasourceUpdateService; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +/** + * Transport action to create datasource + */ +public class PutDatasourceTransportAction extends HandledTransportAction { + private static final Logger log = LogManager.getLogger(DetectorTrigger.class); + + private final ThreadPool threadPool; + private final DatasourceDao datasourceDao; + private final DatasourceUpdateService datasourceUpdateService; + private final ThreatIntelLockService lockService; + + /** + * Default constructor + * @param transportService the transport service + * @param actionFilters the action filters + * @param threadPool the thread pool + * @param datasourceDao the datasource facade + * @param datasourceUpdateService the datasource update service + * @param lockService the lock service + */ + @Inject + public PutDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final ThreadPool threadPool, + final DatasourceDao datasourceDao, + final DatasourceUpdateService datasourceUpdateService, + final ThreatIntelLockService lockService + ) { + super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new); + this.threadPool = threadPool; + this.datasourceDao = datasourceDao; + this.datasourceUpdateService = datasourceUpdateService; + this.lockService = lockService; + } + + @Override + protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener listener) { + lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { + if (lock == null) { + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. 
Try again later") + ); + log.error("another processor is a lock, BAD_REQUEST error", RestStatus.BAD_REQUEST); + return; + } + try { + internalDoExecute(request, lock, listener); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + log.error("listener failed when executing", e); + } + }, exception -> { + listener.onFailure(exception); + log.error("execution failed", exception); + })); + } + + /** + * This method takes lock as a parameter and is responsible for releasing lock + * unless exception is thrown + */ + protected void internalDoExecute( + final PutDatasourceRequest request, + final LockModel lock, + final ActionListener listener + ) { + StepListener createIndexStep = new StepListener<>(); + datasourceDao.createIndexIfNotExists(createIndexStep); + createIndexStep.whenComplete(v -> { + Datasource datasource = Datasource.Builder.build(request); + datasourceDao.putDatasource(datasource, getIndexResponseListener(datasource, lock, listener)); + }, exception -> { + lockService.releaseLock(lock); + log.error("failed to release lock", exception); + listener.onFailure(exception); + }); + } + + /** + * This method takes lock as a parameter and is responsible for releasing lock + * unless exception is thrown + */ + protected ActionListener getIndexResponseListener( + final Datasource datasource, + final LockModel lock, + final ActionListener listener + ) { + return new ActionListener<>() { + @Override + public void onResponse(final IndexResponse indexResponse) { + // This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread + // pool. + threadPool.generic().submit(() -> { + AtomicReference lockReference = new AtomicReference<>(lock); + try { + createDatasource(datasource, lockService.getRenewLockRunnable(lockReference)); + } finally { + lockService.releaseLock(lockReference.get()); + } + }); + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(final Exception e) { + lockService.releaseLock(lock); + if (e instanceof VersionConflictEngineException) { + log.error("datasource already exists"); + listener.onFailure(new ResourceAlreadyExistsException("datasource [{}] already exists", datasource.getName())); + } else { + log.error("Internal server error"); + listener.onFailure(e); + } + } + }; + } + + protected void createDatasource(final Datasource datasource, final Runnable renewLock) { + if (DatasourceState.CREATING.equals(datasource.getState()) == false) { + log.error("Invalid datasource state. 
Expecting {} but received {}", DatasourceState.CREATING, datasource.getState()); + markDatasourceAsCreateFailed(datasource); + return; + } + + try { + datasourceUpdateService.updateOrCreateThreatIntelFeedData(datasource, renewLock); + } catch (Exception e) { + log.error("Failed to create datasource for {}", datasource.getName(), e); + markDatasourceAsCreateFailed(datasource); + } + } + + private void markDatasourceAsCreateFailed(final Datasource datasource) { + datasource.getUpdateStats().setLastFailedAt(Instant.now()); + datasource.setState(DatasourceState.CREATE_FAILED); + try { + datasourceDao.updateDatasource(datasource); + } catch (Exception e) { + log.error("Failed to mark datasource state as CREATE_FAILED for {}", datasource.getName(), e); + } + } +} + diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestDeleteDatasourceHandler.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestDeleteDatasourceHandler.java new file mode 100644 index 000000000..3da4c4abc --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestDeleteDatasourceHandler.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import static org.opensearch.rest.RestRequest.Method.DELETE; + +/** + * Rest handler for threat intel datasource delete request + */ +public class RestDeleteDatasourceHandler extends BaseRestHandler { + private static final String ACTION_NAME = "threatintel_datasource_delete"; + private static final String PARAMS_NAME = "name"; + + @Override + public String getName() { + return ACTION_NAME; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final String name = request.param(PARAMS_NAME); + final DeleteDatasourceRequest deleteDatasourceRequest = new DeleteDatasourceRequest(name); + + return channel -> client.executeLocally( + DeleteDatasourceAction.INSTANCE, + deleteDatasourceRequest, + new RestToXContentListener<>(channel) + ); + } + + @Override + public List routes() { + String path = String.join("/", "/_plugins/_security_analytics", String.format(Locale.ROOT, "threatintel/datasource/{%s}", PARAMS_NAME)); + return List.of(new Route(DELETE, path)); + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestGetDatasourceHandler.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestGetDatasourceHandler.java new file mode 100644 index 000000000..ddbecdad5 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestGetDatasourceHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest handler for threat 
intel datasource get request
+ */
+public class RestGetDatasourceHandler extends BaseRestHandler {
+    private static final String ACTION_NAME = "threatintel_datasource_get";
+
+    @Override
+    public String getName() {
+        return ACTION_NAME;
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
+        final String[] names = request.paramAsStringArray("name", Strings.EMPTY_ARRAY);
+        final GetDatasourceRequest getDatasourceRequest = new GetDatasourceRequest(names);
+
+        return channel -> client.executeLocally(GetDatasourceAction.INSTANCE, getDatasourceRequest, new RestToXContentListener<>(channel));
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(
+            new Route(GET, String.join("/", "/_plugins/_security_analytics", "threatintel/datasource")),
+            new Route(GET, String.join("/", "/_plugins/_security_analytics", "threatintel/datasource/{name}"))
+        );
+    }
+}
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestPutDatasourceHandler.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestPutDatasourceHandler.java
new file mode 100644
index 000000000..5c9ecd7b4
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestPutDatasourceHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.securityanalytics.threatIntel.action;
+
+import org.opensearch.client.node.NodeClient;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.rest.BaseRestHandler;
+import org.opensearch.rest.RestRequest;
+import org.opensearch.rest.action.RestToXContentListener;
+import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelSettings;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.opensearch.rest.RestRequest.Method.PUT;
+
+/**
+ * Rest handler for threat intel datasource creation
+ *
+ * This handler handles a request of
+ * PUT /_plugins/_security_analytics/threatintel/datasource/{name}
+ * {
+ *     "endpoint": {endpoint},
+ *     "update_interval_in_days": 3
+ * }
+ *
+ * When a request is received, it creates a datasource by downloading the threat intel feed from the endpoint.
+ * Once the datasource is created, it schedules the next update task to run after update_interval_in_days.
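+ *
+ * On success the action returns an acknowledged response, which renders roughly as (illustrative):
+ * {
+ *     "acknowledged": true
+ * }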
+ * + */ +public class RestPutDatasourceHandler extends BaseRestHandler { + private static final String ACTION_NAME = "threatintel_datasource_put"; + private final ClusterSettings clusterSettings; + + public RestPutDatasourceHandler(final ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + } + + @Override + public String getName() { + return ACTION_NAME; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name")); + if (request.hasContentOrSourceParam()) { + try (XContentParser parser = request.contentOrSourceParamParser()) { + PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null); + } + } + if (putDatasourceRequest.getEndpoint() == null) { + putDatasourceRequest.setEndpoint(clusterSettings.get(ThreatIntelSettings.DATASOURCE_ENDPOINT)); + } + if (putDatasourceRequest.getUpdateInterval() == null) { + putDatasourceRequest.setUpdateInterval(TimeValue.timeValueDays(clusterSettings.get(ThreatIntelSettings.DATASOURCE_UPDATE_INTERVAL))); + } + return channel -> client.executeLocally(PutDatasourceAction.INSTANCE, putDatasourceRequest, new RestToXContentListener<>(channel)); + } + + @Override + public List routes() { + String path = String.join("/", "/_plugins/_security_analytics", "threatintel/datasource/{name}"); + return List.of(new Route(PUT, path)); + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestUpdateDatasourceHandler.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestUpdateDatasourceHandler.java new file mode 100644 index 000000000..3f755670f --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/RestUpdateDatasourceHandler.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * Rest handler for threat intel datasource update request + */ +public class RestUpdateDatasourceHandler extends BaseRestHandler { + private static final String ACTION_NAME = "threatintel_datasource_update"; + + @Override + public String getName() { + return ACTION_NAME; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final UpdateDatasourceRequest updateDatasourceRequest = new UpdateDatasourceRequest(request.param("name")); + if (request.hasContentOrSourceParam()) { + try (XContentParser parser = request.contentOrSourceParamParser()) { + UpdateDatasourceRequest.PARSER.parse(parser, updateDatasourceRequest, null); + } + } + return channel -> client.executeLocally( + UpdateDatasourceAction.INSTANCE, + updateDatasourceRequest, + new RestToXContentListener<>(channel) + ); + } + + @Override + public List routes() { + String path = String.join("/", "/_plugins/_security_analytics", "threatintel/datasource/{name}/_settings"); + return List.of(new Route(PUT, path)); + } +} diff --git 
a/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceAction.java
new file mode 100644
index 000000000..ddf2d42e6
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceAction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.securityanalytics.threatIntel.action;
+
+import org.opensearch.action.ActionType;
+import org.opensearch.action.support.master.AcknowledgedResponse;
+
+/**
+ * Threat intel datasource update action
+ */
+public class UpdateDatasourceAction extends ActionType<AcknowledgedResponse> {
+    /**
+     * Update datasource action instance
+     */
+    public static final UpdateDatasourceAction INSTANCE = new UpdateDatasourceAction();
+    /**
+     * Update datasource action name
+     */
+    public static final String NAME = "cluster:admin/security_analytics/datasource/update";
+
+    private UpdateDatasourceAction() {
+        super(NAME, AcknowledgedResponse::new);
+    }
+}
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceRequest.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceRequest.java
new file mode 100644
index 000000000..7d70f45aa
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceRequest.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.securityanalytics.threatIntel.action;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.action.ActionRequest;
+import org.opensearch.action.ActionRequestValidationException;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.ParseField;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.xcontent.ObjectParser;
+import org.opensearch.securityanalytics.model.DetectorTrigger;
+import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest;
+import org.opensearch.securityanalytics.threatIntel.common.ParameterValidator;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Locale;
+
+/**
+ * Threat intel datasource update request
+ */
+public class UpdateDatasourceRequest extends ActionRequest {
+    private static final Logger log = LogManager.getLogger(DetectorTrigger.class);
+
+    public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
+    public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
+    private static final int MAX_DATASOURCE_NAME_BYTES = 255;
+    private static final ParameterValidator VALIDATOR = new ParameterValidator();
+
+    /**
+     * @param name the datasource name
+     * @return the datasource name
+     */
+    private String name;
+
+    /**
+     * @param endpoint url to a manifest file for a datasource
+     * @return url to a manifest file for a datasource
+     */
+    private String endpoint;
+
+    /**
+     * @param updateInterval update interval of a datasource
+     * @return update interval of a datasource
+     */
+    private TimeValue updateInterval;
+
+    /**
+     * Parser of a datasource
+     */
+    public static final ObjectParser<UpdateDatasourceRequest, Void> PARSER;
+    static {
+        PARSER = new ObjectParser<>("update_datasource");
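+        // Both fields below are optional in the request body; validate() rejects a request
+        // that provides neither ("no values to update").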
PARSER.declareString((request, val) -> request.setEndpoint(val), ENDPOINT_FIELD); + PARSER.declareLong((request, val) -> request.setUpdateInterval(TimeValue.timeValueDays(val)), UPDATE_INTERVAL_IN_DAYS_FIELD); + } + + public String getName() { + return name; + } + public String getEndpoint() { + return endpoint; + } + private void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public TimeValue getUpdateInterval() { + return updateInterval; + } + + private void setUpdateInterval(TimeValue updateInterval){ + this.updateInterval = updateInterval; + } + + /** + * Constructor + * @param name name of a datasource + */ + public UpdateDatasourceRequest(final String name) { + this.name = name; + } + + /** + * Constructor + * @param in the stream input + * @throws IOException IOException + */ + public UpdateDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.endpoint = in.readOptionalString(); + this.updateInterval = in.readOptionalTimeValue(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeOptionalString(endpoint); + out.writeOptionalTimeValue(updateInterval); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException errors = new ActionRequestValidationException(); + if (VALIDATOR.validateDatasourceName(name).isEmpty() == false) { + errors.addValidationError("no such datasource exist"); + } + if (endpoint == null && updateInterval == null) { + errors.addValidationError("no values to update"); + } + + validateEndpoint(errors); + validateUpdateInterval(errors); + + return errors.validationErrors().isEmpty() ? null : errors; + } + + /** + * Conduct following validation on endpoint + * 1. endpoint format complies with RFC-2396 + * 2. validate manifest file from the endpoint + * + * @param errors the errors to add error messages + */ + private void validateEndpoint(final ActionRequestValidationException errors) { + if (endpoint == null) { + return; + } + + try { + URL url = new URL(endpoint); + url.toURI(); // Validate URL complies with RFC-2396 + validateManifestFile(url, errors); + } catch (MalformedURLException | URISyntaxException e) { + log.info("Invalid URL[{}] is provided", endpoint, e); + errors.addValidationError("Invalid URL format is provided"); + } + } + + /** + * Conduct following validation on url + * 1. can read manifest file from the endpoint + * 2. 
the url in the manifest file complies with RFC-2396 + * + * @param url the url to validate + * @param errors the errors to add error messages + */ + private void validateManifestFile(final URL url, final ActionRequestValidationException errors) { + DatasourceManifest manifest; + try { + manifest = DatasourceManifest.Builder.build(url); + } catch (Exception e) { + log.info("Error occurred while reading a file from {}", url, e); + errors.addValidationError(String.format(Locale.ROOT, "Error occurred while reading a file from %s: %s", url, e.getMessage())); + return; + } + + try { + new URL(manifest.getUrl()).toURI(); // Validate URL complies with RFC-2396 + } catch (MalformedURLException | URISyntaxException e) { + log.info("Invalid URL[{}] is provided for url field in the manifest file", manifest.getUrl(), e); + errors.addValidationError("Invalid URL format is provided for url field in the manifest file"); + } + } + + /** + * Validate updateInterval is equal or larger than 1 + * + * @param errors the errors to add error messages + */ + private void validateUpdateInterval(final ActionRequestValidationException errors) { + if (updateInterval == null) { + return; + } + + if (updateInterval.compareTo(TimeValue.timeValueDays(1)) < 0) { + errors.addValidationError("Update interval should be equal to or larger than 1 day"); + } + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceTransportAction.java new file mode 100644 index 000000000..11d99e41c --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/UpdateDatasourceTransportAction.java @@ -0,0 +1,179 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.action; + +import org.opensearch.OpenSearchStatusException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; +import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelLockService; +import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.DatasourceTask; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.DatasourceUpdateService; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Locale; + +/** + * Transport action to update datasource + */ +public class UpdateDatasourceTransportAction extends HandledTransportAction { + private static final long LOCK_DURATION_IN_SECONDS = 300l; + private final ThreatIntelLockService lockService; + private final DatasourceDao datasourceDao; + private final DatasourceUpdateService datasourceUpdateService; + private final ThreadPool 
threadPool; + + /** + * Constructor + * + * @param transportService the transport service + * @param actionFilters the action filters + * @param lockService the lock service + * @param datasourceDao the datasource facade + * @param datasourceUpdateService the datasource update service + */ + @Inject + public UpdateDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final ThreatIntelLockService lockService, + final DatasourceDao datasourceDao, + final DatasourceUpdateService datasourceUpdateService, + final ThreadPool threadPool + ) { + super(UpdateDatasourceAction.NAME, transportService, actionFilters, UpdateDatasourceRequest::new); + this.lockService = lockService; + this.datasourceUpdateService = datasourceUpdateService; + this.datasourceDao = datasourceDao; + this.threadPool = threadPool; + } + + /** + * Get a lock and update datasource + * + * @param task the task + * @param request the request + * @param listener the listener + */ + @Override + protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener listener) { + lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { + if (lock == null) { + listener.onFailure( + new OpenSearchStatusException("Another processor is holding a lock on the resource. Try again later", RestStatus.BAD_REQUEST) + ); + return; + } + try { + // TODO: makes every sub-methods as async call to avoid using a thread in generic pool + threadPool.generic().submit(() -> { + try { + Datasource datasource = datasourceDao.getDatasource(request.getName()); + if (datasource == null) { + throw new ResourceNotFoundException("no such datasource exist"); + } + if (DatasourceState.AVAILABLE.equals(datasource.getState()) == false) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "data source is not in an [%s] state", DatasourceState.AVAILABLE) + ); + } + validate(request, datasource); + updateIfChanged(request, datasource); + lockService.releaseLock(lock); + listener.onResponse(new AcknowledgedResponse(true)); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + } + }); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + } + }, exception -> listener.onFailure(exception))); + } + + private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) { + boolean isChanged = false; + if (isEndpointChanged(request, datasource)) { + datasource.setEndpoint(request.getEndpoint()); + isChanged = true; + } + if (isUpdateIntervalChanged(request)) { + datasource.setSchedule(new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS)); + datasource.setTask(DatasourceTask.ALL); + isChanged = true; + } + + if (isChanged) { + datasourceDao.updateDatasource(datasource); + } + } + + /** + * Additional validation based on an existing datasource + * + * Basic validation is done in UpdateDatasourceRequest#validate + * In this method we do additional validation based on an existing datasource + * + * 1. Check the compatibility of new fields and old fields + * 2. Check the updateInterval is less than validForInDays in datasource + * + * This method throws exception if one of validation fails. 
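+     * For example, a request that changes the endpoint to a feed whose header no longer
+     * contains all of the existing fields fails the compatibility check and is rejected.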
+     *
+     * @param request the update request
+     * @param datasource the existing datasource
+     * @throws IOException the exception
+     */
+    private void validate(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
+        validateFieldsCompatibility(request, datasource);
+    }
+
+    private void validateFieldsCompatibility(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
+        if (isEndpointChanged(request, datasource) == false) {
+            return;
+        }
+
+        List<String> fields = datasourceUpdateService.getHeaderFields(request.getEndpoint());
+        if (datasource.isCompatible(fields) == false) {
+//            throw new IncompatibleDatasourceException(
+//                "new fields [{}] does not contain all old fields [{}]",
+//                fields.toString(),
+//                datasource.getDatabase().getFields().toString()
+//            );
+            throw new OpenSearchStatusException("new fields do not contain all old fields", RestStatus.BAD_REQUEST);
+        }
+    }
+
+    private boolean isEndpointChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
+        return request.getEndpoint() != null && request.getEndpoint().equals(datasource.getEndpoint()) == false;
+    }
+
+    /**
+     * The update interval is considered changed whenever the user provides one, because the
+     * start time is updated even if the new interval equals the current one.
+     *
+     * @param request the update datasource request
+     * @return true if the update interval is changed, and false otherwise
+     */
+    private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request) {
+        return request.getUpdateInterval() != null;
+    }
+}
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceManifest.java b/src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceManifest.java
new file mode 100644
index 000000000..1417c8a36
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceManifest.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.securityanalytics.threatIntel.common;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.CharBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Locale;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.SpecialPermission;
+import org.opensearch.Version;
+import org.opensearch.common.SuppressForbidden;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.ParseField;
+import org.opensearch.core.rest.RestStatus;
+import org.opensearch.core.xcontent.ConstructingObjectParser;
+import org.opensearch.core.xcontent.DeprecationHandler;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.securityanalytics.model.DetectorTrigger;
+import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
+
+/**
+ * Threat intel datasource manifest file object
+ *
+ * The manifest file is stored at an external endpoint. OpenSearch reads the file and stores its values in this object.
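+ *
+ * An example manifest file (illustrative values; only url and db_name are parsed into this object):
+ * {
+ *     "url": "https://example.com/threat_intel_feed.csv",
+ *     "db_name": "threat_intel_feed.csv",
+ *     "sha256_hash": "0123abcd...",
+ *     "organization": "example.org",
+ *     "description": "sample threat intel feed",
+ *     "updated_at_in_epoch_milli": 1696464000000
+ * }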
+ */ +public class DatasourceManifest { + private static final Logger log = LogManager.getLogger(DetectorTrigger.class); + + private static final ParseField URL_FIELD = new ParseField("url"); //url for csv threat intel feed + private static final ParseField DB_NAME_FIELD = new ParseField("db_name"); // name of the db (csv file for now) + private static final ParseField SHA256_HASH_FIELD = new ParseField("sha256_hash"); //not using for now + private static final ParseField ORGANIZATION_FIELD = new ParseField("organization"); //not using for now + private static final ParseField DESCRIPTION_FIELD = new ParseField("description"); //not using for now + private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at_in_epoch_milli"); //not using for now + + /** + * @param url URL of a ZIP file containing a database + * @return URL of a ZIP file containing a database + */ + private String url; + + /** + * @param dbName A database file name inside the ZIP file + * @return A database file name inside the ZIP file + */ + private String dbName; + /** + * @param sha256Hash SHA256 hash value of a database file + * @return SHA256 hash value of a database file + */ + private String sha256Hash; + + /** + * @param organization A database organization name + * @return A database organization name + */ + private String organization; + /** + * @param description A description of the database + * @return A description of a database + */ + private String description; + /** + * @param updatedAt A date when the database was updated + * @return A date when the database was updated + */ + private Long updatedAt; + + public String getUrl() { + return this.url; + } + public String getDbName() { + return dbName; + } + + public String getOrganization() { + return organization; + } + + public String getSha256Hash() { + return sha256Hash; + } + + public String getDescription() { + return description; + } + + public Long getUpdatedAt() { + return updatedAt; + } + + public DatasourceManifest(final String url, final String dbName) { + this.url = url; + this.dbName = dbName; + } + + /** + * Datasource manifest parser + */ + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "datasource_manifest", + true, + args -> { + String url = (String) args[0]; + String dbName = (String) args[1]; + return new DatasourceManifest(url, dbName); + } + ); + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), URL_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), DB_NAME_FIELD); + } + + /** + * Datasource manifest builder + */ + public static class Builder { + private static final int MANIFEST_FILE_MAX_BYTES = 1024 * 8; + + /** + * Build DatasourceManifest from a given url + * + * @param url url to downloads a manifest file + * @return DatasourceManifest representing the manifest file + */ + @SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file") // change permissions + public static DatasourceManifest build(final URL url) { + SpecialPermission.check(); + return AccessController.doPrivileged((PrivilegedAction) () -> { + try { + URLConnection connection = url.openConnection(); + return internalBuild(connection); + } catch (IOException e) { + log.error("Runtime exception connecting to the manifest file", e); + throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO + } + }); + } + + @SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file") + 
protected static DatasourceManifest internalBuild(final URLConnection connection) throws IOException { + connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); + InputStreamReader inputStreamReader = new InputStreamReader(connection.getInputStream()); + try (BufferedReader reader = new BufferedReader(inputStreamReader)) { + CharBuffer charBuffer = CharBuffer.allocate(MANIFEST_FILE_MAX_BYTES); + reader.read(charBuffer); + charBuffer.flip(); + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + charBuffer.toString() + ); + return PARSER.parse(parser, null); + } + } + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceState.java b/src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceState.java new file mode 100644 index 000000000..a516b1d34 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/common/DatasourceState.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.common; + +/** + * Threat intel datasource state + * + * When data source is created, it starts with CREATING state. Once the first threat intel feed is generated, the state changes to AVAILABLE. + * Only when the first threat intel feed generation failed, the state changes to CREATE_FAILED. + * Subsequent threat intel feed failure won't change data source state from AVAILABLE to CREATE_FAILED. + * When delete request is received, the data source state changes to DELETING. + * + * State changed from left to right for the entire lifecycle of a datasource + * (CREATING) to (CREATE_FAILED or AVAILABLE) to (DELETING) + * + */ +public enum DatasourceState { + /** + * Data source is being created + */ + CREATING, + /** + * Data source is ready to be used + */ + AVAILABLE, + /** + * Data source creation failed + */ + CREATE_FAILED, + /** + * Data source is being deleted + */ + DELETING +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/common/ParameterValidator.java b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ParameterValidator.java new file mode 100644 index 000000000..13276975c --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ParameterValidator.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.common; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.commons.lang3.StringUtils; +import org.opensearch.core.common.Strings; + +/** + * Parameter validator for TIF APIs + */ +public class ParameterValidator { + private static final int MAX_DATASOURCE_NAME_BYTES = 127; + + /** + * Validate datasource name and return list of error messages + * + * @param datasourceName datasource name + * @return Error messages. Empty list if there is no violation. 
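+     * For example, "my-feed" yields an empty list, while "_bad#name" yields two error
+     * messages: one for the leading '_' and one for the '#' character.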
+     */
+    public List<String> validateDatasourceName(final String datasourceName) {
+        List<String> errorMsgs = new ArrayList<>();
+        if (StringUtils.isBlank(datasourceName)) {
+            errorMsgs.add("datasource name must not be empty");
+            return errorMsgs;
+        }
+
+        if (!Strings.validFileName(datasourceName)) {
+            errorMsgs.add(
+                String.format(Locale.ROOT, "datasource name must not contain the following characters %s", Strings.INVALID_FILENAME_CHARS)
+            );
+        }
+        if (datasourceName.contains("#")) {
+            errorMsgs.add("datasource name must not contain '#'");
+        }
+        if (datasourceName.contains(":")) {
+            errorMsgs.add("datasource name must not contain ':'");
+        }
+        if (datasourceName.charAt(0) == '_' || datasourceName.charAt(0) == '-' || datasourceName.charAt(0) == '+') {
+            errorMsgs.add("datasource name must not start with '_', '-', or '+'");
+        }
+        int byteCount = datasourceName.getBytes(StandardCharsets.UTF_8).length;
+        if (byteCount > MAX_DATASOURCE_NAME_BYTES) {
+            errorMsgs.add(String.format(Locale.ROOT, "datasource name is too long, (%d > %d)", byteCount, MAX_DATASOURCE_NAME_BYTES));
+        }
+        if (datasourceName.equals(".") || datasourceName.equals("..")) {
+            errorMsgs.add("datasource name must not be '.' or '..'");
+        }
+        return errorMsgs;
+    }
+}
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/common/StashedThreadContext.java b/src/main/java/org/opensearch/securityanalytics/threatintel/common/StashedThreadContext.java
new file mode 100644
index 000000000..32f4e6d40
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/threatintel/common/StashedThreadContext.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.securityanalytics.threatIntel.common;
+
+import java.util.function.Supplier;
+
+import org.opensearch.client.Client;
+import org.opensearch.common.util.concurrent.ThreadContext;
+
+/**
+ * Helper class to run code with a stashed thread context
+ *
+ * Code needs to be run with a stashed thread context if it interacts with a system index
+ * when the security plugin is enabled.
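+ *
+ * A minimal usage sketch (illustrative; assumes an indexRequest and listener are in scope):
+ * <pre>{@code
+ * StashedThreadContext.run(client, () -> client.index(indexRequest, listener));
+ * }</pre>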
+ */
+public class StashedThreadContext {
+    /**
+     * Set the thread context to default; this is needed to allow actions on system indices
+     * when the security plugin is enabled
+     * @param client the client whose thread context is stashed
+     * @param function runnable that needs to be executed after the thread context has been stashed, accepts and returns nothing
+     */
+    public static void run(final Client client, final Runnable function) {
+        try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
+            function.run();
+        }
+    }
+
+    /**
+     * Set the thread context to default; this is needed to allow actions on system indices
+     * when the security plugin is enabled
+     * @param client the client whose thread context is stashed
+     * @param function supplier function that needs to be executed after the thread context has been stashed, returns an object
+     */
+    public static <T> T run(final Client client, final Supplier<T> function) {
+        try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
+            return function.get();
+        }
+    }
+}
+
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelExecutor.java b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelExecutor.java
new file mode 100644
index 000000000..b3817786c
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelExecutor.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.securityanalytics.threatIntel.common;
+
+import java.util.concurrent.ExecutorService;
+
+import org.opensearch.common.settings.Settings;
+import org.opensearch.threadpool.ExecutorBuilder;
+import org.opensearch.threadpool.FixedExecutorBuilder;
+import org.opensearch.threadpool.ThreadPool;
+
+/**
+ * Provides executors to be used for threat intel tasks
+ */
+public class ThreatIntelExecutor {
+    private static final String THREAD_POOL_NAME = "plugin_sap_datasource_update";
+    private final ThreadPool threadPool;
+
+    public ThreatIntelExecutor(final ThreadPool threadPool) {
+        this.threadPool = threadPool;
+    }
+
+    /**
+     * We use a fixed thread count of 1 for the datasource update because the update runs in the
+     * background at most once a day and there is no need to expedite the task.
+ * + * @param settings the settings + * @return the executor builder + */ + public static ExecutorBuilder executorBuilder(final Settings settings) { + return new FixedExecutorBuilder(settings, THREAD_POOL_NAME, 1, 1000, THREAD_POOL_NAME, false); + } + + /** + * Return an executor service for datasource update task + * + * @return the executor service + */ + public ExecutorService forDatasourceUpdate() { + return threadPool.executor(THREAD_POOL_NAME); + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelLockService.java b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelLockService.java new file mode 100644 index 000000000..8847d681e --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelLockService.java @@ -0,0 +1,167 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.common; + +import static org.opensearch.securityanalytics.threatIntel.jobscheduler.DatasourceExtension.JOB_INDEX_NAME; + +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.opensearch.OpenSearchException; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.securityanalytics.model.DetectorTrigger; + +/** + * A wrapper of job scheduler's lock service for datasource + */ +public class ThreatIntelLockService { + private static final Logger log = LogManager.getLogger(DetectorTrigger.class); + + public static final long LOCK_DURATION_IN_SECONDS = 300l; + public static final long RENEW_AFTER_IN_SECONDS = 120l; + + private final ClusterService clusterService; + private final LockService lockService; + + + /** + * Constructor + * + * @param clusterService the cluster service + * @param client the client + */ + public ThreatIntelLockService(final ClusterService clusterService, final Client client) { + this.clusterService = clusterService; + this.lockService = new LockService(client, clusterService); + } + + /** + * Wrapper method of LockService#acquireLockWithId + * + * Datasource uses its name as doc id in job scheduler. Therefore, we can use datasource name to acquire + * a lock on a datasource. 
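+     * As a result, concurrent create, update, and delete requests on the same datasource
+     * are serialized, since each of them must hold the lock for that datasource name first.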
+     *
+     * @param datasourceName datasourceName to acquire lock on
+     * @param lockDurationSeconds the lock duration in seconds
+     * @param listener the listener
+     */
+    public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
+        lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener);
+    }
+
+    /**
+     * Synchronous method of #acquireLock
+     *
+     * @param datasourceName datasourceName to acquire lock on
+     * @param lockDurationSeconds the lock duration in seconds
+     * @return lock model
+     */
+    public Optional<LockModel> acquireLock(final String datasourceName, final Long lockDurationSeconds) {
+        AtomicReference<LockModel> lockReference = new AtomicReference<>();
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, new ActionListener<>() {
+            @Override
+            public void onResponse(final LockModel lockModel) {
+                lockReference.set(lockModel);
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void onFailure(final Exception e) {
+                lockReference.set(null);
+                countDownLatch.countDown();
+                log.error("acquiring lock failed", e);
+            }
+        });
+
+        try {
+            countDownLatch.await(clusterService.getClusterSettings().get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT).getSeconds(), TimeUnit.SECONDS);
+            return Optional.ofNullable(lockReference.get());
+        } catch (InterruptedException e) {
+            log.error("Waiting for the count down latch failed", e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Wrapper method of LockService#release
+     *
+     * @param lockModel the lock model
+     */
+    public void releaseLock(final LockModel lockModel) {
+        lockService.release(
+            lockModel,
+            ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
+        );
+    }
+
+    /**
+     * Synchronous method of LockService#renewLock
+     *
+     * @param lockModel lock to renew
+     * @return renewed lock if renewal succeeded, and null otherwise
+     */
+    public LockModel renewLock(final LockModel lockModel) {
+        AtomicReference<LockModel> lockReference = new AtomicReference<>();
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        lockService.renewLock(lockModel, new ActionListener<>() {
+            @Override
+            public void onResponse(final LockModel lockModel) {
+                lockReference.set(lockModel);
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void onFailure(final Exception e) {
+                log.error("failed to renew lock", e);
+                lockReference.set(null);
+                countDownLatch.countDown();
+            }
+        });
+
+        try {
+            countDownLatch.await(clusterService.getClusterSettings().get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT).getSeconds(), TimeUnit.SECONDS);
+            return lockReference.get();
+        } catch (InterruptedException e) {
+            log.error("Interrupted exception", e);
+            return null;
+        }
+    }
+
+    /**
+     * Return a runnable which can renew the given lock model
+     *
+     * The runnable renews the lock and stores the renewed lock in the AtomicReference.
+     * It only renews the lock once {@code RENEW_AFTER_IN_SECONDS} have passed since
+     * the last time the lock was renewed, to avoid resource abuse.
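+     *
+     * A minimal usage sketch (illustrative; assumes an acquired {@code lockModel}):
+     * <pre>{@code
+     * AtomicReference<LockModel> lockReference = new AtomicReference<>(lockModel);
+     * Runnable renewLock = lockService.getRenewLockRunnable(lockReference);
+     * renewLock.run(); // no-op until RENEW_AFTER_IN_SECONDS have passed since the last renewal
+     * }</pre>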
+     *
+     * @param lockModel lock model to renew
+     * @return runnable which can renew the given lock for every call
+     */
+    public Runnable getRenewLockRunnable(final AtomicReference<LockModel> lockModel) {
+        return () -> {
+            LockModel preLock = lockModel.get();
+            if (Instant.now().isBefore(preLock.getLockTime().plusSeconds(RENEW_AFTER_IN_SECONDS))) {
+                return;
+            }
+            lockModel.set(renewLock(lockModel.get()));
+            if (lockModel.get() == null) {
+                log.error("Exception: failed to renew a lock");
+                throw new OpenSearchException("failed to renew a lock [{}]", preLock);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelSettings.java b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelSettings.java
new file mode 100644
index 000000000..1d649e0b6
--- /dev/null
+++ b/src/main/java/org/opensearch/securityanalytics/threatintel/common/ThreatIntelSettings.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.securityanalytics.threatIntel.common;
+
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.securityanalytics.model.DetectorTrigger;
+
+/**
+ * Settings for threat intel datasource operations
+ */
+public class ThreatIntelSettings {
+    private static final Logger log = LogManager.getLogger(DetectorTrigger.class);
+
+    /**
+     * Default endpoint to be used in threat intel feed datasource creation API
+     */
+    public static final Setting<String> DATASOURCE_ENDPOINT = Setting.simpleString(
+        "plugins.security_analytics.threatintel.datasource.endpoint",
+        "https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json", //TODO fix this endpoint
+        new DatasourceEndpointValidator(),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    /**
+     * Default update interval to be used in threat intel datasource creation API
+     */
+    public static final Setting<Long> DATASOURCE_UPDATE_INTERVAL = Setting.longSetting(
+        "plugins.security_analytics.threatintel.datasource.update_interval_in_days",
+        3L,
+        1L,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    /**
+     * Bulk size for indexing threat intel feed data
+     */
+    public static final Setting<Integer> BATCH_SIZE = Setting.intSetting(
+        "plugins.security_analytics.threatintel.datasource.batch_size",
+        10000,
+        1,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    /**
+     * Timeout value for threat intel processor
+     */
+    public static final Setting<TimeValue> THREAT_INTEL_TIMEOUT = Setting.timeSetting(
+        "plugins.security_analytics.threat_intel_timeout",
+        TimeValue.timeValueSeconds(30),
+        TimeValue.timeValueSeconds(1),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    /**
+     * Max size for threat intel feed cache
+     */
+    public static final Setting<Long> CACHE_SIZE = Setting.longSetting(
+        "plugins.security_analytics.threatintel.processor.cache_size",
+        1000,
+        0,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    /**
+     * Return all settings of the threat intel feature
+     * @return a list of all settings for the threat intel feature
+     */
+    public static final List<Setting<?>> settings() {
+        return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, BATCH_SIZE, THREAT_INTEL_TIMEOUT, CACHE_SIZE);
+    }
+
+    /**
+     * Visible for
testing + */ + protected static class DatasourceEndpointValidator implements Setting.Validator { + @Override + public void validate(final String value) { + try { + new URL(value).toURI(); + } catch (MalformedURLException | URISyntaxException e) { + log.error("Invalid URL format is provided", e); + throw new IllegalArgumentException("Invalid URL format is provided"); + } + } + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/dao/DatasourceDao.java b/src/main/java/org/opensearch/securityanalytics/threatintel/dao/DatasourceDao.java new file mode 100644 index 000000000..9d6a15241 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/dao/DatasourceDao.java @@ -0,0 +1,380 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.dao; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.StepListener; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.get.MultiGetItemResponse; +import org.opensearch.action.get.MultiGetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.Preference; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.securityanalytics.model.DetectorTrigger; +import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelSettings; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.DatasourceExtension; +import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.securityanalytics.util.SecurityAnalyticsException; + +/** + * Data access object for 
datasource + */ +public class DatasourceDao { + private static final Logger log = LogManager.getLogger(DetectorTrigger.class); + + private static final Integer MAX_SIZE = 1000; + private final Client client; + private final ClusterService clusterService; + private final ClusterSettings clusterSettings; + + public DatasourceDao(final Client client, final ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + this.clusterSettings = clusterService.getClusterSettings(); + } + + /** + * Create datasource index + * + * @param stepListener setup listener + */ + public void createIndexIfNotExists(final StepListener stepListener) { + if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) { + stepListener.onResponse(null); + return; + } + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping()) + .settings(DatasourceExtension.INDEX_SETTING); + StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { + @Override + public void onResponse(final CreateIndexResponse createIndexResponse) { + stepListener.onResponse(null); + } + + @Override + public void onFailure(final Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + log.info("index[{}] already exist", DatasourceExtension.JOB_INDEX_NAME); + stepListener.onResponse(null); + return; + } + stepListener.onFailure(e); + } + })); + } + + private String getIndexMapping() { + try { + try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/threatintel_datasource.json")) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + return reader.lines().map(String::trim).collect(Collectors.joining()); + } + } + } catch (IOException e) { + log.error("Runtime exception", e); + throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO + } + } + + /** + * Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param datasource the datasource + * @return index response + */ + public IndexResponse updateDatasource(final Datasource datasource) { + datasource.setLastUpdateTime(Instant.now()); + return StashedThreadContext.run(client, () -> { + try { + return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME) + .setId(datasource.getName()) + .setOpType(DocWriteRequest.OpType.INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .execute() + .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)); + } catch (IOException e) { + throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO + } + }); + } + + /** + * Update datasources in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param datasources the datasources + * @param listener action listener + */ + public void updateDatasource(final List datasources, final ActionListener listener) { + BulkRequest bulkRequest = new BulkRequest(); + datasources.stream().map(datasource -> { + datasource.setLastUpdateTime(Instant.now()); + return datasource; + }).map(this::toIndexRequest).forEach(indexRequest -> bulkRequest.add(indexRequest)); + StashedThreadContext.run(client, () -> client.bulk(bulkRequest, listener)); + } + + private IndexRequest toIndexRequest(Datasource datasource) { + try { + 
IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(DatasourceExtension.JOB_INDEX_NAME); + indexRequest.id(datasource.getName()); + indexRequest.opType(DocWriteRequest.OpType.INDEX); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); + return indexRequest; + } catch (IOException e) { + throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO + } + } + + /** + * Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * + * @param datasource the datasource + * @param listener the listener + */ + public void putDatasource(final Datasource datasource, final ActionListener listener) { + datasource.setLastUpdateTime(Instant.now()); + StashedThreadContext.run(client, () -> { + try { + client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME) + .setId(datasource.getName()) + .setOpType(DocWriteRequest.OpType.CREATE) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .execute(listener); + } catch (IOException e) { + throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO + } + }); + } + + /** + * Delete datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * + * @param datasource the datasource + * + */ + public void deleteDatasource(final Datasource datasource) { + DeleteResponse response = client.prepareDelete() + .setIndex(DatasourceExtension.JOB_INDEX_NAME) + .setId(datasource.getName()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .execute() + .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)); + + if (response.status().equals(RestStatus.OK)) { + log.info("deleted datasource[{}] successfully", datasource.getName()); + } else if (response.status().equals(RestStatus.NOT_FOUND)) { + throw new ResourceNotFoundException("datasource[{}] does not exist", datasource.getName()); + } else { + throw new OpenSearchException("failed to delete datasource[{}] with status[{}]", datasource.getName(), response.status()); + } + } + + /** + * Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param name the name of a datasource + * @return datasource + * @throws IOException exception + */ + public Datasource getDatasource(final String name) throws IOException { + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); + GetResponse response; + try { + response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT))); + if (response.isExists() == false) { + log.error("Datasource[{}] does not exist in an index[{}]", name, DatasourceExtension.JOB_INDEX_NAME); + return null; + } + } catch (IndexNotFoundException e) { + log.error("Index[{}] is not found", DatasourceExtension.JOB_INDEX_NAME); + return null; + } + + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getSourceAsBytesRef() + ); + return Datasource.PARSER.parse(parser, null); + } + + /** + * Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param name the name of a datasource + * @param actionListener the action listener + */ + public void getDatasource(final String name, final ActionListener actionListener) { + GetRequest request = 
new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
+        StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() {
+            @Override
+            public void onResponse(final GetResponse response) {
+                if (response.isExists() == false) {
+                    actionListener.onResponse(null);
+                    return;
+                }
+
+                try {
+                    XContentParser parser = XContentHelper.createParser(
+                        NamedXContentRegistry.EMPTY,
+                        LoggingDeprecationHandler.INSTANCE,
+                        response.getSourceAsBytesRef()
+                    );
+                    actionListener.onResponse(Datasource.PARSER.parse(parser, null));
+                } catch (IOException e) {
+                    actionListener.onFailure(e);
+                }
+            }
+
+            @Override
+            public void onFailure(final Exception e) {
+                actionListener.onFailure(e);
+            }
+        }));
+    }
+
+    /**
+     * Get datasources from an index {@code DatasourceExtension.JOB_INDEX_NAME}
+     * @param names the array of datasource names
+     * @param actionListener the action listener
+     */
+    public void getDatasources(final String[] names, final ActionListener<List<Datasource>> actionListener) {
+        StashedThreadContext.run(
+            client,
+            () -> client.prepareMultiGet()
+                .add(DatasourceExtension.JOB_INDEX_NAME, names)
+                .execute(createGetDataSourceQueryActionListener(MultiGetResponse.class, actionListener))
+        );
+    }
+
+    /**
+     * Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
+     * @param actionListener the action listener
+     */
+    public void getAllDatasources(final ActionListener<List<Datasource>> actionListener) {
+        StashedThreadContext.run(
+            client,
+            () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
+                .setQuery(QueryBuilders.matchAllQuery())
+                .setPreference(Preference.PRIMARY.type())
+                .setSize(MAX_SIZE)
+                .execute(createGetDataSourceQueryActionListener(SearchResponse.class, actionListener))
+        );
+    }
+
+    /**
+     * Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
+     */
+    public List<Datasource> getAllDatasources() {
+        SearchResponse response = StashedThreadContext.run(
+            client,
+            () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
+                .setQuery(QueryBuilders.matchAllQuery())
+                .setPreference(Preference.PRIMARY.type())
+                .setSize(MAX_SIZE)
+                .execute()
+                .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT))
+        );
+
+        List<BytesReference> bytesReferences = toBytesReferences(response);
+        return bytesReferences.stream().map(bytesRef -> toDatasource(bytesRef)).collect(Collectors.toList());
+    }
+
+    private <T> ActionListener<T> createGetDataSourceQueryActionListener(
+        final Class<T> response,
+        final ActionListener<List<Datasource>> actionListener
+    ) {
+        return new ActionListener<T>() {
+            @Override
+            public void onResponse(final T response) {
+                try {
+                    List<BytesReference> bytesReferences = toBytesReferences(response);
+                    List<Datasource> datasources = bytesReferences.stream()
+                        .map(bytesRef -> toDatasource(bytesRef))
+                        .collect(Collectors.toList());
+                    actionListener.onResponse(datasources);
+                } catch (Exception e) {
+                    actionListener.onFailure(e);
+                }
+            }
+
+            @Override
+            public void onFailure(final Exception e) {
+                actionListener.onFailure(e);
+            }
+        };
+    }
+
+    private List<BytesReference> toBytesReferences(final Object response) {
+        if (response instanceof SearchResponse) {
+            SearchResponse searchResponse = (SearchResponse) response;
+            return Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getSourceRef).collect(Collectors.toList());
+        } else if (response instanceof MultiGetResponse) {
+            MultiGetResponse multiGetResponse = (MultiGetResponse) response;
+            return Arrays.stream(multiGetResponse.getResponses())
+                .map(MultiGetItemResponse::getResponse)
+                .filter(Objects::nonNull)
.filter(GetResponse::isExists) + .map(GetResponse::getSourceAsBytesRef) + .collect(Collectors.toList()); + } else { + throw new OpenSearchException("No supported instance type[{}] is provided", response.getClass()); + } + } + + private Datasource toDatasource(final BytesReference bytesReference) { + try { + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + bytesReference + ); + return Datasource.PARSER.parse(parser, null); + } catch (IOException e) { + throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO + } + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java new file mode 100644 index 000000000..00ff1d419 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java @@ -0,0 +1,819 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.securityanalytics.threatIntel.jobscheduler; + +import org.opensearch.core.ParseField; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; + +import static org.opensearch.common.time.DateUtils.toInstant; + +import org.opensearch.securityanalytics.threatIntel.action.PutDatasourceRequest; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; +import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelLockService; + +public class Datasource implements Writeable, ScheduledJobParameter { + /** + * Prefix of indices having threatIntel data + */ + public static final String THREAT_INTEL_DATA_INDEX_NAME_PREFIX = "opensearch-sap-threatintel"; + + /** + * Default fields for job scheduling + */ + private static final ParseField NAME_FIELD = new ParseField("name"); + private static final ParseField ENABLED_FIELD = new ParseField("update_enabled"); + private static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time"); + private static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time_field"); + public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); + private static final ParseField ENABLED_TIME_FIELD = new ParseField("enabled_time"); + private static final ParseField ENABLED_TIME_FIELD_READABLE = new ParseField("enabled_time_field"); + + // need? 
+ private static final ParseField TASK_FIELD = new ParseField("task"); + public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds"; + + /** + * Additional fields for datasource + */ + private static final ParseField FEED_NAME = new ParseField("feed_name"); + private static final ParseField FEED_FORMAT = new ParseField("feed_format"); + private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); + private static final ParseField DESCRIPTION = new ParseField("description"); + private static final ParseField ORGANIZATION = new ParseField("organization"); + private static final ParseField CONTAINED_IOCS_FIELD = new ParseField("contained_iocs_field"); + private static final ParseField STATE_FIELD = new ParseField("state"); + private static final ParseField CURRENT_INDEX_FIELD = new ParseField("current_index"); + private static final ParseField INDICES_FIELD = new ParseField("indices"); + private static final ParseField DATABASE_FIELD = new ParseField("database"); + private static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats"); + + + /** + * Default variables for job scheduling + */ + + /** + * @param name name of a datasource + * @return name of a datasource + */ + private String name; + + /** + * @param lastUpdateTime Last update time of a datasource + * @return Last update time of a datasource + */ + private Instant lastUpdateTime; + /** + * @param enabledTime Last time when a scheduling is enabled for a threat intel feed data update + * @return Last time when a scheduling is enabled for the job scheduler + */ + private Instant enabledTime; + /** + * @param isEnabled Indicate if threat intel feed data update is scheduled or not + * @return Indicate if scheduling is enabled or not + */ + private boolean isEnabled; + /** + * @param schedule Schedule that system uses + * @return Schedule that system uses + */ + private IntervalSchedule schedule; + + /** + * @param task Task that {@link DatasourceRunner} will execute + * @return Task that {@link DatasourceRunner} will execute + */ + private DatasourceTask task; + + + /** + * Additional variables for datasource + */ + + /** + * @param feedFormat format of the feed (ip, dns...) 
+ * @return the format of the feed ingested + */ + private String feedFormat; + + /** + * @param endpoint URL of a manifest file + * @return URL of a manifest file + */ + private String endpoint; + + /** + * @param feedName name of the threat intel feed + * @return name of the threat intel feed + */ + private String feedName; + + /** + * @param description description of the threat intel feed + * @return description of the threat intel feed + */ + private String description; + + /** + * @param organization organization of the threat intel feed + * @return organization of the threat intel feed + */ + private String organization; + + /** + * @param contained_iocs_field list of iocs contained in a given feed + * @return list of iocs contained in a given feed + */ + private List<String> contained_iocs_field; + + /** + * @param state State of a datasource + * @return State of a datasource + */ + private DatasourceState state; + + /** + * @param currentIndex the current index name having threat intel feed data + * @return the current index name having threat intel feed data + */ + private String currentIndex; + /** + * @param indices A list of indices having threat intel feed data including currentIndex + * @return A list of indices having threat intel feed data including currentIndex + */ + private List<String> indices; + /** + * @param database threat intel feed database information + * @return threat intel feed database information + */ + private Database database; + /** + * @param updateStats threat intel feed database update statistics + * @return threat intel feed database update statistics + */ + private UpdateStats updateStats; + +
public DatasourceTask getTask() { + return task; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public void setLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setOrganization(String organization) { + this.organization = organization; + } + + public void setCurrentIndex(String currentIndex) { + this.currentIndex = currentIndex; + } + + public void setTask(DatasourceTask task) { + this.task = task; + } + +
/** + * Datasource parser + */ + public static final ConstructingObjectParser<Datasource, Void> PARSER = new ConstructingObjectParser<>( + "datasource_metadata", + true, + args -> { + String name = (String) args[0]; + Instant lastUpdateTime = Instant.ofEpochMilli((long) args[1]); + Instant enabledTime = args[2] == null ?
null : Instant.ofEpochMilli((long) args[2]); + boolean isEnabled = (boolean) args[3]; + IntervalSchedule schedule = (IntervalSchedule) args[4]; + DatasourceTask task = DatasourceTask.valueOf((String) args[5]); + String feedFormat = (String) args[6]; + String endpoint = (String) args[7]; + String feedName = (String) args[8]; + String description = (String) args[9]; + String organization = (String) args[10]; + List<String> contained_iocs_field = (List<String>) args[11]; + DatasourceState state = DatasourceState.valueOf((String) args[12]); + String currentIndex = (String) args[13]; + List<String> indices = (List<String>) args[14]; + Database database = (Database) args[15]; + UpdateStats updateStats = (UpdateStats) args[16]; + Datasource parameter = new Datasource( + name, + lastUpdateTime, + enabledTime, + isEnabled, + schedule, + task, + feedFormat, + endpoint, + feedName, + description, + organization, + contained_iocs_field, + state, + currentIndex, + indices, + database, + updateStats + ); + return parameter; + } + ); +
static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_UPDATE_TIME_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), ENABLED_TIME_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), SCHEDULE_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), FEED_FORMAT); + PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), FEED_NAME); + PARSER.declareString(ConstructingObjectParser.constructorArg(), DESCRIPTION); + PARSER.declareString(ConstructingObjectParser.constructorArg(), ORGANIZATION); + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), CONTAINED_IOCS_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), CURRENT_INDEX_FIELD); + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD); + }
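
For reference, a minimal sketch (not part of this patch) of how a job document round-trips through the PARSER above. It assumes the convenience constructor defined just below, an empty NamedXContentRegistry, and the usual XContentFactory/XContentType imports; names and values are illustrative.

    // Build a datasource, render it to JSON, then parse it back.
    Datasource original = new Datasource(
        "sample-feed",
        new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS),
        "csv", "https://example.com/manifest.json",
        "sample-feed", "example description", "example org", List.of("ip"));
    XContentBuilder builder = original.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS);
    try (XContentParser parser = XContentType.JSON.xContent().createParser(
            NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
            BytesReference.bytes(builder).streamInput())) {
        Datasource restored = Datasource.PARSER.parse(parser, null); // equal field-for-field
    }
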
+ public Datasource() { + this(null, null, null, null, null, null, null, null); + } + +
public Datasource(final String name, final Instant lastUpdateTime, final Instant enabledTime, final Boolean isEnabled, + final IntervalSchedule schedule, DatasourceTask task, final String feedFormat, final String endpoint, + final String feedName, final String description, final String organization, final List<String> contained_iocs_field, + final DatasourceState state, final String currentIndex, final List<String> indices, final Database database, final UpdateStats updateStats) { + this.name = name; + this.lastUpdateTime = lastUpdateTime; + this.enabledTime = enabledTime; + this.isEnabled = isEnabled; + this.schedule = schedule; + this.task = task; + this.feedFormat = feedFormat; + this.endpoint = endpoint; + this.feedName = feedName; + this.description = description; + this.organization = organization; + this.contained_iocs_field = contained_iocs_field; + this.state = state; + this.currentIndex = currentIndex; + this.indices = indices; + this.database = database; + this.updateStats = updateStats; + } + +
public Datasource(final String name, final IntervalSchedule schedule, final String feedFormat, final String endpoint, final String feedName, final String description, final String organization, final List<String> contained_iocs_field) { + this( + name, + Instant.now().truncatedTo(ChronoUnit.MILLIS), + null, + false, + schedule, + DatasourceTask.ALL, + feedFormat, + endpoint, + feedName, + description, + organization, + contained_iocs_field, + DatasourceState.CREATING, + null, + new ArrayList<>(), + new Database(), + new UpdateStats() + ); + } + +
public Datasource(final StreamInput in) throws IOException { + name = in.readString(); + lastUpdateTime = toInstant(in.readVLong()); + enabledTime = toInstant(in.readOptionalVLong()); + isEnabled = in.readBoolean(); + schedule = new IntervalSchedule(in); + task = DatasourceTask.valueOf(in.readString()); + feedFormat = in.readString(); + endpoint = in.readString(); + feedName = in.readString(); + description = in.readString(); + organization = in.readString(); + contained_iocs_field = in.readStringList(); + state = DatasourceState.valueOf(in.readString()); + currentIndex = in.readOptionalString(); + indices = in.readStringList(); + database = new Database(in); + updateStats = new UpdateStats(in); + } + +
public void writeTo(final StreamOutput out) throws IOException { + out.writeString(name); + out.writeVLong(lastUpdateTime.toEpochMilli()); + out.writeOptionalVLong(enabledTime == null ? null : enabledTime.toEpochMilli()); + out.writeBoolean(isEnabled); + schedule.writeTo(out); + out.writeString(task.name()); + out.writeString(feedFormat); + out.writeString(endpoint); + out.writeString(feedName); + out.writeString(description); + out.writeString(organization); + out.writeStringCollection(contained_iocs_field); + out.writeString(state.name()); + out.writeOptionalString(currentIndex); + out.writeStringCollection(indices); + database.writeTo(out); + updateStats.writeTo(out); + } + +
@Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD.getPreferredName(), name); + builder.timeField( + LAST_UPDATE_TIME_FIELD.getPreferredName(), + LAST_UPDATE_TIME_FIELD_READABLE.getPreferredName(), + lastUpdateTime.toEpochMilli() + ); + if (enabledTime != null) { + builder.timeField( + ENABLED_TIME_FIELD.getPreferredName(), + ENABLED_TIME_FIELD_READABLE.getPreferredName(), + enabledTime.toEpochMilli() + ); + } + builder.field(ENABLED_FIELD.getPreferredName(), isEnabled); + builder.field(SCHEDULE_FIELD.getPreferredName(), schedule); + builder.field(TASK_FIELD.getPreferredName(), task.name()); + builder.field(FEED_FORMAT.getPreferredName(), feedFormat); + builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint); + builder.field(FEED_NAME.getPreferredName(), feedName); + builder.field(DESCRIPTION.getPreferredName(), description); + builder.field(ORGANIZATION.getPreferredName(), organization); + builder.field(CONTAINED_IOCS_FIELD.getPreferredName(), contained_iocs_field); + builder.field(STATE_FIELD.getPreferredName(), state.name()); + if (currentIndex != null) { + builder.field(CURRENT_INDEX_FIELD.getPreferredName(), currentIndex); + } + builder.field(INDICES_FIELD.getPreferredName(), indices); + builder.field(DATABASE_FIELD.getPreferredName(), database); + builder.field(UPDATE_STATS_FIELD.getPreferredName(), updateStats); + builder.endObject(); + return builder; + }
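
The stream serialization above can be exercised the same way; a small sketch using OpenSearch's in-memory BytesStreamOutput, reusing `original` from the sketch earlier.

    BytesStreamOutput out = new BytesStreamOutput();
    original.writeTo(out);                                    // transport wire format
    Datasource copy = new Datasource(out.bytes().streamInput());
    assert copy.getName().equals(original.getName());
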
+ + @Override + public String getName() { + return this.name; + } + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + @Override + public IntervalSchedule getSchedule() { + return this.schedule; + } + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + @Override + public Long getLockDurationSeconds() { + return ThreatIntelLockService.LOCK_DURATION_IN_SECONDS; + } + +
/** + * Enable auto update of threat intel feed data + */ + public void enable() { + if (isEnabled == true) { + return; + } + enabledTime = Instant.now().truncatedTo(ChronoUnit.MILLIS); + isEnabled = true; + } + + /** + * Disable auto update of threat intel feed data + */ + public void disable() { + enabledTime = null; + isEnabled = false; + } + +
/** + * Current index name of a datasource + * + * @return Current index name of a datasource + */ + public String currentIndexName() { + return currentIndex; + } + + public void setSchedule(IntervalSchedule schedule) { + this.schedule = schedule; + } + +
/** + * Reset database so that it can be updated in the next run regardless of whether there is a new update or not + */ + public void resetDatabase() { + database.setUpdatedAt(null); + database.setSha256Hash(null); + } + +
/** + * Index name for a datasource with given suffix + * + * @param suffix the suffix of an index name + * @return index name for a datasource with given suffix + */ + public String newIndexName(final String suffix) { + return String.format(Locale.ROOT, "%s.%s.%s", THREAT_INTEL_DATA_INDEX_NAME_PREFIX, name, suffix); + } + +
/** + * Set database attributes with given input + * + * @param datasourceManifest the datasource manifest + * @param fields the fields + */ + public void setDatabase(final DatasourceManifest datasourceManifest, final List<String> fields) { + this.database.setProvider(datasourceManifest.getOrganization()); + this.database.setSha256Hash(datasourceManifest.getSha256Hash()); + this.database.setUpdatedAt(Instant.ofEpochMilli(datasourceManifest.getUpdatedAt())); + this.database.setFields(fields); + } + +
/** + * Checks if the database fields are compatible with the given set of fields. + * + * If database fields are null, it is compatible with any input fields + * as it hasn't been generated before. + * + * @param fields The set of input fields to check for compatibility. + * @return true if the database fields are compatible with the given input fields, false otherwise.
+ */ + public boolean isCompatible(final List<String> fields) { + if (database.fields == null) { + return true; + } + + if (fields.size() < database.fields.size()) { + return false; + } + + Set<String> fieldsSet = new HashSet<>(fields); + for (String field : database.fields) { + if (fieldsSet.contains(field) == false) { + return false; + } + } + return true; + }
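
The compatibility rule is easiest to see with concrete values; a short illustration using the public setters (field names are made up):

    // The previously ingested feed stored these columns.
    datasource.getDatabase().setFields(List.of("ip", "type"));
    datasource.isCompatible(List.of("ip", "type", "severity")); // true: superset of old fields
    datasource.isCompatible(List.of("ip"));                     // false: "type" disappeared
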
+ + public DatasourceState getState() { + return state; + } + + public List<String> getIndices() { + return indices; + } + + public void setState(DatasourceState state) { + this.state = state; + } + + public String getEndpoint() { + return this.endpoint; + } + + public Database getDatabase() { + return this.database; + } + + public UpdateStats getUpdateStats() { + return this.updateStats; + } + +
/** + * Database of a datasource + */ + public static class Database implements Writeable, ToXContent { + private static final ParseField PROVIDER_FIELD = new ParseField("provider"); + private static final ParseField SHA256_HASH_FIELD = new ParseField("sha256_hash"); + private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at_in_epoch_millis"); + private static final ParseField UPDATED_AT_FIELD_READABLE = new ParseField("updated_at"); + private static final ParseField FIELDS_FIELD = new ParseField("fields"); + +
/** + * @param provider A database provider name + * @return A database provider name + */ + private String provider; + /** + * @param sha256Hash SHA256 hash value of a database file + * @return SHA256 hash value of a database file + */ + private String sha256Hash; + + /** + * @param updatedAt A date when the database was updated + * @return A date when the database was updated + */ + private Instant updatedAt; + + /** + * @param fields A list of available fields in the database + * @return A list of available fields in the database + */ + private List<String> fields; + +
public Database(String provider, String sha256Hash, Instant updatedAt, List<String> fields) { + this.provider = provider; + this.sha256Hash = sha256Hash; + this.updatedAt = updatedAt; + this.fields = fields; + } + + public void setProvider(String provider) { + this.provider = provider; + } + + public void setSha256Hash(String sha256Hash) { + this.sha256Hash = sha256Hash; + } + + public void setUpdatedAt(Instant updatedAt) { + this.updatedAt = updatedAt; + } + + public void setFields(List<String> fields) { + this.fields = fields; + } + + public Instant getUpdatedAt() { + return updatedAt; + } + + public String getSha256Hash() { + return sha256Hash; + } + + public List<String> getFields() { + return fields; + } + + public String getProvider() { + return provider; + } + +
private static final ConstructingObjectParser<Database, Void> PARSER = new ConstructingObjectParser<>( + "datasource_metadata_database", + true, + args -> { + String provider = (String) args[0]; + String sha256Hash = (String) args[1]; + Instant updatedAt = args[2] == null ? null : Instant.ofEpochMilli((Long) args[2]); + List<String> fields = (List<String>) args[3]; + return new Database(provider, sha256Hash, updatedAt, fields); + } + ); + static { + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), PROVIDER_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), SHA256_HASH_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), UPDATED_AT_FIELD); + PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), FIELDS_FIELD); + } + +
public Database(final StreamInput in) throws IOException { + provider = in.readOptionalString(); + sha256Hash = in.readOptionalString(); + updatedAt = toInstant(in.readOptionalVLong()); + fields = in.readOptionalStringList(); + } + + private Database() {} + +
@Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalString(provider); + out.writeOptionalString(sha256Hash); + out.writeOptionalVLong(updatedAt == null ? null : updatedAt.toEpochMilli()); + out.writeOptionalStringCollection(fields); + } + +
@Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + if (provider != null) { + builder.field(PROVIDER_FIELD.getPreferredName(), provider); + } + if (sha256Hash != null) { + builder.field(SHA256_HASH_FIELD.getPreferredName(), sha256Hash); + } + if (updatedAt != null) { + builder.timeField( + UPDATED_AT_FIELD.getPreferredName(), + UPDATED_AT_FIELD_READABLE.getPreferredName(), + updatedAt.toEpochMilli() + ); + } + if (fields != null) { + builder.startArray(FIELDS_FIELD.getPreferredName()); + for (String field : fields) { + builder.value(field); + } + builder.endArray(); + } + builder.endObject(); + return builder; + } + }
+ + /** + * Update stats of a datasource + */ + public static class UpdateStats implements Writeable, ToXContent { + private static final ParseField LAST_SUCCEEDED_AT_FIELD = new ParseField("last_succeeded_at_in_epoch_millis"); + private static final ParseField LAST_SUCCEEDED_AT_FIELD_READABLE = new ParseField("last_succeeded_at"); + private static final ParseField LAST_PROCESSING_TIME_IN_MILLIS_FIELD = new ParseField("last_processing_time_in_millis"); + private static final ParseField LAST_FAILED_AT_FIELD = new ParseField("last_failed_at_in_epoch_millis"); + private static final ParseField LAST_FAILED_AT_FIELD_READABLE = new ParseField("last_failed_at"); + private static final ParseField LAST_SKIPPED_AT = new ParseField("last_skipped_at_in_epoch_millis"); + private static final ParseField LAST_SKIPPED_AT_READABLE = new ParseField("last_skipped_at"); + +
/** + * @param lastSucceededAt The last time when a threat intel feed data update succeeded + * @return The last time when a threat intel feed data update succeeded + */ + private Instant lastSucceededAt; + /** + * @param lastProcessingTimeInMillis The processing time of the last successful threat intel feed data update + * @return The processing time of the last successful threat intel feed data update + */ + private Long lastProcessingTimeInMillis; + /** + * @param lastFailedAt The last time when a threat intel feed data update failed + * @return The last time when a threat intel feed data update failed + */ + private Instant lastFailedAt; + + /** + * @param lastSkippedAt The last time when a threat intel feed data update was skipped because there was no new update from the endpoint + * @return The last time when a threat intel feed data update was skipped because there was no new update from the endpoint + */ + private Instant lastSkippedAt; + +
private UpdateStats() {} + + public void setLastSkippedAt(Instant lastSkippedAt) { + this.lastSkippedAt = lastSkippedAt; + } + + public void setLastSucceededAt(Instant lastSucceededAt) { + this.lastSucceededAt = lastSucceededAt; + } + + public void setLastProcessingTimeInMillis(Long lastProcessingTimeInMillis) { + this.lastProcessingTimeInMillis = lastProcessingTimeInMillis; + } + +
private static final ConstructingObjectParser<UpdateStats, Void> PARSER = new ConstructingObjectParser<>( + "datasource_metadata_update_stats", + true, + args -> { + Instant lastSucceededAt = args[0] == null ? null : Instant.ofEpochMilli((long) args[0]); + Long lastProcessingTimeInMillis = (Long) args[1]; + Instant lastFailedAt = args[2] == null ? null : Instant.ofEpochMilli((long) args[2]); + Instant lastSkippedAt = args[3] == null ? null : Instant.ofEpochMilli((long) args[3]); + return new UpdateStats(lastSucceededAt, lastProcessingTimeInMillis, lastFailedAt, lastSkippedAt); + } + ); + + static { + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_SUCCEEDED_AT_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_PROCESSING_TIME_IN_MILLIS_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FAILED_AT_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_SKIPPED_AT); + } + +
public UpdateStats(final StreamInput in) throws IOException { + lastSucceededAt = toInstant(in.readOptionalVLong()); + lastProcessingTimeInMillis = in.readOptionalVLong(); + lastFailedAt = toInstant(in.readOptionalVLong()); + lastSkippedAt = toInstant(in.readOptionalVLong()); + } + + public UpdateStats(Instant lastSucceededAt, Long lastProcessingTimeInMillis, Instant lastFailedAt, Instant lastSkippedAt) { + this.lastSucceededAt = lastSucceededAt; + this.lastProcessingTimeInMillis = lastProcessingTimeInMillis; + this.lastFailedAt = lastFailedAt; + this.lastSkippedAt = lastSkippedAt; + } + +
@Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalVLong(lastSucceededAt == null ? null : lastSucceededAt.toEpochMilli()); + out.writeOptionalVLong(lastProcessingTimeInMillis); + out.writeOptionalVLong(lastFailedAt == null ? null : lastFailedAt.toEpochMilli()); + out.writeOptionalVLong(lastSkippedAt == null ? null : lastSkippedAt.toEpochMilli()); + }
+ + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + if (lastSucceededAt != null) { + builder.timeField( + LAST_SUCCEEDED_AT_FIELD.getPreferredName(), + LAST_SUCCEEDED_AT_FIELD_READABLE.getPreferredName(), + lastSucceededAt.toEpochMilli() + ); + } + if (lastProcessingTimeInMillis != null) { + builder.field(LAST_PROCESSING_TIME_IN_MILLIS_FIELD.getPreferredName(), lastProcessingTimeInMillis); + } + if (lastFailedAt != null) { + builder.timeField( + LAST_FAILED_AT_FIELD.getPreferredName(), + LAST_FAILED_AT_FIELD_READABLE.getPreferredName(), + lastFailedAt.toEpochMilli() + ); + } + if (lastSkippedAt != null) { + builder.timeField( + LAST_SKIPPED_AT.getPreferredName(), + LAST_SKIPPED_AT_READABLE.getPreferredName(), + lastSkippedAt.toEpochMilli() + ); + } + builder.endObject(); + return builder; + } + +
public void setLastFailedAt(Instant lastFailedAt) { + this.lastFailedAt = lastFailedAt; + } + } + +
/** + * Builder class for Datasource + */ + public static class Builder { + public static Datasource build(final PutDatasourceRequest request) { + String id = request.getName(); + IntervalSchedule schedule = new IntervalSchedule( + Instant.now().truncatedTo(ChronoUnit.MILLIS), + (int) request.getUpdateInterval().days(), + ChronoUnit.DAYS + ); + String feedFormat = request.getFeedFormat(); + String endpoint = request.getEndpoint(); + String feedName = request.getFeedName(); + String description = request.getDescription(); + String organization = request.getOrganization(); + List<String> contained_iocs_field = request.getContained_iocs_field(); + return new Datasource(id, schedule, feedFormat, endpoint, feedName, description, organization, contained_iocs_field); + } + } +} \ No newline at end of file
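
Taken together, a hypothetical caller would create and schedule a datasource roughly as follows; the feed name and endpoint are illustrative, not part of the patch.

    Datasource datasource = new Datasource(
        "alienvault-reputation",
        new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), 1, ChronoUnit.DAYS),
        "csv", "https://example.com/manifest.json",
        "alienvault-reputation", "IP reputation feed", "AlienVault", List.of("ip"));
    datasource.enable();                                    // stamps enabled_time, flips update_enabled
    String indexName = datasource.newIndexName(UUID.randomUUID().toString());
    // -> "opensearch-sap-threatintel.alienvault-reputation.<uuid>"
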
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceExtension.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceExtension.java new file mode 100644 index 000000000..4d32973e6 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceExtension.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.jobscheduler; + +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; + import org.opensearch.jobscheduler.spi.ScheduledJobParser; + import org.opensearch.jobscheduler.spi.ScheduledJobRunner; + + import java.util.Map; + +
+public class DatasourceExtension implements JobSchedulerExtension { + /** + * Job index name for a datasource + */ + public static final String JOB_INDEX_NAME = ".scheduler-security_analytics-threatintel-datasource"; //rename this... + +
/** + * Job index setting + * + * We want it to be a single shard so that the job can be run on only a single node by the job scheduler. + * We want it to expand to all replicas so that queries against this index can be served locally to reduce latency. + */ + public static final Map<String, Object> INDEX_SETTING = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-all", "index.hidden", true); + +
@Override + public String getJobType() { + return "scheduler_security_analytics_threatintel_datasource"; + } + + @Override + public String getJobIndex() { + return JOB_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + return DatasourceRunner.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> Datasource.PARSER.parse(parser, null); + } +}
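
For context, the settings map above would be applied when the job index is created; the create path itself is not in this hunk, so the following is only a plausible sketch (client and listener are assumed to exist). Note that "0-all" is an auto-expand range, which is why the key must be index.auto_expand_replicas rather than a fixed replica count.

    CreateIndexRequest request = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME)
        .settings(DatasourceExtension.INDEX_SETTING);
    client.admin().indices().create(request, ActionListener.wrap(
        r -> log.info("job index created"),
        e -> log.error("job index creation failed", e)));
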
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java new file mode 100644 index 000000000..8de306d33 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.jobscheduler; + +import org.apache.logging.log4j.LogManager; + import org.apache.logging.log4j.Logger; + + import org.opensearch.cluster.service.ClusterService; + import org.opensearch.jobscheduler.spi.JobExecutionContext; + import org.opensearch.jobscheduler.spi.LockModel; + import org.opensearch.jobscheduler.spi.ScheduledJobParameter; + import org.opensearch.jobscheduler.spi.ScheduledJobRunner; + import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; + + import java.io.IOException; + import java.time.temporal.ChronoUnit; + import java.util.Optional; + import java.util.concurrent.atomic.AtomicReference; + import java.time.Instant; + + import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; + import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelExecutor; + import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelLockService; + import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; +
/** + * Datasource update task + * + * This is a background task which is responsible for updating threat intel feed data + */ +public class DatasourceRunner implements ScheduledJobRunner { + private static final Logger log = LogManager.getLogger(DatasourceRunner.class); + private static DatasourceRunner INSTANCE; + +
public static DatasourceRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (DatasourceRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new DatasourceRunner(); + return INSTANCE; + } + } + +
private ClusterService clusterService; + + // threat intel specific variables + private DatasourceUpdateService datasourceUpdateService; + private DatasourceDao datasourceDao; + private ThreatIntelExecutor threatIntelExecutor; + private ThreatIntelLockService lockService; + private boolean initialized; + + private DatasourceRunner() { + // Singleton class, use getJobRunnerInstance() instead of the constructor + } + +
public void initialize( + final ClusterService clusterService, + final DatasourceUpdateService datasourceUpdateService, + final DatasourceDao datasourceDao, + final ThreatIntelExecutor threatIntelExecutor, + final ThreatIntelLockService threatIntelLockService + ) { + this.clusterService = clusterService; + this.datasourceUpdateService = datasourceUpdateService; + this.datasourceDao = datasourceDao; + this.threatIntelExecutor = threatIntelExecutor; + this.lockService = threatIntelLockService; + this.initialized = true; + } + +
@Override + public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionContext context) { + if (initialized == false) { + throw new AssertionError("this instance is not initialized"); + } + + log.info("Update job started for a datasource[{}]", jobParameter.getName()); + if (jobParameter instanceof Datasource == false) { + log.error("Illegal state exception: job parameter is not an instance of Datasource"); + throw new IllegalStateException( + "job parameter is not an instance of Datasource, type: " + jobParameter.getClass().getCanonicalName() + ); + } + threatIntelExecutor.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter)); + } + +
/** + * Update threat intel feed data + * + * A lock is used so that only one node runs this task. + * + * @param jobParameter job parameter + */ + protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter) { + return () -> { + Optional<LockModel> lockModel = lockService.acquireLock( + jobParameter.getName(), + ThreatIntelLockService.LOCK_DURATION_IN_SECONDS + ); + if (lockModel.isEmpty()) { + log.error("Failed to update. Another processor is holding a lock for datasource[{}]", jobParameter.getName()); + return; + } + + LockModel lock = lockModel.get(); + try { + updateDatasource(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock))); + } catch (Exception e) { + log.error("Failed to update datasource[{}]", jobParameter.getName(), e); + } finally { + lockService.releaseLock(lock); + } + }; + }
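
ThreatIntelLockService.getRenewLockRunnable is not part of this hunk; conceptually it must extend the lock while a long feed update runs so that another node does not steal the job. A hypothetical sketch of what such a runnable could look like; renewLock(LockModel) is an assumed helper, not a confirmed API, and the threshold is illustrative.

    AtomicReference<LockModel> lockRef = new AtomicReference<>(lock);
    Runnable renew = () -> {
        LockModel current = lockRef.get();
        // Renew once more than half of the lock duration has elapsed (threshold illustrative).
        if (Instant.now().isAfter(current.getLockTime().plusSeconds(ThreatIntelLockService.LOCK_DURATION_IN_SECONDS / 2))) {
            lockRef.set(lockService.renewLock(current)); // assumed helper on ThreatIntelLockService
        }
    };
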
Expecting {} but received {}", DatasourceState.AVAILABLE, datasource.getState()); + datasource.disable(); + datasource.getUpdateStats().setLastFailedAt(Instant.now()); + datasourceDao.updateDatasource(datasource); + return; + } + try { + datasourceUpdateService.deleteUnusedIndices(datasource); + if (DatasourceTask.DELETE_UNUSED_INDICES.equals(datasource.getTask()) == false) { + datasourceUpdateService.updateOrCreateThreatIntelFeedData(datasource, renewLock); + } + datasourceUpdateService.deleteUnusedIndices(datasource); + } catch (Exception e) { + log.error("Failed to update datasource for {}", datasource.getName(), e); + datasource.getUpdateStats().setLastFailedAt(Instant.now()); + datasourceDao.updateDatasource(datasource); + } finally { //post processing + datasourceUpdateService.updateDatasource(datasource, datasource.getSchedule(), DatasourceTask.ALL); + } + } + +} \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceTask.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceTask.java new file mode 100644 index 000000000..b0e9ac184 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceTask.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.jobscheduler; + +/** + * Task that {@link DatasourceRunner} will run + */ +public enum DatasourceTask { + /** + * Do everything + */ + ALL, + + /** + * Only delete unused indices + */ + DELETE_UNUSED_INDICES +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java new file mode 100644 index 000000000..5a24c5a84 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java @@ -0,0 +1,296 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.jobscheduler; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.opensearch.OpenSearchException; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; + +import org.opensearch.core.rest.RestStatus; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.securityanalytics.model.DetectorTrigger; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest; +import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; +import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; +import org.opensearch.securityanalytics.util.SecurityAnalyticsException; + +public class DatasourceUpdateService { + private static final Logger log = LogManager.getLogger(DetectorTrigger.class); + + private static final int SLEEP_TIME_IN_MILLIS = 5000; // 5 seconds + private static 
diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java new file mode 100644 index 000000000..5a24c5a84 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java @@ -0,0 +1,296 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.securityanalytics.threatIntel.jobscheduler; + +import org.apache.logging.log4j.LogManager; + import org.apache.logging.log4j.Logger; + + import java.io.IOException; + import java.net.URL; + import java.time.Duration; + import java.time.Instant; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.List; + import java.util.UUID; + import java.util.stream.Collectors; + + import org.apache.commons.csv.CSVParser; + import org.apache.commons.csv.CSVRecord; + import org.opensearch.OpenSearchException; + import org.opensearch.cluster.service.ClusterService; + import org.opensearch.common.settings.ClusterSettings; + + import org.opensearch.core.rest.RestStatus; + import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; + import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest; + import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; + import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService; + import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; + import org.opensearch.securityanalytics.util.SecurityAnalyticsException; +
+public class DatasourceUpdateService { + private static final Logger log = LogManager.getLogger(DatasourceUpdateService.class); + + private static final int SLEEP_TIME_IN_MILLIS = 5000; // 5 seconds + private static final int MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS = 10 * 60 * 60 * 1000; // 10 hours + private final ClusterService clusterService; + private final ClusterSettings clusterSettings; + private final DatasourceDao datasourceDao; + private final ThreatIntelFeedDataService threatIntelFeedDataService; + +
public DatasourceUpdateService( + final ClusterService clusterService, + final DatasourceDao datasourceDao, + final ThreatIntelFeedDataService threatIntelFeedDataService + ) { + this.clusterService = clusterService; + this.clusterSettings = clusterService.getClusterSettings(); + this.datasourceDao = datasourceDao; + this.threatIntelFeedDataService = threatIntelFeedDataService; + } + +
/** + * Update threat intel feed data + * + * The first column is the ip range field regardless of its header name. + * Therefore, we don't store the first column's header name. + * + * @param datasource the datasource + * @param renewLock runnable to renew lock + * + * @throws IOException if the manifest or the feed cannot be read + */ + public void updateOrCreateThreatIntelFeedData(final Datasource datasource, final Runnable renewLock) throws IOException { + URL url = new URL(datasource.getEndpoint()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + + if (shouldUpdate(datasource, manifest) == false) { + log.info("Skipping threat intel feed database update. Update is not required for {}", datasource.getName()); + datasource.getUpdateStats().setLastSkippedAt(Instant.now()); + datasourceDao.updateDatasource(datasource); + return; + } + +
Instant startTime = Instant.now(); + String indexName = setupIndex(datasource); + String[] header; + List<String> fieldsToStore; + try (CSVParser reader = threatIntelFeedDataService.getDatabaseReader(manifest)) { + CSVRecord headerLine = reader.iterator().next(); + header = validateHeader(headerLine).values(); + fieldsToStore = Arrays.asList(header).subList(1, header.length); + if (datasource.isCompatible(fieldsToStore) == false) { + log.error("Exception: new fields do not contain all old fields"); + throw new OpenSearchException( + "new fields [{}] do not contain all old fields [{}]", + fieldsToStore.toString(), + datasource.getDatabase().getFields().toString() + ); + } + threatIntelFeedDataService.saveThreatIntelFeedData(indexName, header, reader.iterator(), renewLock); + } + + waitUntilAllShardsStarted(indexName, MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS); + Instant endTime = Instant.now(); + updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime); + } + +
/** + * We wait until all shards are ready to serve search requests before updating datasource metadata to + * point to a new index so that there won't be latency degradation during threat intel feed data update. + * + * @param indexName the index name + * @param timeout maximum wait time in milliseconds + */ + protected void waitUntilAllShardsStarted(final String indexName, final int timeout) { + Instant start = Instant.now(); + try { + while (Instant.now().toEpochMilli() - start.toEpochMilli() < timeout) { + if (clusterService.state().routingTable().allShards(indexName).stream().allMatch(shard -> shard.started())) { + return; + } + Thread.sleep(SLEEP_TIME_IN_MILLIS); + } + throw new OpenSearchException( + "index[{}] replication did not complete after {} millis", + indexName, + timeout + ); + } catch (InterruptedException e) { + log.error("runtime exception", e); + throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO + } + }
+ + /** + * Return header fields of threat intel feed data with a given URL of a manifest file + * + * The first column is the ip range field regardless of its header name. + * Therefore, we don't store the first column's header name. + * + * @param manifestUrl the url of a manifest file + * @return header fields of threat intel feed + */ + public List<String> getHeaderFields(String manifestUrl) throws IOException { + URL url = new URL(manifestUrl); + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + + try (CSVParser reader = threatIntelFeedDataService.getDatabaseReader(manifest)) { + String[] fields = reader.iterator().next().values(); + return Arrays.asList(fields).subList(1, fields.length); + } + } + +
/** + * Delete all indices except the ones currently in use + * + * @param datasource the datasource + */ + public void deleteUnusedIndices(final Datasource datasource) { + try { + List<String> indicesToDelete = datasource.getIndices() + .stream() + .filter(index -> index.equals(datasource.currentIndexName()) == false) + .collect(Collectors.toList()); + + List<String> deletedIndices = deleteIndices(indicesToDelete); + + if (deletedIndices.isEmpty() == false) { + datasource.getIndices().removeAll(deletedIndices); + datasourceDao.updateDatasource(datasource); + } + } catch (Exception e) { + log.error("Failed to delete old indices for {}", datasource.getName(), e); + } + } + +
/** + * Update datasource with given systemSchedule and task + * + * @param datasource datasource to update + * @param systemSchedule new system schedule value + * @param task new task value + */ + public void updateDatasource(final Datasource datasource, final IntervalSchedule systemSchedule, final DatasourceTask task) { + boolean updated = false; + if (datasource.getSchedule().equals(systemSchedule) == false) { + datasource.setSchedule(systemSchedule); + updated = true; + } + + if (datasource.getTask().equals(task) == false) { + datasource.setTask(task); + updated = true; + } + + if (updated) { + datasourceDao.updateDatasource(datasource); + } + } + +
private List<String> deleteIndices(final List<String> indicesToDelete) { + List<String> deletedIndices = new ArrayList<>(indicesToDelete.size()); + for (String index : indicesToDelete) { + if (clusterService.state().metadata().hasIndex(index) == false) { + deletedIndices.add(index); + continue; + } + + try { + threatIntelFeedDataService.deleteThreatIntelDataIndex(index); + deletedIndices.add(index); + } catch (Exception e) { + log.error("Failed to delete an index [{}]", index, e); + } + } + return deletedIndices; + }
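
To make the header handling concrete, a hedged sketch of reading a CSV feed header with commons-csv 1.10 (the dependency added in build.gradle). The URL is illustrative; in this class the reader actually comes from ThreatIntelFeedDataService.getDatabaseReader(manifest).

    try (Reader reader = new InputStreamReader(
                new URL("https://example.com/feed.csv").openStream(), StandardCharsets.UTF_8);
         CSVParser csv = CSVFormat.RFC4180.parse(reader)) {
        String[] header = csv.iterator().next().values();    // e.g. ["ip", "type", "severity"]
        List<String> fieldsToStore = Arrays.asList(header).subList(1, header.length); // ip column dropped
    }
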
+ + /** + * Validate header + * + * 1. the header should not be null + * 2. the number of values in the header should be more than one + * + * @param header the header + * @return CSVRecord the input header + */ + private CSVRecord validateHeader(CSVRecord header) { + if (header == null) { + throw new OpenSearchException("threat intel feed database is empty"); + } + if (header.values().length < 2) { + throw new OpenSearchException("threat intel feed database should have at least two fields"); + } + return header; + } + +
/** + * Update datasource as succeeded + * + * @param newIndexName the new index name + * @param datasource the datasource + * @param manifest the manifest + * @param fields the fields + * @param startTime start time of the update + * @param endTime end time of the update + */ + private void updateDatasourceAsSucceeded( + final String newIndexName, + final Datasource datasource, + final DatasourceManifest manifest, + final List<String> fields, + final Instant startTime, + final Instant endTime + ) { + datasource.setCurrentIndex(newIndexName); + datasource.setDatabase(manifest, fields); + datasource.getUpdateStats().setLastSucceededAt(endTime); + datasource.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); + datasource.enable(); + datasource.setState(DatasourceState.AVAILABLE); + datasourceDao.updateDatasource(datasource); + log.info( + "threat intel feed database creation succeeded for {} and took {} seconds", + datasource.getName(), + Duration.between(startTime, endTime).getSeconds() + ); + } + +
/** + * Set up an index to add new threat intel feed data + * + * @param datasource the datasource + * @return new index name + */ + private String setupIndex(final Datasource datasource) { + String indexName = datasource.newIndexName(UUID.randomUUID().toString()); + datasource.getIndices().add(indexName); + datasourceDao.updateDatasource(datasource); + threatIntelFeedDataService.createIndexIfNotExists(indexName); + return indexName; + }
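
To make the freshness rule below concrete, a small illustration (values are made up): the update is skipped only when the local copy is strictly newer than the manifest, so an equal timestamp still triggers a re-download, and the SHA256 short-circuit is currently commented out.

    long local  = datasource.getDatabase().getUpdatedAt().toEpochMilli(); // e.g. 1696118400000
    long remote = manifest.getUpdatedAt();                                // e.g. 1696118400000
    boolean update = local <= remote;                                     // true: equal counts as stale
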
+ + /** + * Determine if an update is needed or not + * + * An update is needed when all of the following conditions are met + * 1. the updatedAt value in the datasource is equal to or before the updatedAt value in the manifest + * 2. the SHA256 hash value in the datasource is different from the SHA256 hash value in the manifest + * + * @param datasource the datasource + * @param manifest the manifest + * @return true if an update is needed, false otherwise + */ + private boolean shouldUpdate(final Datasource datasource, final DatasourceManifest manifest) { + if (datasource.getDatabase().getUpdatedAt() != null + && datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) { + return false; + } + +// if (manifest.getSha256Hash().equals(datasource.getDatabase().getSha256Hash())) { +// return false; +// } + return true; + } +}
diff --git a/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 000000000..0ffeb24aa --- /dev/null +++ b/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1 @@ +org.opensearch.securityanalytics.SecurityAnalyticsPlugin \ No newline at end of file
diff --git a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java index 44f5d39ae..a3e73e96f 100644 --- a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java +++ b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java @@ -172,7 +172,7 @@ public static CustomLogType randomCustomLogType(String name, String description, public static ThreatIntelFeedData randomThreatIntelFeedData() { return new ThreatIntelFeedData( "IP_ADDRESS", - ip, + "ip", "alientVault", Instant.now() );
diff --git a/src/test/java/org/opensearch/securityanalytics/findings/FindingServiceTests.java b/src/test/java/org/opensearch/securityanalytics/findings/FindingServiceTests.java index 9e7a4d061..6551f579c 100644 --- a/src/test/java/org/opensearch/securityanalytics/findings/FindingServiceTests.java +++ b/src/test/java/org/opensearch/securityanalytics/findings/FindingServiceTests.java @@ -5,6 +5,12 @@ package org.opensearch.securityanalytics.findings; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.net.URLConnection; import java.time.Instant; import java.time.ZoneId; import java.util.ArrayDeque;