A Kafka Connector which implements a "source connector" for AWS DynamoDB table Streams. This source connector allows replicating DynamoDB tables into Kafka topics. Once data is in Kafka you can use various Kafka sink connectors to push this data into different destinations systems, e.g. - BigQuery for easy analytics.
autodiscovery
- monitors and automatically discovers DynamoDB tables to start/stop syncing from (based on AWS TAG's)initial sync
- automatically detects and if needed performs initial(existing) data replication before tracking changes from the DynamoDB table streamlocal debugging
- use of test containers to test full connector life-cycle
Prior our development we found only one existing implementation by shikhar, but it seems to be missing major features (initial sync, handling shard changes) and is no longer supported.
Also, it tries to manage DynamoDB Stream shards manually by using one Kafka Connect task to read from each DynamoDB Streams shard. But, since DynamoDB Stream shards are dynamic contrary to static ones in "normal" Kinesis streams this approach would require rebalancing all Kafka Connect cluster tasks far to often.
In our implementation we opted to use Amazon Kinesis Client with DynamoDB Streams Kinesis Adapter which takes care of all shard reading and tracking tasks.
- Java 8
- Gradlew 5.3.1
- Kafka Connect Framework >= 2.1.1
- Amazon Kinesis Client 1.13.1
- DynamoDB Streams Kinesis Adapter 1.5.3
-
KCL(Amazon Kinesis Client) keeps metadata in separate dedicated DynamoDB table for each DynamoDB Stream it's tracking. Meaning that there will be one additional table created for each table this connector is tracking.
-
Current implementation supports only one Kafka Connect task(= KCL worker) reading from one table at any given time.
- Due to this limitation we tested maximum throughput from one table to be ~2000 records(change events) per second.
- This limitation is imposed by our connectors logic and not by the KCL library or Kafka Connect framework. We opted to skip this feature since running multiple tasks per table would require additional synchronization mechanisms for
INIT SYNC
state tracking. And this means higher complexity and longer development time. However this might be implemented in later versions.
-
Running multiple KCL workers on the same JVM has negative impact on overall performance of all workers. (NOTE: one KCL worker is executed by each individual Connector task. And each task is responsible for one DynamoDB table.)
- This is because Amazon Kinesis Client library has some global locking happening.
- This issue has been solved in newer KCL versions, but reading from DynamoDB Streams requires usage of DynamoDB Streams Kinesis Adapter library. And this library still depends on older Amazon Kinesis Client 1.9.1.
- However you will only encounter this issue by running lots of tasks on one machine with really high load.
-
Synced(Source) DynamoDB table unit capacity must be large enough to ensure
INIT_SYNC
to be finished in around 16 hours. Otherwise there is a riskINIT_SYNC
being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours.INIT_SYNC
can be skipped withinit.sync.skip=true
configuration
-
Required AWS roles:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTable",
"dynamodb:DescribeStream",
"dynamodb:ListTagsOfResource",
"dynamodb:DescribeLimits"
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:Scan"
],
"Resource": [
"arn:aws:dynamodb:*:*:table/*"
]
},
{
"Effect": "Allow",
"Action": [
"dynamodb:*"
],
"Resource": [
"arn:aws:dynamodb:*:*:table/datalake-KCL-*"
]
},
{
"Effect": "Allow",
"Action": [
"dynamodb:ListStreams",
"dynamodb:ListTables",
"dynamodb:ListGlobalTables",
"tag:GetResources"
],
"Resource": [
"*"
]
}
]
}
# build & run unit tests
./gradlew
# run integration tests
./gradlew integrationTests
# build final jar
./gradlew shadowJar
To interactively debug your connector thirst:
- complete steps defined in Getting started
- set following variables:
confluent stop connect export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y; confluent start connect
- Connect with your debugger. For instance remotely to the Connect worker (default port: 5005)
- To stop running connect in debug mode, just run:
unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;
when you are done
Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.
We use SemVer for versioning. For the versions available, see the tags on this repository.
Releases are done by creating new release(aka tag) via Github user interface. Once created Travis CI will pick it up, build and upload final .jar file as asset for the Github release.
- Use multithreaded DynamoDB table scan for faster
INIT SYNC
This project is licensed under the MIT License - see the LICENSE file for details
Third party components and dependencies are covered by the following licenses - see the LICENSE-3rd-PARTIES file for details:
- Apache Software License version 2.0
- Amazon Software License