Skip to content

Commit

Permalink
make threat intel async (#703)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
(cherry picked from commit 0dd9787)
  • Loading branch information
sbcd90 authored and github-actions[bot] committed Oct 31, 2023
1 parent eccf388 commit fe1d37c
Show file tree
Hide file tree
Showing 12 changed files with 534 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ private static String constructId(Detector detector, String iocType) {

/** Updates all detectors having threat intel detection enabled with the latest threat intel feed data*/
public void updateDetectorsWithLatestThreatIntelRules() {
try {
QueryBuilder queryBuilder =
QueryBuilders.nestedQuery("detector",
QueryBuilders.boolQuery().must(
Expand All @@ -168,7 +167,6 @@ public void updateDetectorsWithLatestThreatIntelRules() {
SearchSourceBuilder ssb = searchRequest.source();
ssb.query(queryBuilder);
ssb.size(9999);
CountDownLatch countDownLatch = new CountDownLatch(1);
client.execute(SearchDetectorAction.INSTANCE, new SearchDetectorRequest(searchRequest),
ActionListener.wrap(searchResponse -> {
List<Detector> detectors = getDetectors(searchResponse, xContentRegistry);
Expand All @@ -181,22 +179,15 @@ public void updateDetectorsWithLatestThreatIntelRules() {
ActionListener.wrap(
indexDetectorResponse -> {
log.debug("updated {} with latest threat intel info", indexDetectorResponse.getDetector().getId());
countDownLatch.countDown();
},
e -> {
log.error(() -> new ParameterizedMessage("Failed to update detector {} with latest threat intel info", detector.getId()), e);
countDownLatch.countDown();
}));
}
);
}, e -> {
log.error("Failed to fetch detectors to update with threat intel queries.", e);
countDownLatch.countDown();
}));
countDownLatch.await(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
log.error("");
}


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand All @@ -33,6 +36,7 @@
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction;
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest;
import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse;
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
Expand All @@ -47,6 +51,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -102,17 +107,17 @@ public void getThreatIntelFeedData(

String tifdIndex = getLatestIndexByCreationDate();
if (tifdIndex == null) {
createThreatIntelFeedData();
tifdIndex = getLatestIndexByCreationDate();
createThreatIntelFeedData(listener);

Check warning on line 110 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L110

Added line #L110 was not covered by tests
} else {
SearchRequest searchRequest = new SearchRequest(tifdIndex);
searchRequest.source().size(9999); //TODO: convert to scroll
String finalTifdIndex = tifdIndex;
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
log.error(String.format(

Check warning on line 116 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L112-L116

Added lines #L112 - L116 were not covered by tests
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
listener.onFailure(e);
}));

Check warning on line 119 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L118-L119

Added lines #L118 - L119 were not covered by tests
}
SearchRequest searchRequest = new SearchRequest(tifdIndex);
searchRequest.source().size(9999); //TODO: convert to scroll
String finalTifdIndex = tifdIndex;
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
log.error(String.format(
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
listener.onFailure(e);
}));
} catch (InterruptedException e) {
log.error("Failed to get threat intel feed data", e);
listener.onFailure(e);
Expand All @@ -136,15 +141,30 @@ private String getLatestIndexByCreationDate() {
*
* @param indexName index name
*/
public void createIndexIfNotExists(final String indexName) {
public void createIndexIfNotExists(final String indexName, final ActionListener<CreateIndexResponse> listener) {
if (clusterService.state().metadata().hasIndex(indexName) == true) {
listener.onResponse(new CreateIndexResponse(true, true, indexName));

Check warning on line 146 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L146

Added line #L146 was not covered by tests
return;
}
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(INDEX_SETTING_TO_CREATE)
.mapping(getIndexMapping());
.mapping(getIndexMapping()).timeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT));

Check warning on line 150 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L150

Added line #L150 was not covered by tests
StashedThreadContext.run(
client,
() -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT))
() -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {

Check warning on line 153 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L153

Added line #L153 was not covered by tests
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
listener.onResponse(response);

Check warning on line 157 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L157

Added line #L157 was not covered by tests
} else {
onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR));

Check warning on line 159 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L159

Added line #L159 was not covered by tests
}
}

Check warning on line 161 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L161

Added line #L161 was not covered by tests

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

Check warning on line 166 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L165-L166

Added lines #L165 - L166 were not covered by tests
})
);
}

Expand All @@ -159,16 +179,20 @@ public void parseAndSaveThreatIntelFeedDataCSV(
final String indexName,
final Iterator<CSVRecord> iterator,
final Runnable renewLock,
final TIFMetadata tifMetadata
final TIFMetadata tifMetadata,
final ActionListener<ThreatIntelIndicesResponse> listener
) throws IOException {
if (indexName == null || iterator == null || renewLock == null) {
throw new IllegalArgumentException("Parameters cannot be null, failed to save threat intel feed data");
}

TimeValue timeout = clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT);
Integer batchSize = clusterSettings.get(SecurityAnalyticsSettings.BATCH_SIZE);
final BulkRequest bulkRequest = new BulkRequest();

