-
Notifications
You must be signed in to change notification settings - Fork 43
Kinesis scheduler test #47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
61f7eed
start of scheduler test for kinesis v2
pinzon d809ff8
testing consumer created and record processed
pinzon a33300b
comments
pinzon 66b9640
test logging
pinzon 5b01852
compare records for complete validation
pinzon 84be160
json mode
pinzon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,3 +29,5 @@ hs_err_pid* | |
.classpath | ||
.settings | ||
.project | ||
|
||
.vscode |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
94 changes: 94 additions & 0 deletions
94
src/test/java/cloud/localstack/awssdkv2/KinesisSchedulerTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package cloud.localstack.awssdkv2; | ||
|
||
import cloud.localstack.awssdkv2.consumer.DeliveryStatusRecordProcessorFactory; | ||
import cloud.localstack.awssdkv2.consumer.EventProcessor; | ||
import cloud.localstack.docker.annotation.LocalstackDockerProperties; | ||
import software.amazon.awssdk.core.SdkBytes; | ||
import software.amazon.awssdk.core.SdkSystemSetting; | ||
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; | ||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; | ||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; | ||
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; | ||
import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; | ||
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; | ||
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; | ||
import software.amazon.kinesis.common.ConfigsBuilder; | ||
import software.amazon.kinesis.coordinator.Scheduler; | ||
import software.amazon.kinesis.metrics.NullMetricsFactory; | ||
|
||
import org.junit.Assert; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
import java.util.*; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
@LocalstackDockerProperties(ignoreDockerRunErrors = true) | ||
public class KinesisSchedulerTest extends PowerMockLocalStack { | ||
String streamName = "test" + UUID.randomUUID().toString(); | ||
String workerId = UUID.randomUUID().toString(); | ||
String testMessage = "hello, world"; | ||
Integer consumerCreationTime = 15; //35 for aws | ||
|
||
@Before | ||
public void mockServicesForScheduler() { | ||
// System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); | ||
PowerMockLocalStack.mockCloudWatchAsyncClient(); | ||
PowerMockLocalStack.mockDynamoDBAsync(); | ||
PowerMockLocalStack.mockKinesisAsync(); | ||
} | ||
|
||
@Test | ||
public void schedulerTest() throws Exception { | ||
|
||
KinesisAsyncClient kinesisAsyncClient = KinesisAsyncClient.create(); | ||
DynamoDbAsyncClient dynamoAsyncClient = DynamoDbAsyncClient.create(); | ||
CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.create(); | ||
|
||
createStream(kinesisAsyncClient); | ||
TimeUnit.SECONDS.sleep(2); | ||
|
||
EventProcessor eventProcessor = new EventProcessor(); | ||
DeliveryStatusRecordProcessorFactory processorFactory = new DeliveryStatusRecordProcessorFactory(eventProcessor); | ||
|
||
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisAsyncClient, dynamoAsyncClient, | ||
cloudWatchAsyncClient, workerId, processorFactory); | ||
Scheduler scheduler = createScheduler(configsBuilder); | ||
|
||
new Thread(scheduler).start(); | ||
TimeUnit.SECONDS.sleep(consumerCreationTime); | ||
|
||
putRecord(kinesisAsyncClient); | ||
TimeUnit.SECONDS.sleep(5); | ||
|
||
scheduler.shutdown(); | ||
Assert.assertTrue(eventProcessor.CONSUMER_CREATED); | ||
Assert.assertTrue(eventProcessor.RECORD_RECEIVED); | ||
Assert.assertTrue(eventProcessor.messages.size() > 0); | ||
Assert.assertEquals(eventProcessor.messages.get(0), testMessage); | ||
} | ||
|
||
public Scheduler createScheduler(ConfigsBuilder configsBuilder) { | ||
return new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), | ||
configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), | ||
configsBuilder.metricsConfig().metricsFactory(new NullMetricsFactory()), configsBuilder.processorConfig(), | ||
configsBuilder.retrievalConfig()); | ||
} | ||
|
||
public void createStream(KinesisAsyncClient kinesisClient) throws Exception { | ||
CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(1).build(); | ||
CreateStreamResponse response = kinesisClient.createStream(request).get(); | ||
|
||
Assert.assertNotNull(response); | ||
} | ||
|
||
public void putRecord(KinesisAsyncClient kinesisClient) throws Exception { | ||
System.out.println("PUTTING RECORD"); | ||
PutRecordRequest request = PutRecordRequest.builder().partitionKey("partitionkey").streamName(streamName) | ||
.data(SdkBytes.fromUtf8String(testMessage)).build(); | ||
PutRecordResponse response = kinesisClient.putRecord(request).get(); | ||
|
||
Assert.assertNotNull(response); | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
58 changes: 58 additions & 0 deletions
58
src/test/java/cloud/localstack/awssdkv2/consumer/DeliveryStatusProcessor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package cloud.localstack.awssdkv2.consumer; | ||
|
||
import java.io.IOException; | ||
import java.util.logging.Logger; | ||
|
||
import software.amazon.kinesis.lifecycle.events.InitializationInput; | ||
import software.amazon.kinesis.lifecycle.events.LeaseLostInput; | ||
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; | ||
import software.amazon.kinesis.lifecycle.events.ShardEndedInput; | ||
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; | ||
import software.amazon.kinesis.processor.ShardRecordProcessor; | ||
import software.amazon.kinesis.retrieval.KinesisClientRecord; | ||
|
||
public class DeliveryStatusProcessor implements ShardRecordProcessor { | ||
EventProcessor eventProcessor; | ||
private static final Logger LOG = Logger.getLogger(DeliveryStatusProcessor.class.getName()); | ||
|
||
public DeliveryStatusProcessor(EventProcessor eventProcessor) { | ||
this.eventProcessor = eventProcessor; | ||
} | ||
|
||
public void initialize(InitializationInput initializationInput) { | ||
this.eventProcessor.CONSUMER_CREATED = true; | ||
} | ||
|
||
public void processRecords(ProcessRecordsInput processRecordsInput) { | ||
LOG.info("RECORDS PROCESSING"); | ||
this.eventProcessor.RECORD_RECEIVED = true; | ||
processRecordsInput.records().forEach(record -> { | ||
try { | ||
processRecord(record); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
} | ||
}); | ||
} | ||
|
||
public void processRecord(KinesisClientRecord record) throws IOException { | ||
LOG.info("RECORD PROCESSING"); | ||
this.eventProcessor.RECORD_RECEIVED = true; | ||
byte[] message = new byte[record.data().remaining()]; | ||
record.data().get(message); | ||
String string = new String(message); | ||
eventProcessor.messages.add(string); | ||
} | ||
|
||
public void processAndPublishRecord(byte[] messageStatus) throws IOException { | ||
} | ||
|
||
public void leaseLost(LeaseLostInput leaseLostInput) { | ||
} | ||
|
||
public void shardEnded(ShardEndedInput shardEndedInput) { | ||
} | ||
|
||
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { | ||
} | ||
} |
16 changes: 16 additions & 0 deletions
16
src/test/java/cloud/localstack/awssdkv2/consumer/DeliveryStatusRecordProcessorFactory.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package cloud.localstack.awssdkv2.consumer; | ||
|
||
import software.amazon.kinesis.processor.ShardRecordProcessor; | ||
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; | ||
|
||
public class DeliveryStatusRecordProcessorFactory implements ShardRecordProcessorFactory{ | ||
private final EventProcessor eventProcessor; | ||
|
||
public DeliveryStatusRecordProcessorFactory(EventProcessor eventProcessor) { | ||
this.eventProcessor = eventProcessor; | ||
} | ||
|
||
public ShardRecordProcessor shardRecordProcessor() { | ||
return new DeliveryStatusProcessor(eventProcessor); | ||
} | ||
} |
10 changes: 10 additions & 0 deletions
10
src/test/java/cloud/localstack/awssdkv2/consumer/EventProcessor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package cloud.localstack.awssdkv2.consumer; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public class EventProcessor { | ||
public Boolean CONSUMER_CREATED = false; | ||
public Boolean RECORD_RECEIVED = false; | ||
public List<String> messages = new ArrayList<>(); | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,14 @@ | ||
<Configuration status="DEBUG"> | ||
<Configuration status="INFO"> | ||
<Appenders> | ||
<Console name="ConsoleAppender" target="SYSTEM_OUT"> | ||
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c:%L - %m%n" /> | ||
</Console> | ||
</Appenders> | ||
|
||
<Loggers> | ||
<Root level="DEBUG"> | ||
<Root level="INFO"> | ||
<AppenderRef ref="ConsoleAppender"/> | ||
</Root> | ||
<Logger name="software.amazon.awssdk" level="DEBUG" /> | ||
<Logger name="software.amazon.awssdk" level="INFO" /> | ||
</Loggers> | ||
</Configuration> |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.