Skip to content

Commit cfd4f59

Browse files
authored
add tests for Kinesis scheduler client using the SubscribeToShard API (#47)
1 parent d9e2e8c commit cfd4f59

File tree

8 files changed

+217
-8
lines changed

8 files changed

+217
-8
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,5 @@ hs_err_pid*
2929
.classpath
3030
.settings
3131
.project
32+
33+
.vscode

pom.xml

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
32
<modelVersion>4.0.0</modelVersion>
43

54
<groupId>cloud.localstack</groupId>
@@ -207,6 +206,30 @@
207206
<scope>provided</scope>
208207
</dependency>
209208

209+
<dependency>
210+
<groupId>software.amazon.kinesis</groupId>
211+
<artifactId>amazon-kinesis-client</artifactId>
212+
<version>2.2.9</version>
213+
<scope>provided</scope>
214+
</dependency>
215+
<dependency>
216+
<groupId>org.apache.logging.log4j</groupId>
217+
<artifactId>log4j-core</artifactId>
218+
<version>2.14.1</version>
219+
<scope>provided</scope>
220+
</dependency>
221+
<dependency>
222+
<groupId>org.apache.logging.log4j</groupId>
223+
<artifactId>log4j-api</artifactId>
224+
<version>2.14.1</version>
225+
<scope>provided</scope>
226+
</dependency>
227+
<dependency>
228+
<groupId>org.apache.logging.log4j</groupId>
229+
<artifactId>log4j-slf4j-impl</artifactId>
230+
<version>2.14.1</version>
231+
<scope>provided</scope>
232+
</dependency>
210233

211234
<!-- test dependencies -->
212235
<dependency>
@@ -287,8 +310,7 @@
287310
</profile>
288311
<profile>
289312
<id>awssdkv1</id>
290-
<dependencies>
291-
</dependencies>
313+
<dependencies></dependencies>
292314
</profile>
293315
<profile>
294316
<id>awssdkv2</id>
@@ -308,6 +330,11 @@
308330
<artifactId>kinesis</artifactId>
309331
<version>${aws.sdkv2.version}</version>
310332
</dependency>
333+
<dependency>
334+
<groupId>software.amazon.kinesis</groupId>
335+
<artifactId>amazon-kinesis-client</artifactId>
336+
<version>2.2.9</version>
337+
</dependency>
311338
</dependencies>
312339
</profile>
313340
</profiles>
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package cloud.localstack.awssdkv2;
2+
3+
import cloud.localstack.awssdkv2.consumer.DeliveryStatusRecordProcessorFactory;
4+
import cloud.localstack.awssdkv2.consumer.EventProcessor;
5+
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
6+
import software.amazon.awssdk.core.SdkBytes;
7+
import software.amazon.awssdk.core.SdkSystemSetting;
8+
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
9+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
10+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
11+
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
12+
import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
13+
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
14+
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
15+
import software.amazon.kinesis.common.ConfigsBuilder;
16+
import software.amazon.kinesis.coordinator.Scheduler;
17+
import software.amazon.kinesis.metrics.NullMetricsFactory;
18+
19+
import org.junit.Assert;
20+
import org.junit.Before;
21+
import org.junit.Test;
22+
23+
import java.util.*;
24+
import java.util.concurrent.TimeUnit;
25+
26+
@LocalstackDockerProperties(ignoreDockerRunErrors = true)
27+
public class KinesisSchedulerTest extends PowerMockLocalStack {
28+
String streamName = "test" + UUID.randomUUID().toString();
29+
String workerId = UUID.randomUUID().toString();
30+
String testMessage = "hello, world";
31+
Integer consumerCreationTime = 15; //35 for aws
32+
33+
@Before
34+
public void mockServicesForScheduler() {
35+
// System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
36+
PowerMockLocalStack.mockCloudWatchAsyncClient();
37+
PowerMockLocalStack.mockDynamoDBAsync();
38+
PowerMockLocalStack.mockKinesisAsync();
39+
}
40+
41+
@Test
42+
public void schedulerTest() throws Exception {
43+
44+
KinesisAsyncClient kinesisAsyncClient = KinesisAsyncClient.create();
45+
DynamoDbAsyncClient dynamoAsyncClient = DynamoDbAsyncClient.create();
46+
CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.create();
47+
48+
createStream(kinesisAsyncClient);
49+
TimeUnit.SECONDS.sleep(2);
50+
51+
EventProcessor eventProcessor = new EventProcessor();
52+
DeliveryStatusRecordProcessorFactory processorFactory = new DeliveryStatusRecordProcessorFactory(eventProcessor);
53+
54+
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisAsyncClient, dynamoAsyncClient,
55+
cloudWatchAsyncClient, workerId, processorFactory);
56+
Scheduler scheduler = createScheduler(configsBuilder);
57+
58+
new Thread(scheduler).start();
59+
TimeUnit.SECONDS.sleep(consumerCreationTime);
60+
61+
putRecord(kinesisAsyncClient);
62+
TimeUnit.SECONDS.sleep(5);
63+
64+
scheduler.shutdown();
65+
Assert.assertTrue(eventProcessor.CONSUMER_CREATED);
66+
Assert.assertTrue(eventProcessor.RECORD_RECEIVED);
67+
Assert.assertTrue(eventProcessor.messages.size() > 0);
68+
Assert.assertEquals(eventProcessor.messages.get(0), testMessage);
69+
}
70+
71+
public Scheduler createScheduler(ConfigsBuilder configsBuilder) {
72+
return new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(),
73+
configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(),
74+
configsBuilder.metricsConfig().metricsFactory(new NullMetricsFactory()), configsBuilder.processorConfig(),
75+
configsBuilder.retrievalConfig());
76+
}
77+
78+
public void createStream(KinesisAsyncClient kinesisClient) throws Exception {
79+
CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(1).build();
80+
CreateStreamResponse response = kinesisClient.createStream(request).get();
81+
82+
Assert.assertNotNull(response);
83+
}
84+
85+
public void putRecord(KinesisAsyncClient kinesisClient) throws Exception {
86+
System.out.println("PUTTING RECORD");
87+
PutRecordRequest request = PutRecordRequest.builder().partitionKey("partitionkey").streamName(streamName)
88+
.data(SdkBytes.fromUtf8String(testMessage)).build();
89+
PutRecordResponse response = kinesisClient.putRecord(request).get();
90+
91+
Assert.assertNotNull(response);
92+
}
93+
94+
}