List<BulkRequest> bulkRequestList = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();

Check warning on line 193 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L192-L193

Added lines #L192 - L193 were not covered by tests
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

List<ThreatIntelFeedData> tifdList = new ArrayList<>();
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
Expand All @@ -192,10 +216,39 @@ public void parseAndSaveThreatIntelFeedDataCSV(
bulkRequest.add(indexRequest);

if (bulkRequest.requests().size() == batchSize) {
saveTifds(bulkRequest, timeout);
bulkRequestList.add(bulkRequest);
bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

Check warning on line 221 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L219-L221

Added lines #L219 - L221 were not covered by tests
}
}
bulkRequestList.add(bulkRequest);

Check warning on line 224 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L223-L224

Added lines #L223 - L224 were not covered by tests

GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(new ActionListener<>() {

Check warning on line 226 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L226

Added line #L226 was not covered by tests
@Override
public void onResponse(Collection<BulkResponse> bulkResponses) {
int idx = 0;

Check warning on line 229 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L229

Added line #L229 was not covered by tests
for (BulkResponse response: bulkResponses) {
BulkRequest request = bulkRequestList.get(idx);

Check warning on line 231 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L231

Added line #L231 was not covered by tests
if (response.hasFailures()) {
throw new OpenSearchException(

Check warning on line 233 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L233

Added line #L233 was not covered by tests
"error occurred while ingesting threat intel feed data in {} with an error {}",
StringUtils.join(request.getIndices()),
response.buildFailureMessage()

Check warning on line 236 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L235-L236

Added lines #L235 - L236 were not covered by tests
);
}
}
listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName)));
}

Check warning on line 241 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L239-L241

Added lines #L239 - L241 were not covered by tests

@Override
public void onFailure(Exception e) {
listener.onFailure(e);

Check warning on line 245 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L245

Added line #L245 was not covered by tests
}
}, bulkRequestList.size());

Check warning on line 247 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L247

Added line #L247 was not covered by tests

for (int i = 0; i < bulkRequestList.size(); ++i) {
saveTifds(bulkRequestList.get(i), timeout, bulkResponseListener);

Check warning on line 250 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L250

Added line #L250 was not covered by tests
}
saveTifds(bulkRequest, timeout);
renewLock.run();
}

Expand All @@ -206,19 +259,9 @@ public static boolean isValidIp(String ip) {
return matcher.matches();
}

public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) {
public void saveTifds(BulkRequest bulkRequest, TimeValue timeout, ActionListener<BulkResponse> listener) {
try {
BulkResponse response = StashedThreadContext.run(client, () -> {
return client.bulk(bulkRequest).actionGet(timeout);
});
if (response.hasFailures()) {
throw new OpenSearchException(
"error occurred while ingesting threat intel feed data in {} with an error {}",
StringUtils.join(bulkRequest.getIndices()),
response.buildFailureMessage()
);
}
bulkRequest.requests().clear();
StashedThreadContext.run(client, () -> client.bulk(bulkRequest, listener));

Check warning on line 264 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L264

Added line #L264 was not covered by tests
} catch (OpenSearchException e) {
log.error("failed to save threat intel feed data", e);
}
Expand All @@ -241,31 +284,49 @@ public void deleteThreatIntelDataIndex(final List<String> indices) {
);
}

AcknowledgedResponse response = StashedThreadContext.run(
StashedThreadContext.run(

Check warning on line 287 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L287

Added line #L287 was not covered by tests
client,
() -> client.admin()
.indices()
.prepareDelete(indices.toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.execute()
.actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT))
);
.setTimeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT))
.execute(new ActionListener<>() {

Check warning on line 294 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L293-L294

Added lines #L293 - L294 were not covered by tests
@Override
public void onResponse(AcknowledgedResponse response) {
if (response.isAcknowledged() == false) {
onFailure(new OpenSearchException("failed to delete data[{}]", String.join(",", indices)));

Check warning on line 298 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L298

Added line #L298 was not covered by tests
}
}

Check warning on line 300 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L300

Added line #L300 was not covered by tests

if (response.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}]", String.join(",", indices));
}
@Override
public void onFailure(Exception e) {
log.error("unknown exception:", e);
}

Check warning on line 305 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L304-L305

Added lines #L304 - L305 were not covered by tests
})
);
}

