1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Bump `com.google.googlejavaformat:google-java-format` from 1.26.0 to 1.27.0 ([#5330](https://github.com/opensearch-project/security/pull/5330))
 - Bump `io.github.goooler.shadow` from 8.1.7 to 8.1.8 ([#5329](https://github.com/opensearch-project/security/pull/5329))
 - Bump `commons-io:commons-io` from 2.18.0 to 2.19.0 ([#5328](https://github.com/opensearch-project/security/pull/5328))
+- Upgrade kafka_version from 3.7.1 to 4.0.0 ([#5131](https://github.com/opensearch-project/security/pull/5131))
 
 ### Deprecated
 
11 changes: 9 additions & 2 deletions build.gradle
@@ -26,7 +26,7 @@ buildscript {
 
         common_utils_version = System.getProperty("common_utils.version", '3.1.0.0-SNAPSHOT')
 
-        kafka_version = '3.7.1'
+        kafka_version = '4.0.0'
         open_saml_version = '5.1.4'
         open_saml_shib_version = "9.1.4"
         one_login_java_saml = '2.9.0'
@@ -754,15 +754,22 @@ dependencies {
     testImplementation 'com.unboundid:unboundid-ldapsdk:4.0.14'
     testImplementation 'org.apache.httpcomponents:fluent-hc:4.5.14'
     testImplementation "org.apache.httpcomponents.client5:httpclient5-fluent:${versions.httpclient5}"
     testImplementation "com.google.re2j:re2j:1.8"
     testImplementation "org.apache.kafka:kafka_2.13:${kafka_version}"
+    testImplementation "org.apache.kafka:kafka-server:${kafka_version}"
     testImplementation "org.apache.kafka:kafka-server-common:${kafka_version}"
     testImplementation "org.apache.kafka:kafka-server-common:${kafka_version}:test"
     testImplementation "org.apache.kafka:kafka-group-coordinator:${kafka_version}"
     testImplementation "org.apache.kafka:kafka_2.13:${kafka_version}:test"
     testImplementation "org.apache.kafka:kafka-clients:${kafka_version}:test"
+    testImplementation "org.apache.kafka:test-common:${kafka_version}"
+    testImplementation "org.apache.kafka:kafka-coordinator-common:${kafka_version}"
+    testImplementation "org.apache.kafka:kafka-group-coordinator-api:${kafka_version}"
+    testImplementation "org.apache.kafka:kafka-share-coordinator:${kafka_version}"
+    testImplementation "org.apache.kafka:kafka-test-common-runtime:${kafka_version}"
+    testImplementation "org.apache.kafka:kafka-test-common-internal-api:${kafka_version}"
     testImplementation 'commons-validator:commons-validator:1.9.0'
-    testImplementation 'org.springframework.kafka:spring-kafka-test:3.3.5'
+    testImplementation "org.springframework.kafka:spring-kafka-test:4.0.0-M2"
     testImplementation "org.springframework:spring-beans:${spring_version}"
     testImplementation 'org.junit.jupiter:junit-jupiter:5.12.2'
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.12.2'
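Context for the dependency changes above: Kafka 4.0.0 runs in KRaft mode only (ZooKeeper support was removed), and its test scaffolding is split across several artifacts, which is why the embedded broker now needs the extra server, coordinator, and test-common jars. Below is a minimal sketch of the broker lifecycle these dependencies support, using only calls that also appear in the test change that follows; the class name EmbeddedKafkaDemo and the topic name "demo" are illustrative, not part of this PR.

    import org.springframework.kafka.test.EmbeddedKafkaBroker;
    import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;

    public class EmbeddedKafkaDemo {
        public static void main(String[] args) throws Exception {
            // 1 broker, 1 partition per topic, pre-created topic "demo"
            EmbeddedKafkaBroker broker = new EmbeddedKafkaKraftBroker(1, 1, "demo");
            broker.afterPropertiesSet(); // starts the in-process KRaft broker
            try {
                // Address to hand to producers/consumers, e.g. "127.0.0.1:<random port>"
                String bootstrap = broker.getBrokersAsString();
                System.out.println("Embedded broker listening at " + bootstrap);
            } finally {
                broker.destroy(); // shuts the broker down and cleans up its state
            }
        }
    }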
KafkaSinkTest.java
@@ -19,8 +19,9 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.ClassRule;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.opensearch.common.settings.Settings;
@@ -30,58 +31,65 @@
 import org.opensearch.security.auditlog.impl.AuditCategory;
 import org.opensearch.security.test.helper.file.FileHelper;
 
-import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 public class KafkaSinkTest extends AbstractAuditlogUnitTest {
 
-    @ClassRule
-    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1, "compliance") {
-        // Prevents test exceptions from randomized runner, see https://bit.ly/3y17IkI
-        private UncaughtExceptionHandler currentHandler;
+    private static EmbeddedKafkaBroker embeddedKafka;
+    private static UncaughtExceptionHandler origHandler;
 
-        @Override
-        public void before() {
-            currentHandler = Thread.getDefaultUncaughtExceptionHandler();
-            super.before();
-        }
+    @BeforeClass
+    public static void startKafka() throws Exception {
+        // Preserve the runner’s default handler (see https://bit.ly/3y17IkI)
+        origHandler = Thread.getDefaultUncaughtExceptionHandler();
+
+        // 1 broker, 1 partition per topic, create topic “compliance”
+        embeddedKafka = new EmbeddedKafkaKraftBroker(1, 1, "compliance");
+        embeddedKafka.afterPropertiesSet(); // <— starts the broker
+    }
 
-        @Override
-        public void after() {
-            super.after();
-            Thread.setDefaultUncaughtExceptionHandler(currentHandler);
-        }
-    };
+    @AfterClass
+    public static void stopKafka() {
+        if (embeddedKafka != null) {
+            embeddedKafka.destroy();
+        }
+        Thread.setDefaultUncaughtExceptionHandler(origHandler);
+    }

     @Test
     public void testKafka() throws Exception {
-        String configYml = FileHelper.loadFile("auditlog/endpoints/sink/configuration_kafka.yml");
-        configYml = configYml.replace("_RPLC_BOOTSTRAP_SERVERS_", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
+        String configYml = FileHelper.loadFile("auditlog/endpoints/sink/configuration_kafka.yml")
+            .replace("_RPLC_BOOTSTRAP_SERVERS_", embeddedKafka.getBrokersAsString());

         Settings.Builder settingsBuilder = Settings.builder().loadFromSource(configYml, YamlXContent.yamlXContent.mediaType());
 
         try (KafkaConsumer<Long, String> consumer = createConsumer()) {
             consumer.subscribe(Arrays.asList("compliance"));
 
             Settings settings = settingsBuilder.put("path.home", ".").build();
             SinkProvider provider = new SinkProvider(settings, null, null, null, null);
             AuditLogSink sink = provider.getDefaultSink();
 
             try {
                 assertThat(sink.getClass(), is(KafkaSink.class));
                 boolean success = sink.doStore(MockAuditMessageFactory.validAuditMessage(AuditCategory.MISSING_PRIVILEGES));
                 Assert.assertTrue(success);
 
                 ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(10));
                 assertThat(records.count(), is(1));
             } finally {
                 sink.close();
             }
         }
 
     }

     private KafkaConsumer<Long, String> createConsumer() {
         Properties props = new Properties();
-        props.put("bootstrap.servers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
+        props.put("bootstrap.servers", embeddedKafka.getBrokersAsString());
         props.put("auto.offset.reset", "earliest");
         props.put("group.id", "mygroup" + System.currentTimeMillis() + "_" + new Random().nextDouble());
         props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
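For reference, the producer side that KafkaSink drives is plain kafka-clients. Here is a hedged sketch (not part of this PR) of a helper that publishes one record to the "compliance" topic, mirroring the consumer configuration in createConsumer() above; the class name, method name, and payload are hypothetical.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CompliancePublisher {
        // Hypothetical helper: sends a single record so the consumer-side
        // assertion in testKafka() would have something to poll.
        static void sendOne(String bootstrapServers) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<Long, String> producer = new KafkaProducer<>(props)) {
                // send() is asynchronous; get() blocks until the broker acks the write
                producer.send(new ProducerRecord<>("compliance", 1L, "{\"category\":\"MISSING_PRIVILEGES\"}")).get();
            }
        }
    }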