Skip to content

Commit

Permalink
GitHub-Issue#2778: Added CloudWatchLogs Buffer, ThresholdCheck, and C…
Browse files Browse the repository at this point in the history
…lientFactory utilities. (opensearch-project#2982)

Added CloudWatchLogs Buffer, ThresholdCheck, and ClientFactory utilities.

---------

Signed-off-by: Marcos Gonzalez Mayedo <alemayed@amazon.com>
Signed-off-by: Marcos Gonzalez Mayedo <95880281+MaGonzalMayedo@users.noreply.github.com>
Co-authored-by: Marcos <alemayed@amazon.com>
  • Loading branch information
MaGonzalMayedo and Marcos authored Jul 12, 2023
1 parent 2ed818f commit 1254702
Show file tree
Hide file tree
Showing 18 changed files with 704 additions and 76 deletions.
8 changes: 4 additions & 4 deletions data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ repositories {
}

dependencies {
api project(':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:common')
testImplementation 'org.junit.jupiter:junit-jupiter'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:cloudwatch'
implementation 'software.amazon.awssdk:cloudwatchlogs'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(path: ':data-prepper-test-common')
testImplementation project(path: ':data-prepper-test-common')
testImplementation testLibs.mockito.inline
testImplementation 'org.junit.jupiter:junit-jupiter'
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
rule {
limit {
minimum = 0.90
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;

/**
* Buffer that handles the temporary storage of
* events. It isolates the implementation of system storage.
* 1. Reads in a String.
* 2. Transforms to Byte type.
* 3. Returns a Byte type.
*/
public interface Buffer {
/**
* Size of buffer in events.
* @return int
*/
int getEventCount();

/**
* Size of buffer in bytes.
* @return int
*/
int getBufferSize();

void writeEvent(byte[] event);

byte[] popEvent();

ArrayList<byte[]> getBufferedData();

void clearBuffer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;

/**
* BufferFactory will act as a means for decoupling the rest of
* the code from the type of buffer being used.
*/
public interface BufferFactory {
Buffer getBuffer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;

import java.util.ArrayList;

public class InMemoryBuffer implements Buffer {
private final ArrayList<byte[]> eventsBuffered;
private int bufferSize = 0;

InMemoryBuffer() {
eventsBuffered = new ArrayList<>();
}

@Override
public int getEventCount() {
return eventsBuffered.size();
}

@Override
public int getBufferSize() {
return bufferSize;
}

@Override
public void writeEvent(final byte[] event) {
eventsBuffered.add(event);
bufferSize += event.length;
}

@Override
public byte[] popEvent() {
bufferSize -= eventsBuffered.get(0).length;
return eventsBuffered.remove(0);
}

@Override
public ArrayList<byte[]> getBufferedData() {
return eventsBuffered;
}

@Override
public void clearBuffer() {
bufferSize = 0;
eventsBuffered.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;

public class InMemoryBufferFactory implements BufferFactory{
@Override
public Buffer getBuffer() {
return new InMemoryBuffer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.client;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.config.AwsConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

/**
* CwlClientFactory is in charge of reading in
* aws config parameters to return a working
* client for interfacing with
* CloudWatchLogs services.
*/
public final class CloudWatchLogsClientFactory {

/**
* Generates a CloudWatchLogs Client based on STS role ARN system credentials.
* @param awsConfig - AwsConfig specifying region, roles, and header overrides.
* @param awsCredentialsSupplier - AwsCredentialsSupplier Interface for which to create CredentialsProvider for Client config.
* @return CloudWatchLogsClient - used to interact with CloudWatch Logs services.
*/
public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialOptions(awsConfig);
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);

return CloudWatchLogsClient.builder()
.region(awsConfig.getAwsRegion())
.credentialsProvider(awsCredentialsProvider)
.overrideConfiguration(createOverrideConfiguration()).build();
}

private static ClientOverrideConfiguration createOverrideConfiguration() {
final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS).build();

return ClientOverrideConfiguration.builder()
.retryPolicy(retryPolicy)
.build();
}

private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig awsConfig) {
return AwsCredentialsOptions.builder()
.withRegion(awsConfig.getAwsRegion())
.withStsRoleArn(awsConfig.getAwsStsRoleArn())
.withStsExternalId(awsConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsConfig.getAwsStsHeaderOverrides())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

public class CwlSinkConfig {
public class CloudWatchLogsSinkConfig {
public static final String DEFAULT_BUFFER_TYPE = "in_memory";

@JsonProperty("aws")
Expand All @@ -14,8 +19,7 @@ public class CwlSinkConfig {
private AwsConfig awsConfig;

@JsonProperty("threshold")
@NotNull
private ThresholdConfig thresholdConfig;
private ThresholdConfig thresholdConfig = new ThresholdConfig();

@JsonProperty("buffer_type")
private String bufferType = DEFAULT_BUFFER_TYPE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.config;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -9,12 +14,12 @@
* restrictions.
*/
public class ThresholdConfig {
public static final int DEFAULT_BATCH_SIZE = 100;
public static final int DEFAULT_BATCH_SIZE = 25;
public static final int DEFAULT_EVENT_SIZE = 50;
public static final int DEFAULT_SIZE_OF_REQUEST = 524288;
public static final int DEFAULT_RETRY_COUNT = 5;
public static final int DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
public static final int DEFAULT_BACKOFF_TIME = 5000;
public static final int DEFAULT_BACKOFF_TIME = 500;

@JsonProperty("batch_size")
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.threshold;
/**
* ThresholdCheck receives parameters for which to reference the
* limits of a buffer and CloudWatchLogsClient before making a
* PutLogEvent request to AWS.
*/
public class ThresholdCheck {
private final int batchSize;
private final int maxEventSizeBytes;
private final int maxRequestSizeBytes;
private final long logSendInterval;

public ThresholdCheck(final int batchSize, final int maxEventSizeBytes, final int maxRequestSizeBytes, final int logSendInterval) {
this.batchSize = batchSize;
this.maxEventSizeBytes = maxEventSizeBytes;
this.maxRequestSizeBytes = maxRequestSizeBytes;
this.logSendInterval = logSendInterval;
}

/**
* Checks to see if we exceed any of the threshold conditions.
* @param currentTime - (long) denoting the time in seconds.
* @param currentRequestSize - size of request in bytes.
* @param batchSize - size of batch in events.
* @return boolean - true if we exceed the threshold events or false otherwise.
*/
public boolean isGreaterThanThresholdReached(final long currentTime, final int currentRequestSize, final int batchSize) {
return ((isGreaterThanBatchSize(batchSize) || isGreaterEqualToLogSendInterval(currentTime)
|| isGreaterThanMaxRequestSize(currentRequestSize)) && (batchSize > 0));
}

/**
* Checks to see if we equal any of the threshold conditions.
* @param currentRequestSize - size of request in bytes.
* @param batchSize - size of batch in events.
* @return boolean - true if we equal the threshold events or false otherwise.
*/
public boolean isEqualToThresholdReached(final int currentRequestSize, final int batchSize) {
return ((isEqualBatchSize(batchSize) || isEqualMaxRequestSize(currentRequestSize)) && (batchSize > 0));
}

/**
* Checks if the interval passed in is equal to or greater
* than the threshold interval for sending PutLogEvents.
* @param currentTimeSeconds int denoting seconds.
* @return boolean - true if greater than or equal to logInterval, false otherwise.
*/
private boolean isGreaterEqualToLogSendInterval(final long currentTimeSeconds) {
return currentTimeSeconds >= logSendInterval;
}

/**
* Determines if the event size is greater than the max event size.
* @param eventSize int denoting size of event.
* @return boolean - true if greater than MaxEventSize, false otherwise.
*/
public boolean isGreaterThanMaxEventSize(final int eventSize) {
return eventSize > maxEventSizeBytes;
}

/**
* Checks if the request size is greater than or equal to the current size passed in.
* @param currentRequestSize int denoting size of request(Sum of PutLogEvent messages).
* @return boolean - true if greater than Max request size, smaller otherwise.
*/
private boolean isGreaterThanMaxRequestSize(final int currentRequestSize) {
return currentRequestSize > maxRequestSizeBytes;
}

/**
* Checks if the current batch size is greater to the threshold
* batch size.
* @param batchSize int denoting the size of the batch of PutLogEvents.
* @return boolean - true if greater, false otherwise.
*/
private boolean isGreaterThanBatchSize(final int batchSize) {
return batchSize > this.batchSize;
}

/**
* Checks if the request size is greater than or equal to the current size passed in.
* @param currentRequestSize int denoting size of request(Sum of PutLogEvent messages).
* @return boolean - true if equal Max request size, smaller otherwise.
*/
private boolean isEqualMaxRequestSize(final int currentRequestSize) {
return currentRequestSize == maxRequestSizeBytes;
}

private boolean isEqualBatchSize(final int batchSize) {
return batchSize == this.batchSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.buffer;

import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.typeCompatibleWith;

public class InMemoryBufferFactoryTest {
@Test
void check_buffer_not_null() {
Buffer buffer = new InMemoryBufferFactory().getBuffer();
assertThat(buffer, notNullValue());
assertThat(buffer.getClass(), typeCompatibleWith(Buffer.class));
}
}
Loading

0 comments on commit 1254702

Please sign in to comment.