private void createThreatIntelFeedData() throws InterruptedException {
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
client.execute(
PutTIFJobAction.INSTANCE,
new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)),
new ActionListener<AcknowledgedResponse>() {
new ActionListener<>() {

Check warning on line 315 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L315

Added line #L315 was not covered by tests
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
log.debug("Acknowledged threat intel feed updater job created");
countDownLatch.countDown();
String tifdIndex = getLatestIndexByCreationDate();

Check warning on line 320 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L320

Added line #L320 was not covered by tests

SearchRequest searchRequest = new SearchRequest(tifdIndex);
searchRequest.source().size(9999); //TODO: convert to scroll
String finalTifdIndex = tifdIndex;
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
log.error(String.format(

Check warning on line 326 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L322-L326

Added lines #L322 - L326 were not covered by tests
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
listener.onFailure(e);
}));

Check warning on line 329 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L328-L329

Added lines #L328 - L329 were not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.threatIntel.action;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;

public class ThreatIntelIndicesResponse extends ActionResponse {

private Boolean isAcknowledged;

private List<String> indices;

public ThreatIntelIndicesResponse(Boolean isAcknowledged, List<String> indices) {
super();
this.isAcknowledged = isAcknowledged;
this.indices = indices;
}

Check warning on line 24 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java#L21-L24

Added lines #L21 - L24 were not covered by tests

public ThreatIntelIndicesResponse(StreamInput sin) throws IOException {
this(sin.readBoolean(), sin.readStringList());
}

Check warning on line 28 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java#L27-L28

Added lines #L27 - L28 were not covered by tests

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isAcknowledged);
out.writeStringCollection(indices);
}

Check warning on line 34 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java#L32-L34

Added lines #L32 - L34 were not covered by tests

public Boolean isAcknowledged() {
return isAcknowledged;

Check warning on line 37 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java#L37

Added line #L37 was not covered by tests
}

public List<String> getIndices() {
return indices;

Check warning on line 41 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java#L41

Added line #L41 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.StepListener;
import org.opensearch.action.index.IndexResponse;
Expand Down Expand Up @@ -127,12 +128,22 @@ protected ActionListener<IndexResponse> postIndexingTifJobParameter(
@Override
public void onResponse(final IndexResponse indexResponse) {
AtomicReference<LockModel> lockReference = new AtomicReference<>(lock);
try {
createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference));
} finally {
lockService.releaseLock(lockReference.get());
}
listener.onResponse(new AcknowledgedResponse(true));
createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), new ActionListener<>() {

Check warning on line 131 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L131

Added line #L131 was not covered by tests
@Override
public void onResponse(ThreatIntelIndicesResponse threatIntelIndicesResponse) {
if (threatIntelIndicesResponse.isAcknowledged()) {
lockService.releaseLock(lockReference.get());
listener.onResponse(new AcknowledgedResponse(true));

Check warning on line 136 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L135-L136

Added lines #L135 - L136 were not covered by tests
} else {
onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR));

Check warning on line 138 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L138

Added line #L138 was not covered by tests
}
}

Check warning on line 140 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L140

Added line #L140 was not covered by tests

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

Check warning on line 145 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L144-L145

Added lines #L144 - L145 were not covered by tests
});
}

@Override
Expand All @@ -149,26 +160,26 @@ public void onFailure(final Exception e) {
};
}

protected void createThreatIntelFeedData(final TIFJobParameter tifJobParameter, final Runnable renewLock) {
protected void createThreatIntelFeedData(final TIFJobParameter tifJobParameter, final Runnable renewLock, final ActionListener<ThreatIntelIndicesResponse> listener) {
if (TIFJobState.CREATING.equals(tifJobParameter.getState()) == false) {
log.error("Invalid tifJobParameter state. Expecting {} but received {}", TIFJobState.CREATING, tifJobParameter.getState());
markTIFJobAsCreateFailed(tifJobParameter);
markTIFJobAsCreateFailed(tifJobParameter, listener);

Check warning on line 166 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L166

Added line #L166 was not covered by tests
return;
}

try {
tifJobUpdateService.createThreatIntelFeedData(tifJobParameter, renewLock);
tifJobUpdateService.createThreatIntelFeedData(tifJobParameter, renewLock, listener);

Check warning on line 171 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L171

Added line #L171 was not covered by tests
} catch (Exception e) {
log.error("Failed to create tifJobParameter for {}", tifJobParameter.getName(), e);
markTIFJobAsCreateFailed(tifJobParameter);
markTIFJobAsCreateFailed(tifJobParameter, listener);

Check warning on line 174 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L174

Added line #L174 was not covered by tests
}
}

private void markTIFJobAsCreateFailed(final TIFJobParameter tifJobParameter) {
private void markTIFJobAsCreateFailed(final TIFJobParameter tifJobParameter, final ActionListener<ThreatIntelIndicesResponse> listener) {
tifJobParameter.getUpdateStats().setLastFailedAt(Instant.now());
tifJobParameter.setState(TIFJobState.CREATE_FAILED);
try {
tifJobParameterService.updateJobSchedulerParameter(tifJobParameter);
tifJobParameterService.updateJobSchedulerParameter(tifJobParameter, listener);

Check warning on line 182 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L182

Added line #L182 was not covered by tests
} catch (Exception e) {
log.error("Failed to mark tifJobParameter state as CREATE_FAILED for {}", tifJobParameter.getName(), e);
}
Expand Down
Loading

0 comments on commit fe1d37c

Please sign in to comment.