Skip to content

Commit

Permalink
Threat intel feeds job runner and unit tests (#654)
Browse files Browse the repository at this point in the history
* fix doc level query constructor (#651)

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add mapping for indices storing threat intel feed data

* fix feed indices mapping

* add threat intel feed data dao

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add threatIntelEnabled field in detector.

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add threat intel feed service and searching feeds

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* ti feed data to doc level query convertor logic added

* plug threat intel feed into detector creation

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* Preliminary framework for jobscheduler and datasource (#626)


Signed-off-by: Joanne Wang <jowg@amazon.com>

* with listener and processor

Signed-off-by: Joanne Wang <jowg@amazon.com>

* removed actions

Signed-off-by: Joanne Wang <jowg@amazon.com>

* clean up

Signed-off-by: Joanne Wang <jowg@amazon.com>

* added parser

Signed-off-by: Joanne Wang <jowg@amazon.com>

* add unit tests

Signed-off-by: Joanne Wang <jowg@amazon.com>

* refactored class names

Signed-off-by: Joanne Wang <jowg@amazon.com>

* before moving db

Signed-off-by: Joanne Wang <jowg@amazon.com>

* after moving db

Signed-off-by: Joanne Wang <jowg@amazon.com>

* added actions to plugin and removed user schedule

Signed-off-by: Joanne Wang <jowg@amazon.com>

* unit tests

Signed-off-by: Joanne Wang <jowg@amazon.com>

* fix build error

Signed-off-by: Joanne Wang <jowg@amazon.com>

* changed transport naming

Signed-off-by: Joanne Wang <jowg@amazon.com>

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
Signed-off-by: Joanne Wang <jowg@amazon.com>
Co-authored-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
jowg-amazon and eirsep authored Oct 10, 2023
1 parent cfd1bf0 commit 55d332d
Show file tree
Hide file tree
Showing 62 changed files with 4,341 additions and 2,511 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ dependencies {
implementation group: 'org.apache.commons', name: 'commons-lang3', version: "${versions.commonslang}"
implementation "org.antlr:antlr4-runtime:4.10.1"
implementation "com.cronutils:cron-utils:9.1.6"
api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar")
api "org.opensearch:common-utils:${common_utils_version}@jar"
api "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
*/
package org.opensearch.securityanalytics;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -35,12 +31,8 @@
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.plugins.*;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
Expand All @@ -59,6 +51,12 @@
import org.opensearch.securityanalytics.resthandler.*;
import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService;
import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService;
import org.opensearch.securityanalytics.threatIntel.action.*;
import org.opensearch.securityanalytics.threatIntel.common.TIFExecutor;
import org.opensearch.securityanalytics.threatIntel.common.TIFLockService;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobRunner;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobUpdateService;
import org.opensearch.securityanalytics.transport.*;
import org.opensearch.securityanalytics.model.Rule;
import org.opensearch.securityanalytics.model.Detector;
Expand All @@ -70,10 +68,13 @@
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, MapperPlugin, SearchPlugin, EnginePlugin, ClusterPlugin {
import static org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameter.THREAT_INTEL_DATA_INDEX_NAME_PREFIX;

public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, MapperPlugin, SearchPlugin, EnginePlugin, ClusterPlugin, SystemIndexPlugin {

private static final Logger log = LogManager.getLogger(SecurityAnalyticsPlugin.class);

Expand Down Expand Up @@ -114,6 +115,18 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map

private Client client;

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings){
return List.of(new SystemIndexDescriptor(THREAT_INTEL_DATA_INDEX_NAME_PREFIX, "System index used for threat intel data"));
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
executorBuilders.add(TIFExecutor.executorBuilder(settings));
return executorBuilders;
}

@Override
public Collection<Object> createComponents(Client client,
ClusterService clusterService,
Expand All @@ -137,13 +150,21 @@ public Collection<Object> createComponents(Client client,
mapperService = new MapperService(client, clusterService, indexNameExpressionResolver, indexTemplateManager, logTypeService);
ruleIndices = new RuleIndices(logTypeService, client, clusterService, threadPool);
correlationRuleIndices = new CorrelationRuleIndices(client, clusterService);
ThreatIntelFeedDataService threatIntelFeedDataService = new ThreatIntelFeedDataService(clusterService, client, indexNameExpressionResolver, xContentRegistry);
ThreatIntelFeedDataService threatIntelFeedDataService = new ThreatIntelFeedDataService(clusterService.state(), clusterService, client, indexNameExpressionResolver, xContentRegistry);
DetectorThreatIntelService detectorThreatIntelService = new DetectorThreatIntelService(threatIntelFeedDataService);
TIFJobParameterService tifJobParameterService = new TIFJobParameterService(client, clusterService);
TIFJobUpdateService tifJobUpdateService = new TIFJobUpdateService(clusterService, tifJobParameterService, threatIntelFeedDataService);
TIFExecutor threatIntelExecutor = new TIFExecutor(threadPool);
TIFLockService threatIntelLockService = new TIFLockService(clusterService, client);

this.client = client;

TIFJobRunner.getJobRunnerInstance().initialize(clusterService,tifJobUpdateService, tifJobParameterService, threatIntelExecutor, threatIntelLockService, threadPool);

return List.of(
detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices,
mapperService, indexTemplateManager, builtinLogTypeLoader, threatIntelFeedDataService, detectorThreatIntelService
mapperService, indexTemplateManager, builtinLogTypeLoader, threatIntelFeedDataService, detectorThreatIntelService,
tifJobUpdateService, tifJobParameterService, threatIntelExecutor, threatIntelLockService
);
}

Expand Down Expand Up @@ -245,7 +266,10 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING,
SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE
SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE,
SecurityAnalyticsSettings.TIFJOB_UPDATE_INTERVAL,
SecurityAnalyticsSettings.BATCH_SIZE,
SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT
);
}

Expand Down Expand Up @@ -276,8 +300,14 @@ public List<Setting<?>> getSettings() {
new ActionPlugin.ActionHandler<>(SearchCorrelationRuleAction.INSTANCE, TransportSearchCorrelationRuleAction.class),
new ActionHandler<>(IndexCustomLogTypeAction.INSTANCE, TransportIndexCustomLogTypeAction.class),
new ActionHandler<>(SearchCustomLogTypeAction.INSTANCE, TransportSearchCustomLogTypeAction.class),
new ActionHandler<>(DeleteCustomLogTypeAction.INSTANCE, TransportDeleteCustomLogTypeAction.class)
);
new ActionHandler<>(DeleteCustomLogTypeAction.INSTANCE, TransportDeleteCustomLogTypeAction.class),

new ActionHandler<>(PutTIFJobAction.INSTANCE, TransportPutTIFJobAction.class),
new ActionHandler<>(GetTIFJobAction.INSTANCE, TransportGetTIFJobAction.class),
new ActionHandler<>(UpdateTIFJobAction.INSTANCE, TransportUpdateTIFJobAction.class),
new ActionHandler<>(DeleteTIFJobAction.INSTANCE, TransportDeleteTIFJobAction.class)

);
}

@Override
Expand All @@ -294,5 +324,5 @@ public void onFailure(Exception e) {
log.warn("Failed to initialize LogType config index and builtin log types");
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.securityanalytics.sampleextension;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/**
* Sample JobScheduler extension plugin.
*
* It use ".scheduler_sample_extension" index to manage its scheduled jobs, and exposes a REST API
* endpoint using {@link SampleExtensionRestHandler}.
*
*/
public class SampleExtensionPlugin extends Plugin implements ActionPlugin, JobSchedulerExtension {
private static final Logger log = LogManager.getLogger(SampleExtensionPlugin.class);

static final String JOB_INDEX_NAME = ".scheduler_sample_extension";

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
SampleJobRunner jobRunner = SampleJobRunner.getJobRunnerInstance();
jobRunner.setClusterService(clusterService);
jobRunner.setThreadPool(threadPool);
jobRunner.setClient(client);

return Collections.emptyList();
}

@Override
public String getJobType() {
return "scheduler_sample_extension";
}

@Override
public String getJobIndex() {
return JOB_INDEX_NAME;
}

@Override
public ScheduledJobRunner getJobRunner() {
return SampleJobRunner.getJobRunnerInstance();
}

@Override
public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
SampleJobParameter jobParameter = new SampleJobParameter();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case SampleJobParameter.NAME_FIELD:
jobParameter.setJobName(parser.text());
break;
case SampleJobParameter.ENABLED_FILED:
jobParameter.setEnabled(parser.booleanValue());
break;
case SampleJobParameter.ENABLED_TIME_FILED:
jobParameter.setEnabledTime(parseInstantValue(parser));
break;
case SampleJobParameter.LAST_UPDATE_TIME_FIELD:
jobParameter.setLastUpdateTime(parseInstantValue(parser));
break;
case SampleJobParameter.SCHEDULE_FIELD:
jobParameter.setSchedule(ScheduleParser.parse(parser));
break;
case SampleJobParameter.INDEX_NAME_FIELD:
jobParameter.setIndexToWatch(parser.text());
break;
case SampleJobParameter.LOCK_DURATION_SECONDS:
jobParameter.setLockDurationSeconds(parser.longValue());
break;
case SampleJobParameter.JITTER:
jobParameter.setJitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return jobParameter;
};
}

private Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return Collections.singletonList(new SampleExtensionRestHandler());
}
}
Loading

0 comments on commit 55d332d

Please sign in to comment.