Skip to content

Commit

Permalink
Create Scala sample implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohei TSUJI committed Sep 21, 2018
1 parent c1abbb1 commit 0dec731
Show file tree
Hide file tree
Showing 17 changed files with 370 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target/
.idea/
*.iml
1 change: 1 addition & 0 deletions java-example/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target/
out/
.gradle/
2 changes: 2 additions & 0 deletions java-example/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mainClassName = 'App'
dependencies {
// This dependency is found on compile classpath of this component and consumers.
compile 'com.amazonaws:amazon-kinesis-client:1.8.8'
compile 'ch.qos.logback:logback-classic:1.2.3'
compile 'org.slf4j:jcl-over-slf4j:1.7.25'

// Use JUnit test framework
testCompile 'junit:junit:4.12'
Expand Down
12 changes: 0 additions & 12 deletions java-example/src/main/java/App.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package kclexample.java;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

import java.util.Optional;

public class ExampleRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

private final String tableName;

ExampleRecordProcessor(String tableName) {
this.tableName = tableName;
}

private String shardId;
private AmazonDynamoDB dynamoDB;
private Table table;

@Override
public void initialize(InitializationInput initializationInput) {
shardId = initializationInput.getShardId();

// Initialize any resources for #processRecords().
dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
table = new Table(dynamoDB, tableName);
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
// Processing incoming records.
retry(() -> {
processRecordsInput.getRecords().forEach(record -> {
System.out.println(record);
});
});

// Record checkpoint if all incoming records processed successfully.
recordCheckpoint(processRecordsInput.getCheckpointer());
}

@Override
public void shutdown(ShutdownInput shutdownInput) {
// Record checkpoint at closing shard if shutdown reason is TERMINATE.
if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
recordCheckpoint(shutdownInput.getCheckpointer());
}

// Cleanup initialized resources.
Optional.ofNullable(dynamoDB).ifPresent(AmazonDynamoDB::shutdown);
}

@Override
public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
// Record checkpoint at graceful shutdown.
recordCheckpoint(checkpointer);
}

private void recordCheckpoint(IRecordProcessorCheckpointer checkpointer) {
retry(() -> {
try {
checkpointer.checkpoint();
} catch (Throwable e) {
throw new RuntimeException("Record checkpoint failed.", e);
}
});
}

private void retry(Runnable f) {
try {
f.run();
} catch (Throwable e) {
System.out.println(String.format("An error occurred %s. That will be retry...", e.getMessage()));
try {
Thread.sleep(3000);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
retry(f);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package kclexample.java;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class ExampleRecordProcessorFactory implements IRecordProcessorFactory {

private final String tableName;

ExampleRecordProcessorFactory(String tableName) {
this.tableName = tableName;
}

@Override
public IRecordProcessor createProcessor() {
return new ExampleRecordProcessor(tableName);
}
}
53 changes: 53 additions & 0 deletions java-example/src/main/java/kclexample/java/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package kclexample.java;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

public class Main {

public static void main(String... args) {

// Create a Worker.
final Worker worker = new Worker.Builder()
.recordProcessorFactory(
new ExampleRecordProcessorFactory("examples-table")
)
.config(
new KinesisClientLibConfiguration(
"kcl-java-example",
"kcl-sample",
DefaultAWSCredentialsProviderChain.getInstance(),
generateWorkerId()
).withRegionName("us-east-1")
.withInitialLeaseTableReadCapacity(1)
.withInitialLeaseTableWriteCapacity(1)
)
.build();

// Shutdown worker gracefully using shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
worker.startGracefulShutdown().get();
} catch (Throwable e) {
e.printStackTrace();
}
}));

// Start the worker.
worker.run();
}

private static String generateWorkerId() {
try {
return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
} catch (UnknownHostException e) {
throw new RuntimeException("Could not generate worker ID.", e);
}
}

}
18 changes: 18 additions & 0 deletions java-example/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>

<logger name="com.amazonaws.services.kinesis.clientlibrary" level="DEBUG" />
<logger name="com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker" level="WARN" />

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>

</configuration>
12 changes: 0 additions & 12 deletions java-example/src/test/java/AppTest.java

This file was deleted.

2 changes: 2 additions & 0 deletions scala-example/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
target/
.idea/
18 changes: 18 additions & 0 deletions scala-example/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import Dependencies._

lazy val root = (project in file(".")).
settings(
inThisBuild(List(
organization := "kclexample",
scalaVersion := "2.12.5",
version := "0.1.0-SNAPSHOT"
)),
name := "scala-example",

libraryDependencies ++= Seq(
kcl,
logback,
slf4j,
scalaTest % Test
)
)
8 changes: 8 additions & 0 deletions scala-example/project/Dependencies.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import sbt._

object Dependencies {
lazy val kcl = "com.amazonaws" % "amazon-kinesis-client" % "1.8.8"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
lazy val slf4j = "org.slf4j" % "jcl-over-slf4j" % "1.7.25"
lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5"
}
1 change: 1 addition & 0 deletions scala-example/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.1.1
18 changes: 18 additions & 0 deletions scala-example/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>

<logger name="com.amazonaws.services.kinesis.clientlibrary" level="DEBUG" />
<logger name="com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker" level="WARN" />

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>

</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package kclexample.scala

import com.amazonaws.services.dynamodbv2.{ AmazonDynamoDB, AmazonDynamoDBClientBuilder }
import com.amazonaws.services.dynamodbv2.document.Table
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{ IRecordProcessor, IShutdownNotificationAware }
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.clientlibrary.types.{ InitializationInput, ProcessRecordsInput, ShutdownInput }

import scala.annotation.tailrec
import scala.util.{ Failure, Success, Try }

class ExampleRecordProcessor(
private val tableName: String
) extends IRecordProcessor with IShutdownNotificationAware {

private var shardId: String = _
private var dynamoDB: AmazonDynamoDB = _
private var table: Table = _

override def initialize(initializationInput: InitializationInput): Unit = {
shardId = initializationInput.getShardId

// Initialize any resources for #processRecords().
dynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
table = new Table(dynamoDB, tableName)
}

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
// Processing incoming records.
retry {
processRecordsInput.getRecords.forEach { record =>
println(record)
}
}

// Record checkpoint if all incoming records processed successfully.
recordCheckpoint(processRecordsInput.getCheckpointer)
}

override def shutdown(shutdownInput: ShutdownInput): Unit = {
// Record checkpoint at closing shard if shutdown reason is TERMINATE.
if (shutdownInput.getShutdownReason == ShutdownReason.TERMINATE) {
recordCheckpoint(shutdownInput.getCheckpointer)
}

// Cleanup initialized resources.
Option(dynamoDB).foreach(_.shutdown())
}

override def shutdownRequested(checkpointer: IRecordProcessorCheckpointer): Unit = {
// Record checkpoint at graceful shutdown.
recordCheckpoint(checkpointer)
}

private def recordCheckpoint(checkpointer: IRecordProcessorCheckpointer): Unit = {
retry {
checkpointer.checkpoint()
}
}

@tailrec
private def retry(f: => Unit): Unit = {
Try {
f
} match {
case Success(_) => ()
case Failure(e) =>
println(s"An error occurred $e. That will be retry...")
retry(f)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package kclexample.scala

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory

class ExampleRecordProcessorFactory(
private val tableName: String
) extends IRecordProcessorFactory {

override def createProcessor() = new ExampleRecordProcessor(tableName)

}
Loading

0 comments on commit 0dec731

Please sign in to comment.