src/test/java/cloud/localstack/awssdkv2/KinesisV2ConsumerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cloud.localstack.awssdkv2;
22

33
import cloud.localstack.LocalstackTestRunner;
4+
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
45

56
import org.junit.Assert;
67
import org.junit.Test;
@@ -16,6 +17,7 @@
1617
import java.util.stream.Collectors;
1718

1819
@RunWith(LocalstackTestRunner.class)
20+
@LocalstackDockerProperties(ignoreDockerRunErrors = true)
1921
public class KinesisV2ConsumerTest {
2022

2123
@Test
@@ -49,6 +51,6 @@ public void testGetRecordCBOR() throws Exception {
4951
public void testGetRecordJSON() throws Exception {
5052
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
5153
this.testGetRecordCBOR();
52-
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
54+
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "true");
5355
}
5456
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package cloud.localstack.awssdkv2.consumer;
2+
3+
import java.io.IOException;
4+
import java.util.logging.Logger;
5+
6+
import software.amazon.kinesis.lifecycle.events.InitializationInput;
7+
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
8+
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
9+
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
10+
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
11+
import software.amazon.kinesis.processor.ShardRecordProcessor;
12+
import software.amazon.kinesis.retrieval.KinesisClientRecord;
13+
14+
public class DeliveryStatusProcessor implements ShardRecordProcessor {
15+
EventProcessor eventProcessor;
16+
private static final Logger LOG = Logger.getLogger(DeliveryStatusProcessor.class.getName());
17+
18+
public DeliveryStatusProcessor(EventProcessor eventProcessor) {
19+
this.eventProcessor = eventProcessor;
20+
}
21+
22+
public void initialize(InitializationInput initializationInput) {
23+
this.eventProcessor.CONSUMER_CREATED = true;
24+
}
25+
26+
public void processRecords(ProcessRecordsInput processRecordsInput) {
27+
LOG.info("RECORDS PROCESSING");
28+
this.eventProcessor.RECORD_RECEIVED = true;
29+
processRecordsInput.records().forEach(record -> {
30+
try {
31+
processRecord(record);
32+
} catch (IOException e) {
33+
e.printStackTrace();
34+
}
35+
});
36+
}
37+
38+
public void processRecord(KinesisClientRecord record) throws IOException {
39+
LOG.info("RECORD PROCESSING");
40+
this.eventProcessor.RECORD_RECEIVED = true;
41+
byte[] message = new byte[record.data().remaining()];
42+
record.data().get(message);
43+
String string = new String(message);
44+
eventProcessor.messages.add(string);
45+
}
46+
47+
public void processAndPublishRecord(byte[] messageStatus) throws IOException {
48+
}
49+
50+
public void leaseLost(LeaseLostInput leaseLostInput) {
51+
}
52+
53+
public void shardEnded(ShardEndedInput shardEndedInput) {
54+
}
55+
56+
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
57+
}
58+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cloud.localstack.awssdkv2.consumer;
2+
3+
import software.amazon.kinesis.processor.ShardRecordProcessor;
4+
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
5+
6+
public class DeliveryStatusRecordProcessorFactory implements ShardRecordProcessorFactory{
7+
private final EventProcessor eventProcessor;
8+
9+
public DeliveryStatusRecordProcessorFactory(EventProcessor eventProcessor) {
10+
this.eventProcessor = eventProcessor;
11+
}
12+
13+
public ShardRecordProcessor shardRecordProcessor() {
14+
return new DeliveryStatusProcessor(eventProcessor);
15+
}
16+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package cloud.localstack.awssdkv2.consumer;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
public class EventProcessor {
7+
public Boolean CONSUMER_CREATED = false;
8+
public Boolean RECORD_RECEIVED = false;
9+
public List<String> messages = new ArrayList<>();
10+
}

src/test/resources/log4j2.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
<Configuration status="DEBUG">
1+
<Configuration status="INFO">
22
<Appenders>
33
<Console name="ConsoleAppender" target="SYSTEM_OUT">
44
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c:%L - %m%n" />
55
</Console>
66
</Appenders>
77

88
<Loggers>
9-
<Root level="DEBUG">
9+
<Root level="INFO">
1010
<AppenderRef ref="ConsoleAppender"/>
1111
</Root>
12-
<Logger name="software.amazon.awssdk" level="DEBUG" />
12+
<Logger name="software.amazon.awssdk" level="INFO" />
1313
</Loggers>
1414
</Configuration>

0 commit comments

Comments
 (0)