Skip to content

Commit

Permalink
ComplianceAuditlogTest to use signal/wait (#1914)
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <petern@amazon.com>
  • Loading branch information
peternied authored Jun 27, 2022
1 parent 00e2a5d commit d507ebb
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

package org.opensearch.security.auditlog.compliance;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import org.apache.http.Header;
Expand All @@ -30,26 +32,28 @@
import org.opensearch.security.auditlog.AbstractAuditlogiUnitTest;
import org.opensearch.security.auditlog.AuditTestUtils;
import org.opensearch.security.auditlog.config.AuditConfig;
import org.opensearch.security.auditlog.impl.AuditMessage;
import org.opensearch.security.auditlog.integration.TestAuditlogImpl;
import org.opensearch.security.auditlog.integration.TestAuditlogImpl.MessagesNotFoundException;
import org.opensearch.security.compliance.ComplianceConfig;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.test.DynamicSecurityConfig;
import org.opensearch.security.test.helper.rest.RestHelper.HttpResponse;

import static org.junit.Assert.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class ComplianceAuditlogTest extends AbstractAuditlogiUnitTest {

@Test
public void testSourceFilter() throws Exception {

Settings additionalSettings = Settings.builder()
.put("plugins.security.audit.type", TestAuditlogImpl.class.getName())
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_ENABLE_TRANSPORT, true)
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_RESOLVE_BULK_REQUESTS, true)
.put(ConfigConstants.OPENDISTRO_SECURITY_COMPLIANCE_HISTORY_EXTERNAL_CONFIG_ENABLED, false)
//.put(ConfigConstants.OPENDISTRO_SECURITY_COMPLIANCE_HISTORY_WRITE_WATCHED_INDICES, "emp")
.put(ConfigConstants.OPENDISTRO_SECURITY_COMPLIANCE_HISTORY_READ_WATCHED_FIELDS, "emp")
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_CONFIG_DISABLED_TRANSPORT_CATEGORIES, "authenticated,GRANTED_PRIVILEGES")
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_CONFIG_DISABLED_REST_CATEGORIES, "authenticated,GRANTED_PRIVILEGES")
Expand All @@ -66,7 +70,6 @@ public void testSourceFilter() throws Exception {
rh.sendAdminCertificate = sendAdminCertificate;
rh.keystore = keystore;

System.out.println("#### test source includes");
String search = "{" +
" \"_source\":[" +
" \"Gender\""+
Expand All @@ -80,13 +83,11 @@ public void testSourceFilter() throws Exception {
" }" +
"}";

TestAuditlogImpl.clear();
HttpResponse response = rh.executePostRequest("_search?pretty", search, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
System.out.println(response.getBody());
Thread.sleep(1500);
System.out.println(TestAuditlogImpl.sb.toString());
Assert.assertTrue(TestAuditlogImpl.messages.size() >= 1);
final AuditMessage message = TestAuditlogImpl.doThenWaitForMessage(() -> {
final HttpResponse response = rh.executePostRequest("_search?pretty", search, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
});

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_DOC_READ"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Designation"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Salary"));
Expand All @@ -102,8 +103,6 @@ public void testComplianceEnable() throws Exception {

setup(additionalSettings);

final boolean sendAdminCertificate = rh.sendAdminCertificate;
final String keystore = rh.keystore;
rh.sendAdminCertificate = true;
rh.keystore = "auditlog/kirk-keystore.jks";

Expand All @@ -112,21 +111,21 @@ public void testComplianceEnable() throws Exception {
updateAuditConfig(AuditTestUtils.createAuditPayload(auditConfig));

// make an event happen
TestAuditlogImpl.clear();
rh.executePutRequest("emp/_doc/0?refresh", "{\"Designation\" : \"CEO\", \"Gender\" : \"female\", \"Salary\" : 100}");
TestAuditlogImpl.doThenWaitForMessages(() -> {
rh.executePutRequest("emp/_doc/0?refresh", "{\"Designation\" : \"CEO\", \"Gender\" : \"female\", \"Salary\" : 100}");
}, 7);
assertTrue(TestAuditlogImpl.messages.toString().contains("COMPLIANCE_DOC_WRITE"));

// disable compliance
auditConfig = new AuditConfig(true, AuditConfig.Filter.DEFAULT , ComplianceConfig.from(ImmutableMap.of("enabled", false, "write_watched_indices", Collections.singletonList("emp")), additionalSettings));
updateAuditConfig(AuditTestUtils.createAuditPayload(auditConfig));

// make an event happen
TestAuditlogImpl.clear();
rh.executePutRequest("emp/_doc/1?refresh", "{\"Designation\" : \"CEO\", \"Gender\" : \"female\", \"Salary\" : 100}");
assertFalse(TestAuditlogImpl.messages.toString().contains("COMPLIANCE_DOC_WRITE"));

rh.sendAdminCertificate = sendAdminCertificate;
rh.keystore = keystore;
// trigger an event that it not captured by the audit log
final MessagesNotFoundException ex = assertThrows(MessagesNotFoundException.class, () -> {
TestAuditlogImpl.doThenWaitForMessage(() -> {
rh.executePutRequest("emp/_doc/1?refresh", "{\"Designation\" : \"CEO\", \"Gender\" : \"female\", \"Salary\" : 100}");
});
});
assertThat(ex.getMissingCount(), equalTo(1));
}

@Test
Expand Down Expand Up @@ -154,7 +153,6 @@ public void testSourceFilterMsearch() throws Exception {
rh.sendAdminCertificate = sendAdminCertificate;
rh.keystore = keystore;

System.out.println("#### test source includes");
String search = "{}"+System.lineSeparator()
+ "{" +
" \"_source\":[" +
Expand Down Expand Up @@ -211,22 +209,23 @@ public void testInternalConfig() throws Exception {
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_CONFIG_DISABLED_REST_CATEGORIES, "authenticated,GRANTED_PRIVILEGES")
.build();

TestAuditlogImpl.clear();
setup(additionalSettings);

try (RestHighLevelClient restHighLevelClient = getRestClient(clusterInfo, "kirk-keystore.jks", "truststore.jks")) {
for(IndexRequest ir: new DynamicSecurityConfig().setSecurityRoles("roles_2.yml").getDynamicConfig(getResourceFolder())) {
restHighLevelClient.index(ir, RequestOptions.DEFAULT);
GetResponse getDocumentResponse = restHighLevelClient.get(new GetRequest(ir.index(), ir.id()), RequestOptions.DEFAULT);
Assert.assertTrue("Document not found:" + getDocumentResponse, getDocumentResponse.isExists());
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
try (RestHighLevelClient restHighLevelClient = getRestClient(clusterInfo, "kirk-keystore.jks", "truststore.jks")) {
for (IndexRequest ir: new DynamicSecurityConfig().setSecurityRoles("roles_2.yml").getDynamicConfig(getResourceFolder())) {
restHighLevelClient.index(ir, RequestOptions.DEFAULT);
GetResponse getDocumentResponse = restHighLevelClient.get(new GetRequest(ir.index(), ir.id()), RequestOptions.DEFAULT);
assertThat(getDocumentResponse.isExists(), equalTo(true));
}
} catch (IOException ioe) {
throw new RuntimeException("Unexpected exception", ioe);
}
}

HttpResponse response = rh.executeGetRequest("_search?pretty", encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
Thread.sleep(1500);
System.out.println(TestAuditlogImpl.sb.toString());
Assert.assertTrue(TestAuditlogImpl.messages.size() >= 15);
HttpResponse response = rh.executeGetRequest("_search?pretty", encodeBasicHeader("admin", "admin"));
assertThat(response.getStatusCode(), equalTo(HttpStatus.SC_OK));
}, 14);

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_READ"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_WRITE"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("anonymous_auth_enabled"));
Expand All @@ -247,7 +246,7 @@ public void testInternalConfig() throws Exception {
@Test
public void testExternalConfig() throws Exception {

Settings additionalSettings = Settings.builder()
final Settings additionalSettings = Settings.builder()
.put("plugins.security.audit.type", TestAuditlogImpl.class.getName())
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_ENABLE_TRANSPORT, false)
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_ENABLE_REST, false)
Expand All @@ -258,23 +257,23 @@ public void testExternalConfig() throws Exception {
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_CONFIG_DISABLED_REST_CATEGORIES, "authenticated,GRANTED_PRIVILEGES")
.build();

TestAuditlogImpl.clear();

setup(additionalSettings);

try (Client tc = getClient()) {
TestAuditlogImpl.doThenWaitForMessages(() -> {
try {
setup(additionalSettings);
} catch (final Exception ex) {
throw new RuntimeException(ex);
}

for(IndexRequest ir: new DynamicSecurityConfig().setSecurityRoles("roles_2.yml").getDynamicConfig(getResourceFolder())) {
tc.index(ir).actionGet();
try (Client tc = getClient()) {
for(IndexRequest ir: new DynamicSecurityConfig().setSecurityRoles("roles_2.yml").getDynamicConfig(getResourceFolder())) {
tc.index(ir).actionGet();
}
}

}
final HttpResponse response = rh.executeGetRequest("_search?pretty", encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}, 4);

HttpResponse response = rh.executeGetRequest("_search?pretty", encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
System.out.println(response.getBody());
Thread.sleep(1500);
System.out.println(TestAuditlogImpl.sb.toString());
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("external_configuration"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_EXTERNAL_CONFIG"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("opensearch_yml"));
Expand Down Expand Up @@ -306,73 +305,29 @@ public void testUpdate() throws Exception {
.actionGet();
}

TestAuditlogImpl.clear();

String body = "{\"doc\": {\"Age\":123}}";

HttpResponse response = rh.executePostRequest("humanresources/_doc/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_CREATED, response.getStatusCode());

body = "{\"doc\": {\"Age\":456}}";
final MessagesNotFoundException ex1 = assertThrows(MessagesNotFoundException.class, () -> {
TestAuditlogImpl.doThenWaitForMessage(() -> {
final String body = "{\"doc\": {\"Age\":123}}";
final HttpResponse response = rh.executePostRequest("humanresources/_doc/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_CREATED, response.getStatusCode());
});
});
assertThat(ex1.getMissingCount(), equalTo(1));


final MessagesNotFoundException ex2 = assertThrows(MessagesNotFoundException.class, () -> {
TestAuditlogImpl.doThenWaitForMessage(() -> {
final String body = "{\"doc\": {\"Age\":456}}";
final HttpResponse response = rh.executePostRequest("humanresources/_update/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
});
});
assertThat(ex2.getMissingCount(), equalTo(1));

response = rh.executePostRequest("humanresources/_update/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
System.out.println(response.getBody());
Thread.sleep(1500);
Assert.assertTrue(TestAuditlogImpl.messages.isEmpty());
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
}

@Test
public void testUpdatePerf() throws Exception {

Settings additionalSettings = Settings.builder()
.put("plugins.security.audit.type", TestAuditlogImpl.class.getName())
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_ENABLE_TRANSPORT, false)
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_ENABLE_REST, false)
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_RESOLVE_BULK_REQUESTS, true)
.put(ConfigConstants.OPENDISTRO_SECURITY_COMPLIANCE_HISTORY_EXTERNAL_CONFIG_ENABLED, false)
.put(ConfigConstants.SECURITY_COMPLIANCE_HISTORY_INTERNAL_CONFIG_ENABLED, false)
.put(ConfigConstants.OPENDISTRO_SECURITY_COMPLIANCE_HISTORY_WRITE_WATCHED_INDICES, "humanresources")
.put(ConfigConstants.OPENDISTRO_SECURITY_COMPLIANCE_HISTORY_READ_WATCHED_FIELDS, "humanresources,*")
.build();

setup(additionalSettings);
TestAuditlogImpl.clear();

/*try (TransportClient tc = getInternalTransportClient()) {
for(int i=0; i<5000; i++) {
tc.prepareIndex("humanresources", "employees")
//.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setSource("Age", 456+i)
.execute();
}
}*/



for(int i=0; i<1; i++) {
HttpResponse response = rh.executePostRequest("humanresources/_doc/"+i+"", "{\"customer\": {\"Age\":"+i+"}}", encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_CREATED, response.getStatusCode());
response = rh.executePostRequest("humanresources/_doc/"+i+"", "{\"customer\": {\"Age\":"+(i+2)+"}}", encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
response = rh.executePostRequest("humanresources/_update/"+i+"?pretty", "{\"doc\": {\"doesel\":"+(i+3)+"}}", encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}

/*Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
System.out.println(response.getBody());
Thread.sleep(1500);
Assert.assertTrue(TestAuditlogImpl.messages.isEmpty());
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));*/

Thread.sleep(1500);
System.out.println("Messages: "+TestAuditlogImpl.messages.size());
//System.out.println(TestAuditlogImpl.sb.toString());

}

@Test
public void testWriteHistory() throws Exception {

Expand All @@ -387,7 +342,6 @@ public void testWriteHistory() throws Exception {

setup(additionalSettings);


try (Client tc = getClient()) {
tc.prepareIndex("humanresources")
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
Expand All @@ -396,24 +350,18 @@ public void testWriteHistory() throws Exception {
.actionGet();
}

TestAuditlogImpl.clear();

String body = "{\"doc\": {\"Age\":123}}";

HttpResponse response = rh.executePostRequest("humanresources/_doc/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_CREATED, response.getStatusCode());
System.out.println(response.getBody());
Thread.sleep(1500);
System.out.println(TestAuditlogImpl.sb.toString());
TestAuditlogImpl.doThenWaitForMessage(() -> {
final String body = "{\"doc\": {\"Age\":123}}";
final HttpResponse response = rh.executePostRequest("humanresources/_doc/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_CREATED, response.getStatusCode());
});
Assert.assertTrue(TestAuditlogImpl.sb.toString().split(".*audit_compliance_diff_content.*replace.*").length == 1);

body = "{\"doc\": {\"Age\":555}}";
TestAuditlogImpl.clear();
response = rh.executePostRequest("humanresources/_update/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
System.out.println(response.getBody());
Thread.sleep(1500);
System.out.println(TestAuditlogImpl.sb.toString());
TestAuditlogImpl.doThenWaitForMessage(() -> {
final String body = "{\"doc\": {\"Age\":555}}";
final HttpResponse response = rh.executePostRequest("humanresources/_update/100?pretty", body, encodeBasicHeader("admin", "admin"));
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
});
Assert.assertTrue(TestAuditlogImpl.sb.toString().split(".*audit_compliance_diff_content.*replace.*").length == 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public TestAuditlogImpl(String name, Settings settings, String settingsPrefix, A

public synchronized boolean doStore(AuditMessage msg) {
if (messagesRef.get() == null || countDownRef.get() == null) {
throw new RuntimeException("No message latch is waiting");
// Ignore any messages that are sent before TestAuditlogImpl is waiting.
return true;
}
sb.append(msg.toPrettyString()+System.lineSeparator());
messagesRef.get().add(msg);
Expand Down Expand Up @@ -69,7 +70,7 @@ public static List<AuditMessage> doThenWaitForMessages(final Runnable action, fi
final int maxSecondsToWaitForMessages = 1;
final boolean foundAll = latch.await(maxSecondsToWaitForMessages, TimeUnit.SECONDS);
if (!foundAll) {
throw new RuntimeException("Did not recieve all " + expectedCount +" audit messages after a short wait.");
throw new MessagesNotFoundException(expectedCount, (int)latch.getCount());
}
if (messages.size() != expectedCount) {
throw new RuntimeException("Unexpected number of messages, was expecting " + expectedCount + ", recieved " + messages.size());
Expand All @@ -80,10 +81,33 @@ public static List<AuditMessage> doThenWaitForMessages(final Runnable action, fi
return new ArrayList<>(messages);
}

/**
* Perform an action and then wait until a single message has been found.
*/
public static AuditMessage doThenWaitForMessage(final Runnable action) {
return doThenWaitForMessages(action, 1).get(0);
}

@Override
public boolean isHandlingBackpressure() {
return true;
}

public static class MessagesNotFoundException extends RuntimeException {
private final int expectedCount;
private final int missingCount;
public MessagesNotFoundException(final int expectedCount, final int missingCount) {
super("Did not recieve all " + expectedCount +" audit messages after a short wait, missing " + missingCount + " messages");
this.expectedCount = expectedCount;
this.missingCount = missingCount;
}

public int getExpectedCount() {
return expectedCount;
}

public int getMissingCount() {
return missingCount;
}
}
}

0 comments on commit d507ebb

Please sign in to comment.